From aa05c897d5c2f6b64968dfba23520d2a5137c209 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E7=8E=8B=E6=80=9D=E5=B7=9D?= Date: Thu, 12 May 2022 17:04:38 +0800 Subject: [PATCH] Changes --- .../BasicBusinessInfoDataJob.py | 96 ++++++--------- .../CompanyDataAssembleDataJobImpl.py | 3 +- Implements/CompanyDataJob/__init__.py | 0 Implements/TYCDataJob/TYCDataJobImpl.py | 38 +----- Implements/__init__.py | 0 Objects/InProject/ETL/DataJob.py | 109 ++++++++++++------ Routes/__init__.py | 0 Utils/ObjUtil.py | 10 +- Utils/ValidateUtil.py | 3 +- 9 files changed, 129 insertions(+), 130 deletions(-) create mode 100644 Implements/CompanyDataJob/__init__.py create mode 100644 Implements/__init__.py create mode 100644 Routes/__init__.py diff --git a/Implements/CompanyDataJob/BasicBusinessInfoDataJob.py b/Implements/CompanyDataJob/BasicBusinessInfoDataJob.py index f5d441a..b65d80a 100644 --- a/Implements/CompanyDataJob/BasicBusinessInfoDataJob.py +++ b/Implements/CompanyDataJob/BasicBusinessInfoDataJob.py @@ -1,11 +1,23 @@ import time -from Objects.InProject.ETL.DataJob import DataJob from DBHelper.MongoHelperInstance import DB_TYC, DB_TFSE from Objects.Common.Company.FECRCompany import BasicBusinessInfo from Utils.ErrorUtil import LogConditionCheckFailed, ReturnConditionCheckFailed +def init_before_process(instance): + + is_existed = DB_TFSE.find_single_column( + "企业数据", + "基本工商信息", + {"企业ID": instance.cid}, + "企业ID" + ) + + if is_existed: + raise ReturnConditionCheckFailed(failed_info="已存在初始化数据", status_code=202) + + def init_basic_info(instance): data = DB_TYC.find_single_column( "公司背景", @@ -131,72 +143,38 @@ def init_main_members(instance): raise LogConditionCheckFailed(failed_info=failed_info, status_code=202) -class BasicBusinessInfoDataJob(BasicBusinessInfo, DataJob): - - fields_map = {**BasicBusinessInfo.fields_map, **DataJob.fields_map} - - def before_process(self, **kwargs): - pass +class BasicBusinessInfoDataJob(BasicBusinessInfo): def init_data(self): - self.data_job_title = "初始企业工商信息" - self.data_job_type = "初始数据" - self.data_job_status = "正常" - self.data_job_detail = "企业ID: 【{}】; 企业名称: 【{}】".format(self.cid, self.name) + # self.data_job_title = "初始企业工商信息" + # self.data_job_type = "初始数据" + # self.data_job_status = "正常" + # self.data_job_detail = "企业ID: 【{}】; 企业名称: 【{}】".format(self.cid, self.name) try: - is_existed = DB_TFSE.find_single_column( + + init_before_process(self) + init_basic_info(self) + init_share_holders(self) + init_main_members(self) + + DB_TFSE.insert_single_data( "企业数据", "基本工商信息", - {"企业ID": self.cid}, - "企业ID" + { + "企业ID": self.cid, + "企业名称": self.name, + "更新日期": time.strftime("%Y-%m-%d %H:%M:%S", time.localtime()), + "工商信息": self.business_info.dict_keys_toggle(), + "股东信息": [item.dict_keys_toggle() for item in self.share_holder], + "主要成员": [item.dict_keys_toggle() for item in self.main_member] + } ) - if is_existed: - raise ReturnConditionCheckFailed(failed_info="已存在初始化数据", status_code=202) - else: - init_basic_info(self) - init_share_holders(self) - init_main_members(self) - - DB_TFSE.insert_single_data( - "企业数据", - "基本工商信息", - { - "企业ID": self.cid, - "企业名称": self.name, - "更新日期": time.strftime("%Y-%m-%d %H:%M:%S", time.localtime()), - "工商信息": self.business_info.dict_keys_toggle(), - "股东信息": [item.dict_keys_toggle() for item in self.share_holder], - "主要成员": [item.dict_keys_toggle() for item in self.main_member] - } - ) - - self.save_data_job_log( - data_job_title="初始企业工商信息", - data_job_type="初始数据", - data_job_status="正常", - data_job_detail="企业ID: 【{}】; 企业名称: 【{}】".format(self.cid, self.name), - db_client="tfse_v0.21", - db="日志", - collection="数据处理任务日志" - ) + self.save_data_job_log() except LogConditionCheckFailed as e: - - self.save_data_job_log( - data_job_title="初始企业工商信息", - data_job_type="初始数据", - data_job_status="异常", - data_job_detail=e.failed_info, - db_client="tfse_v0.21", - db="日志", - collection="数据处理任务日志" - ) - - def update_data(self, **kwargs): - pass - - def delete_data(self, **kwargs): - pass + self.data_job_status = "异常" + self.data_job_detail = e.failed_info + self.save_data_job_log() diff --git a/Implements/CompanyDataJob/CompanyDataAssembleDataJobImpl.py b/Implements/CompanyDataJob/CompanyDataAssembleDataJobImpl.py index ad3f655..e8e7ce3 100644 --- a/Implements/CompanyDataJob/CompanyDataAssembleDataJobImpl.py +++ b/Implements/CompanyDataJob/CompanyDataAssembleDataJobImpl.py @@ -1,10 +1,9 @@ from Implements.TYCDataJob.TYCDataJobImpl import TYCDataJobImpl -from Objects.InProject.ETL.DataJob import DataJob from DBHelper.MongoHelper import MongoHelper from Objects.Common.Company.FECRCompany import BasicFinancialData, CompanyDataAssemble -class CompanyDataAssembleDataJobImpl(CompanyDataAssemble, DataJob): +class CompanyDataAssembleDataJobImpl(CompanyDataAssemble): """企业数据_更新汇总""" db_tfse = MongoHelper("tfse_v0.21") diff --git a/Implements/CompanyDataJob/__init__.py b/Implements/CompanyDataJob/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/Implements/TYCDataJob/TYCDataJobImpl.py b/Implements/TYCDataJob/TYCDataJobImpl.py index 7ccff75..33b3a3b 100644 --- a/Implements/TYCDataJob/TYCDataJobImpl.py +++ b/Implements/TYCDataJob/TYCDataJobImpl.py @@ -1,16 +1,16 @@ import json import requests -from Objects.InProject.ETL.DataJob import DataJob from Utils.ErrorUtil import ReturnConditionCheckFailed TOKEN = "uzdq51N4!I0%HY4sCaQ!aeCSIDIVIdAM" -class TYCDataJobImpl(DataJob): +class TYCDataJobImpl(object): """天眼查数据任务""" - def before_process(self, company_name): + @staticmethod + def before_process(company_name): url = "http://api.fecribd.com/api/tyc/drag_data" headers = {'token': TOKEN} @@ -18,36 +18,6 @@ class TYCDataJobImpl(DataJob): res = requests.post(url=url, headers=headers, data=json.dumps(data)) if res.status_code == 200: - - self.save_data_job_log( - data_job_title="拉取最新企业数据", - data_job_type="处理准备", - data_job_status="正常", - data_job_detail="企业名称: 【{}】".format(company_name), - db_client="tfse_v0.21", - db="日志", - collection="数据处理任务日志" - ) - + pass else: - - self.save_data_job_log( - data_job_title="拉取最新企业数据", - data_job_type="处理准备", - data_job_status="异常", - data_job_detail="企业名称: 【{}】".format(company_name), - db_client="tfse_v0.21", - db="日志", - collection="数据处理任务日志" - ) - raise ReturnConditionCheckFailed(failed_info="数据拉取失败", status_code=202) - - def init_data(self, **kwargs): - pass - - def update_data(self, **kwargs): - pass - - def delete_data(self, **kwargs): - pass diff --git a/Implements/__init__.py b/Implements/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/Objects/InProject/ETL/DataJob.py b/Objects/InProject/ETL/DataJob.py index 6d00ef6..576ab20 100644 --- a/Objects/InProject/ETL/DataJob.py +++ b/Objects/InProject/ETL/DataJob.py @@ -1,6 +1,7 @@ import time from DBHelper.MongoHelper import MongoHelper +from Utils.ErrorUtil import LogConditionCheckFailed from Utils.ObjUtil import SpecObject from Utils.ValidateUtil import ValidateAttr, Validate @@ -11,45 +12,87 @@ JOB_TYPES = ["处理准备", "初始数据", "更新数据", "删除数据"] class DataJob(SpecObject): """数据处理任务""" - data_job_title = ValidateAttr(field="data_job_title", type=str) - data_job_type = ValidateAttr(field="data_job_type", in_list=JOB_TYPES) - data_job_status = ValidateAttr(field="data_job_status", in_list=JOB_STATUS) - data_job_detail = ValidateAttr(field="data_job_detail", type=str) - data_job_log_time = ValidateAttr(field="data_job_log_time", func=Validate.time_format) + title = ValidateAttr(field="title", type=str) + job_type = ValidateAttr(field="job_type", in_list=JOB_TYPES) + status = ValidateAttr(field="status", in_list=JOB_STATUS) + detail = ValidateAttr(field="detail", type=str) + log_time = ValidateAttr(field="log_time", func=Validate.time_format) + + prepare_funcs = ValidateAttr(field="prepare_funcs", type=list) + process_funcs = ValidateAttr(field="process_funcs", type=list) + save_data_funcs = ValidateAttr(field="save_data_funcs", type=list) + + def __init__(self, title, job_type, status, detail): + self.title = title + self.job_type = job_type + self.status = status + self.detail = detail + self.log_time = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime()) + self.prepare_funcs = list() + self.process_funcs = list() + self.save_data_funcs = list() fields_map = { - "data_job_title": "数据任务名称", - "data_job_type": "数据任务类型", - "data_job_status": "数据任务状态", - "data_job_detail": "数据任务详情", - "data_job_log_time": "任务处理时间" + "title": "数据任务名称", + "job_type": "数据任务类型", + "status": "数据任务状态", + "detail": "数据任务详情", + "log_time": "任务处理时间" } - def before_process(self, **kwargs): - """处理准备""" - raise NotImplementedError + def prepare(self): + """数据准备""" + for func in self.prepare_funcs: + try: + func() + except Exception as e: + self.detail = self.detail + '函数:【】 ; '.format(e.__str__()) - def init_data(self, **kwargs): - """初始数据""" - raise NotImplementedError + def process(self): + """数据处理""" + for func in self.process_funcs: + func() - def update_data(self, **kwargs): - """更新数据""" - raise NotImplementedError + def save_data(self): + """数据保存""" + for func in self.save_data_funcs: + func() - def delete_data(self, **kwargs): - """删除数据""" - raise NotImplementedError - - def save_data_job_log(self, **kwargs): + def save_data_job_log(self): """保存数据任务日志""" - - self.data_job_title = kwargs["data_job_title"] - self.data_job_type = kwargs["data_job_type"] - self.data_job_status = kwargs["data_job_status"] - self.data_job_detail = kwargs["data_job_detail"] - self.data_job_log_time = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime()) - data = self.dict_keys_toggle(columns=["数据任务名称", "数据任务类型", "数据任务状态", "数据任务详情", "任务处理时间"]) - db_client = MongoHelper(kwargs["db_client"]) - db_client.insert_single_data(kwargs["db"], kwargs["collection"], data) + db_client = MongoHelper("tfse_v0.21") + db_client.insert_single_data("日志", "数据处理任务日志", data) + + def start_job(self): + """开始处理任务""" + self.prepare() + self.process() + self.save_data() + self.save_data_job_log() + + +if __name__ == '__main__': + + data_job = DataJob( + title="测试", + job_type="处理准备", + status="正常", + detail="~~~~~~~~~" + ) + + def func01(): + print("func01") + + def func02(): + print("func02") + + def func03(): + print("func03") + + def func04(): + print("func04") + + data_job.process_funcs = [func01, func02, func03, func04] + + data_job.start_job() diff --git a/Routes/__init__.py b/Routes/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/Utils/ObjUtil.py b/Utils/ObjUtil.py index 4735001..932fac1 100644 --- a/Utils/ObjUtil.py +++ b/Utils/ObjUtil.py @@ -9,9 +9,13 @@ class SpecObject(object): """字典键值切换""" _dict_ = dict() default_types = ['str', 'int', 'float', 'dict', 'bool', 'tuple'] - for key in self.__dict__.keys(): + for key in self.fields_map.keys(): + + # 常规类型 if type(self.__dict__[key]).__name__ in default_types: _dict_[self.fields_map[key]] = self.__dict__[key] + + # 列表类型 elif type(self.__dict__[key]).__name__ == 'list': if len(self.__dict__[key]) == 0: _dict_[self.fields_map[key]] = self.__dict__[key] @@ -19,8 +23,12 @@ class SpecObject(object): _dict_[self.fields_map[key]] = self.__dict__[key] else: _dict_[self.fields_map[key]] = [item.dict_keys_toggle() for item in self.__dict__[key]] + + # 空值类型 elif self.__dict__[key] is None: _dict_[self.fields_map[key]] = self.__dict__[key] + + # 对象类型 else: _dict_[self.fields_map[key]] = self.__dict__[key].dict_keys_toggle() diff --git a/Utils/ValidateUtil.py b/Utils/ValidateUtil.py index ac8e416..58a9438 100644 --- a/Utils/ValidateUtil.py +++ b/Utils/ValidateUtil.py @@ -74,7 +74,8 @@ class ValidateAttr(object): def __set__(self, instance, value): try: if not self.info: - self.info = self.FIELD_ERROR_INFO.format(instance.fields_map[self.kwargs['field']]) + if instance.fields_map.__contains__(self.kwargs['field']): + self.info = self.FIELD_ERROR_INFO.format(instance.fields_map[self.kwargs['field']]) except AttributeError: raise ReturnConditionCheckFailed(self.FILED_MAP_ERROR, self.code)