# guarantee-admin-api-v0.2/Modules/ETL/WideETL.py

# 97 lines
# 3.0 KiB
# Python

"""
数据处理 WideETL
"""
import time
import traceback
from Utils.ErrorUtil import APIReturnError
from Utils.ObjUtil import SpecObject
from Utils.ValidateUtil import ValidateAttr
from DBHelper.MongoHelperInstance import DB_GUA
# Values accepted for DataJob.status (normal / abnormal / resolved).
JOB_STATUS = [
    "正常",
    "异常",
    "解决",
]
# Values accepted for DataJob.job_type
# (preparation / initial data / update data / delete data).
JOB_TYPES = [
    "处理准备",
    "初始数据",
    "更新数据",
    "删除数据",
]
class DataProcess:
    """Abstract base for a single extract / transform / load step.

    Subclasses must override ``extract``, ``transform`` and ``load``;
    ``start`` then runs the three stages in that order.
    """

    # Class-level dict: shared by every instance unless a subclass or an
    # instance rebinds it. Not used by this base class itself.
    # NOTE(review): presumably holds the extracted source data -- confirm
    # against the concrete subclasses.
    source = {}

    def __init__(self, target):
        """Remember the object this process operates on.

        :param target: stored unchanged as ``self.target``.
        """
        self.target = target

    def extract(self):
        """Extract the data. Must be overridden by subclasses."""
        raise NotImplementedError

    def transform(self):
        """Transform the data. Must be overridden by subclasses."""
        raise NotImplementedError

    def load(self):
        """Load the data. Must be overridden by subclasses."""
        raise NotImplementedError

    def start(self):
        """Run the pipeline: extract, then transform, then load.

        Any exception raised by a stage propagates and the later stages
        are not run.
        """
        # Each stage is looked up just before it is called.
        for stage_name in ("extract", "transform", "load"):
            getattr(self, stage_name)()
class DataJob(SpecObject):
    """A data-processing job: runs its processes and records a job log.

    Each entry in ``self.processes`` is called as ``entry(target=instance)``
    and the result's ``start()`` is invoked. The job's title, type, status,
    detail and log time are written to the job-log collection once the run
    ends, whether it succeeded or failed.
    """

    # Validated attributes; the actual checks are implemented by ValidateAttr.
    title = ValidateAttr(field="title", type=str)
    job_type = ValidateAttr(field="job_type", in_list=JOB_TYPES)
    status = ValidateAttr(field="status", in_list=JOB_STATUS)
    detail = ValidateAttr(field="detail", type=str)

    # Attribute name -> Chinese label.
    # NOTE(review): presumably consumed by SpecObject.fields_toggle when the
    # log record is built -- confirm against SpecObject.
    fields_map = {
        "title": "数据任务名称",
        "job_type": "数据任务类型",
        "status": "数据任务状态",
        "detail": "数据任务详情",
        "log_time": "任务处理时间"
    }

    def __init__(self, instance, title, job_type, status, detail):
        """Create a job with an empty process list.

        :param instance: object handed to every process as ``target``.
        :param title: job name (validated as ``str``).
        :param job_type: one of ``JOB_TYPES``.
        :param status: one of ``JOB_STATUS``.
        :param detail: free-text detail (validated as ``str``); error
            descriptions are appended to it when a process fails.
        """
        self.instance = instance
        self.title = title
        self.job_type = job_type
        self.status = status
        self.detail = detail
        # strftime falls back to the current local time when no time tuple
        # is supplied.
        self.log_time = time.strftime("%Y-%m-%d %H:%M:%S")
        self.processes = []

    def __process(self):
        """Run every registered process in order, stopping at the first failure.

        :raises APIReturnError: with ``status_code=202`` when a process
            raises. For an ``AssertionError`` the assertion message is
            passed on as ``error_info`` and the status is left unchanged;
            for any other ``Exception`` the status becomes "异常" and a
            generic message is used. In both cases the job log is saved
            before raising.
        """
        for process_cls in self.processes:
            try:
                process_cls(target=self.instance).start()
            except AssertionError as failure:
                reason = str(failure)
                self.detail += '{}; '.format(reason)
                self.__save_data_job_log()
                raise APIReturnError(error_info=reason, status_code=202)
            except Exception as failure:
                template = "数据处理流程异常, 报错流程->{}, 异常类型->{}, 错误描述->{}, 异常追溯->{}; "
                error_detail = template.format(
                    process_cls.__name__,
                    type(failure).__name__,
                    str(failure),
                    traceback.format_exc(),
                )
                self.status = "异常"
                self.detail += error_detail
                self.__save_data_job_log()
                raise APIReturnError(error_info="数据处理失败", status_code=202)

    def __save_data_job_log(self):
        """Insert the job's current log fields into the job-log collection."""
        logged_fields = ["title", "job_type", "status", "detail", "log_time"]
        record = self.fields_toggle(fields=logged_fields)
        DB_GUA.insert_single_data("日志", "数据处理任务日志", record)

    def start(self):
        """Run the job, then save its log.

        On failure ``__process`` has already saved the log and raised, so
        the save below only happens on the success path.
        """
        self.__process()
        self.__save_data_job_log()