97 lines
3.0 KiB
Python
97 lines
3.0 KiB
Python
"""
|
|
数据处理 WideETL
|
|
"""
|
|
|
|
import time
|
|
import traceback
|
|
|
|
from Utils.ErrorUtil import APIReturnError
|
|
from Utils.ObjUtil import SpecObject
|
|
from Utils.ValidateUtil import ValidateAttr
|
|
from DBHelper.MongoHelperInstance import DB_GUA
|
|
|
|
|
|
# Allowed values for DataJob.status (passed as ValidateAttr in_list):
# normal, abnormal (set by DataJob when a process raises), resolved.
JOB_STATUS = [
    "正常",
    "异常",
    "解决",
]

# Allowed values for DataJob.job_type (passed as ValidateAttr in_list):
# processing preparation, initial data, update data, delete data.
JOB_TYPES = [
    "处理准备",
    "初始数据",
    "更新数据",
    "删除数据",
]
|
|
|
|
|
|
class DataProcess(object):
    """Abstract base class for one extract -> transform -> load data process.

    Subclasses override :meth:`extract`, :meth:`transform` and :meth:`load`;
    :meth:`start` invokes them in that order. The base implementations raise
    ``NotImplementedError``.
    """

    # Class-level default for the process's working data. ``__init__`` gives
    # every instance its own copy, so writes like ``self.source[key] = value``
    # stay local to one process. Previously this single dict was shared by all
    # instances and subclasses, leaking data between processes and jobs.
    source = dict()

    def __init__(self, target):
        """Bind the process to ``target``.

        :param target: the object this process operates on (``DataJob`` passes
            its ``instance`` as ``target``).
        """
        self.target = target
        # Shallow per-instance copy of the (possibly subclass-overridden)
        # class-level default; non-dict overrides are left as they are.
        if isinstance(self.source, dict):
            self.source = self.source.copy()

    def extract(self):
        """Extract data. Must be overridden by subclasses.

        :raises NotImplementedError: always, in the base class.
        """
        raise NotImplementedError

    def transform(self):
        """Transform data. Must be overridden by subclasses.

        :raises NotImplementedError: always, in the base class.
        """
        raise NotImplementedError

    def load(self):
        """Load data. Must be overridden by subclasses.

        :raises NotImplementedError: always, in the base class.
        """
        raise NotImplementedError

    def start(self):
        """Run the process: ``extract()``, then ``transform()``, then ``load()``."""
        self.extract()
        self.transform()
        self.load()
|
|
|
|
|
|
class DataJob(SpecObject):
    """A data processing job.

    Runs each process class in ``self.processes`` against ``self.instance``
    and writes one job-log record through ``DB_GUA``, whether the job
    succeeds or fails.
    """

    title = ValidateAttr(field="title", type=str)
    job_type = ValidateAttr(field="job_type", in_list=JOB_TYPES)
    status = ValidateAttr(field="status", in_list=JOB_STATUS)
    detail = ValidateAttr(field="detail", type=str)

    # Attribute name -> human-readable label. Presumably consumed by
    # SpecObject.fields_toggle when building the log record — verify there.
    fields_map = {
        "title": "数据任务名称",
        "job_type": "数据任务类型",
        "status": "数据任务状态",
        "detail": "数据任务详情",
        "log_time": "任务处理时间",
    }

    def __init__(self, instance, title, job_type, status, detail):
        """Create a job.

        :param instance: object handed to every process as ``target``.
        :param title: job name (ValidateAttr with ``type=str``).
        :param job_type: job type (ValidateAttr with ``in_list=JOB_TYPES``).
        :param status: job status (ValidateAttr with ``in_list=JOB_STATUS``).
        :param detail: free-text detail (ValidateAttr with ``type=str``);
            failure messages are appended to it.
        """
        self.instance = instance
        self.title = title
        self.job_type = job_type
        self.status = status
        self.detail = detail
        # Without an explicit struct_time, strftime formats localtime().
        self.log_time = time.strftime("%Y-%m-%d %H:%M:%S")
        # Process classes to run; expected to be filled in after construction.
        self.processes = []

    def __append_detail_and_log(self, text):
        """Append ``text`` to ``detail`` and save the job log."""
        self.detail += text
        self.__save_data_job_log()

    def __process(self):
        """Run each process in order; on the first failure, log and raise.

        :raises APIReturnError: with ``status_code=202``. An AssertionError
            from a process is re-raised with its message; any other exception
            sets ``status`` to the error status and is reported generically.
        """
        for process_cls in self.processes:
            try:
                runner = process_cls(target=self.instance)
                runner.start()
            except AssertionError as err:
                message = str(err)
                # NOTE(review): unlike the generic branch below, this path
                # leaves ``status`` unchanged — confirm that is intended.
                self.__append_detail_and_log('{}; '.format(message))
                raise APIReturnError(error_info=message, status_code=202)
            except Exception as err:
                template = "数据处理流程异常, 报错流程->{}, 异常类型->{}, 错误描述->{}, 异常追溯->{}; "
                error_detail = template.format(
                    process_cls.__name__,
                    type(err).__name__,
                    str(err),
                    traceback.format_exc(),
                )
                self.status = "异常"
                self.__append_detail_and_log(error_detail)
                raise APIReturnError(error_info="数据处理失败", status_code=202)

    def __save_data_job_log(self):
        """Insert this job's log record via ``DB_GUA.insert_single_data``."""
        record = self.fields_toggle(fields=["title", "job_type", "status", "detail", "log_time"])
        DB_GUA.insert_single_data("日志", "数据处理任务日志", record)

    def start(self):
        """Run all processes; if none raises, save the job log."""
        self.__process()
        self.__save_data_job_log()
|