""" 数据处理 WideETL """ import time import traceback from Utils.ErrorUtil import APIReturnError from Utils.ObjUtil import SpecObject from Utils.ValidateUtil import ValidateAttr from DBHelper.MongoHelperInstance import DB_TFSE JOB_STATUS = ["正常", "异常", "解决"] JOB_TYPES = ["处理准备", "初始数据", "更新数据", "删除数据"] class DataProcess(object): """数据处理抽象类""" source = dict() def __init__(self, target): self.target = target def extract(self): """抽取数据""" raise NotImplementedError def transform(self): """转换数据""" raise NotImplementedError def load(self): """加载数据""" raise NotImplementedError def start(self): self.extract() self.transform() self.load() class DataJob(SpecObject): """数据处理任务""" title = ValidateAttr(field="title", type=str) job_type = ValidateAttr(field="job_type", in_list=JOB_TYPES) status = ValidateAttr(field="status", in_list=JOB_STATUS) detail = ValidateAttr(field="detail", type=str) fields_map = { "title": "数据任务名称", "job_type": "数据任务类型", "status": "数据任务状态", "detail": "数据任务详情", "log_time": "任务处理时间" } def __init__(self, instance, title, job_type, status, detail): self.instance = instance self.title = title self.job_type = job_type self.status = status self.detail = detail self.log_time = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime()) self.processes = list() def __process(self): """数据处理""" for process in self.processes: try: process_instance = process(target=self.instance) process_instance.start() except AssertionError as e: append_detail = '{}; '.format(e.__str__()) self.detail = self.detail + append_detail self.__save_data_job_log() raise APIReturnError(error_info=e.__str__(), status_code=202) except Exception as e: template = "数据处理流程异常, 报错流程->{}, 异常类型->{}, 错误描述->{}, 异常追溯->{}; " error_detail = template.format(process.__name__, type(e).__name__, e.__str__(), traceback.format_exc()) self.status = "异常" self.detail = self.detail + error_detail self.__save_data_job_log() raise APIReturnError(error_info="数据处理失败", status_code=202) def __save_data_job_log(self): """保存数据任务日志""" data = self.dict_keys_toggle(columns=["数据任务名称", "数据任务类型", "数据任务状态", "数据任务详情", "任务处理时间"]) DB_TFSE.insert_single_data("日志", "数据处理任务日志", data) def start(self): """开始处理任务""" self.__process() self.__save_data_job_log()