This commit is contained in:
王思川 2022-05-12 17:04:38 +08:00
parent 3145e9237f
commit aa05c897d5
9 changed files with 129 additions and 130 deletions

View File

@ -1,11 +1,23 @@
import time
from Objects.InProject.ETL.DataJob import DataJob
from DBHelper.MongoHelperInstance import DB_TYC, DB_TFSE
from Objects.Common.Company.FECRCompany import BasicBusinessInfo
from Utils.ErrorUtil import LogConditionCheckFailed, ReturnConditionCheckFailed
def init_before_process(instance):
is_existed = DB_TFSE.find_single_column(
"企业数据",
"基本工商信息",
{"企业ID": instance.cid},
"企业ID"
)
if is_existed:
raise ReturnConditionCheckFailed(failed_info="已存在初始化数据", status_code=202)
def init_basic_info(instance):
data = DB_TYC.find_single_column(
"公司背景",
@ -131,72 +143,38 @@ def init_main_members(instance):
raise LogConditionCheckFailed(failed_info=failed_info, status_code=202)
class BasicBusinessInfoDataJob(BasicBusinessInfo, DataJob):
fields_map = {**BasicBusinessInfo.fields_map, **DataJob.fields_map}
def before_process(self, **kwargs):
pass
class BasicBusinessInfoDataJob(BasicBusinessInfo):
def init_data(self):
self.data_job_title = "初始企业工商信息"
self.data_job_type = "初始数据"
self.data_job_status = "正常"
self.data_job_detail = "企业ID: 【{}】; 企业名称: 【{}".format(self.cid, self.name)
# self.data_job_title = "初始企业工商信息"
# self.data_job_type = "初始数据"
# self.data_job_status = "正常"
# self.data_job_detail = "企业ID: 【{}】; 企业名称: 【{}】".format(self.cid, self.name)
try:
is_existed = DB_TFSE.find_single_column(
init_before_process(self)
init_basic_info(self)
init_share_holders(self)
init_main_members(self)
DB_TFSE.insert_single_data(
"企业数据",
"基本工商信息",
{"企业ID": self.cid},
"企业ID"
{
"企业ID": self.cid,
"企业名称": self.name,
"更新日期": time.strftime("%Y-%m-%d %H:%M:%S", time.localtime()),
"工商信息": self.business_info.dict_keys_toggle(),
"股东信息": [item.dict_keys_toggle() for item in self.share_holder],
"主要成员": [item.dict_keys_toggle() for item in self.main_member]
}
)
if is_existed:
raise ReturnConditionCheckFailed(failed_info="已存在初始化数据", status_code=202)
else:
init_basic_info(self)
init_share_holders(self)
init_main_members(self)
DB_TFSE.insert_single_data(
"企业数据",
"基本工商信息",
{
"企业ID": self.cid,
"企业名称": self.name,
"更新日期": time.strftime("%Y-%m-%d %H:%M:%S", time.localtime()),
"工商信息": self.business_info.dict_keys_toggle(),
"股东信息": [item.dict_keys_toggle() for item in self.share_holder],
"主要成员": [item.dict_keys_toggle() for item in self.main_member]
}
)
self.save_data_job_log(
data_job_title="初始企业工商信息",
data_job_type="初始数据",
data_job_status="正常",
data_job_detail="企业ID: 【{}】; 企业名称: 【{}".format(self.cid, self.name),
db_client="tfse_v0.21",
db="日志",
collection="数据处理任务日志"
)
self.save_data_job_log()
except LogConditionCheckFailed as e:
self.save_data_job_log(
data_job_title="初始企业工商信息",
data_job_type="初始数据",
data_job_status="异常",
data_job_detail=e.failed_info,
db_client="tfse_v0.21",
db="日志",
collection="数据处理任务日志"
)
def update_data(self, **kwargs):
pass
def delete_data(self, **kwargs):
pass
self.data_job_status = "异常"
self.data_job_detail = e.failed_info
self.save_data_job_log()

View File

@ -1,10 +1,9 @@
from Implements.TYCDataJob.TYCDataJobImpl import TYCDataJobImpl
from Objects.InProject.ETL.DataJob import DataJob
from DBHelper.MongoHelper import MongoHelper
from Objects.Common.Company.FECRCompany import BasicFinancialData, CompanyDataAssemble
class CompanyDataAssembleDataJobImpl(CompanyDataAssemble, DataJob):
class CompanyDataAssembleDataJobImpl(CompanyDataAssemble):
"""企业数据_更新汇总"""
db_tfse = MongoHelper("tfse_v0.21")

View File

View File

@ -1,16 +1,16 @@
import json
import requests
from Objects.InProject.ETL.DataJob import DataJob
from Utils.ErrorUtil import ReturnConditionCheckFailed
TOKEN = "uzdq51N4!I0%HY4sCaQ!aeCSIDIVIdAM"
class TYCDataJobImpl(DataJob):
class TYCDataJobImpl(object):
"""天眼查数据任务"""
def before_process(self, company_name):
@staticmethod
def before_process(company_name):
url = "http://api.fecribd.com/api/tyc/drag_data"
headers = {'token': TOKEN}
@ -18,36 +18,6 @@ class TYCDataJobImpl(DataJob):
res = requests.post(url=url, headers=headers, data=json.dumps(data))
if res.status_code == 200:
self.save_data_job_log(
data_job_title="拉取最新企业数据",
data_job_type="处理准备",
data_job_status="正常",
data_job_detail="企业名称: 【{}".format(company_name),
db_client="tfse_v0.21",
db="日志",
collection="数据处理任务日志"
)
pass
else:
self.save_data_job_log(
data_job_title="拉取最新企业数据",
data_job_type="处理准备",
data_job_status="异常",
data_job_detail="企业名称: 【{}".format(company_name),
db_client="tfse_v0.21",
db="日志",
collection="数据处理任务日志"
)
raise ReturnConditionCheckFailed(failed_info="数据拉取失败", status_code=202)
def init_data(self, **kwargs):
pass
def update_data(self, **kwargs):
pass
def delete_data(self, **kwargs):
pass

0
Implements/__init__.py Normal file
View File

View File

@ -1,6 +1,7 @@
import time
from DBHelper.MongoHelper import MongoHelper
from Utils.ErrorUtil import LogConditionCheckFailed
from Utils.ObjUtil import SpecObject
from Utils.ValidateUtil import ValidateAttr, Validate
@ -11,45 +12,87 @@ JOB_TYPES = ["处理准备", "初始数据", "更新数据", "删除数据"]
class DataJob(SpecObject):
"""数据处理任务"""
data_job_title = ValidateAttr(field="data_job_title", type=str)
data_job_type = ValidateAttr(field="data_job_type", in_list=JOB_TYPES)
data_job_status = ValidateAttr(field="data_job_status", in_list=JOB_STATUS)
data_job_detail = ValidateAttr(field="data_job_detail", type=str)
data_job_log_time = ValidateAttr(field="data_job_log_time", func=Validate.time_format)
title = ValidateAttr(field="title", type=str)
job_type = ValidateAttr(field="job_type", in_list=JOB_TYPES)
status = ValidateAttr(field="status", in_list=JOB_STATUS)
detail = ValidateAttr(field="detail", type=str)
log_time = ValidateAttr(field="log_time", func=Validate.time_format)
prepare_funcs = ValidateAttr(field="prepare_funcs", type=list)
process_funcs = ValidateAttr(field="process_funcs", type=list)
save_data_funcs = ValidateAttr(field="save_data_funcs", type=list)
def __init__(self, title, job_type, status, detail):
self.title = title
self.job_type = job_type
self.status = status
self.detail = detail
self.log_time = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())
self.prepare_funcs = list()
self.process_funcs = list()
self.save_data_funcs = list()
fields_map = {
"data_job_title": "数据任务名称",
"data_job_type": "数据任务类型",
"data_job_status": "数据任务状态",
"data_job_detail": "数据任务详情",
"data_job_log_time": "任务处理时间"
"title": "数据任务名称",
"job_type": "数据任务类型",
"status": "数据任务状态",
"detail": "数据任务详情",
"log_time": "任务处理时间"
}
def before_process(self, **kwargs):
"""处理准备"""
raise NotImplementedError
def prepare(self):
"""数据准备"""
for func in self.prepare_funcs:
try:
func()
except Exception as e:
self.detail = self.detail + '函数:【】 ; '.format(e.__str__())
def init_data(self, **kwargs):
"""初始数据"""
raise NotImplementedError
def process(self):
"""数据处理"""
for func in self.process_funcs:
func()
def update_data(self, **kwargs):
"""更新数据"""
raise NotImplementedError
def save_data(self):
"""数据保存"""
for func in self.save_data_funcs:
func()
def delete_data(self, **kwargs):
"""删除数据"""
raise NotImplementedError
def save_data_job_log(self, **kwargs):
def save_data_job_log(self):
"""保存数据任务日志"""
self.data_job_title = kwargs["data_job_title"]
self.data_job_type = kwargs["data_job_type"]
self.data_job_status = kwargs["data_job_status"]
self.data_job_detail = kwargs["data_job_detail"]
self.data_job_log_time = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())
data = self.dict_keys_toggle(columns=["数据任务名称", "数据任务类型", "数据任务状态", "数据任务详情", "任务处理时间"])
db_client = MongoHelper(kwargs["db_client"])
db_client.insert_single_data(kwargs["db"], kwargs["collection"], data)
db_client = MongoHelper("tfse_v0.21")
db_client.insert_single_data("日志", "数据处理任务日志", data)
def start_job(self):
"""开始处理任务"""
self.prepare()
self.process()
self.save_data()
self.save_data_job_log()
if __name__ == '__main__':
data_job = DataJob(
title="测试",
job_type="处理准备",
status="正常",
detail="~~~~~~~~~"
)
def func01():
print("func01")
def func02():
print("func02")
def func03():
print("func03")
def func04():
print("func04")
data_job.process_funcs = [func01, func02, func03, func04]
data_job.start_job()

0
Routes/__init__.py Normal file
View File

View File

@ -9,9 +9,13 @@ class SpecObject(object):
"""字典键值切换"""
_dict_ = dict()
default_types = ['str', 'int', 'float', 'dict', 'bool', 'tuple']
for key in self.__dict__.keys():
for key in self.fields_map.keys():
# 常规类型
if type(self.__dict__[key]).__name__ in default_types:
_dict_[self.fields_map[key]] = self.__dict__[key]
# 列表类型
elif type(self.__dict__[key]).__name__ == 'list':
if len(self.__dict__[key]) == 0:
_dict_[self.fields_map[key]] = self.__dict__[key]
@ -19,8 +23,12 @@ class SpecObject(object):
_dict_[self.fields_map[key]] = self.__dict__[key]
else:
_dict_[self.fields_map[key]] = [item.dict_keys_toggle() for item in self.__dict__[key]]
# 空值类型
elif self.__dict__[key] is None:
_dict_[self.fields_map[key]] = self.__dict__[key]
# 对象类型
else:
_dict_[self.fields_map[key]] = self.__dict__[key].dict_keys_toggle()

View File

@ -74,7 +74,8 @@ class ValidateAttr(object):
def __set__(self, instance, value):
try:
if not self.info:
self.info = self.FIELD_ERROR_INFO.format(instance.fields_map[self.kwargs['field']])
if instance.fields_map.__contains__(self.kwargs['field']):
self.info = self.FIELD_ERROR_INFO.format(instance.fields_map[self.kwargs['field']])
except AttributeError:
raise ReturnConditionCheckFailed(self.FILED_MAP_ERROR, self.code)