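# Source path: guarantee-admin-api-v0.2/Modules/ETL/GuaranteeDataJob.py
#
# File info from the repository web view: 487 lines, 22 KiB, Python.
# (The "Raw / Normal View / History" labels were page navigation, not code.)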

import copy
import json
import time
from datetime import datetime
import requests
from DBHelper.MongoHelperInstance import DB_TYC, DB_GUA
# 2022-06-13 17:07:04 +08:00 (timestamp left over from the repository web view)
from Modules.Company.CompanyObject import RiskInfo, BusinessInfo
# 2022-06-09 11:22:56 +08:00 (timestamp left over from the repository web view)
from Modules.ETL.WideETL import DataProcess, DataJob
from Utils.ErrorUtil import JustThrowError
from Utils.ObjUtil import SpecObject
# 2022-06-09 11:22:56 +08:00 (timestamp left over from the repository web view)
class DragRiskInfoFromTYC(DataProcess):
    """Collect recent risk-record counts for the target company.

    ``extract`` POSTs the company name to the ``guarantee_risk_info``
    endpoint and, on HTTP 200, reads six record lists from ``DB_TYC`` and
    counts the entries dated within ``RECENT_YEARS`` calendar years of the
    target's report date.  ``transform`` copies those counts onto the target
    and ``load`` upserts the target's field dump into ``DB_GUA``.
    """

    RISK_INFO_URL = "http://api.fecribd.com/api/tyc/guarantee_risk_info"
    DATE_FORMAT = '%Y-%m-%d'
    # A record counts when 0 <= (report year - record year) <= RECENT_YEARS.
    RECENT_YEARS = 3

    def _risk_records(self, database, collection, nested=False):
        """Return the record list stored for the target company, or ``[]``.

        The document is looked up by company name and the field read from it
        has the same name as ``collection``.  A missing document, a missing
        field or an empty value all yield an empty list, so every counter
        treats "no data" the same way instead of raising.

        Args:
            database: first positional argument of ``find_single_data``
                (presumably the database name -- confirm against the helper).
            collection: second positional argument, also used as field name.
            nested: when true the records live under the field's ``'result'``
                key instead of directly in the field.
        """
        document = DB_TYC.find_single_data(
            database,
            collection,
            {"企业名称": self.target.company_name},
            [collection]
        )
        if not document:
            return []
        try:
            payload = document[collection]
        except KeyError:
            return []
        if not payload:
            return []
        return payload['result'] if nested else payload

    def _count_recent(self, records, year_of, keep=None, skip_bad_dates=False):
        """Count the records whose year falls inside the recent window.

        Args:
            records: iterable of record dicts (may be empty or ``None``).
            year_of: callable returning the calendar year of one record.
            keep: optional predicate; records for which it is falsy are not
                counted.  It is evaluated for every record, even those
                outside the window, so malformed records still surface.
            skip_bad_dates: when true, a ``ValueError`` while parsing a date
                skips the record (or yields 0 if the report date itself is
                unparsable) instead of propagating.

        Returns:
            The number of matching records.
        """
        if not records:
            return 0
        # The report date is the same for every record, so parse it once.
        try:
            report_year = datetime.strptime(self.target.report_date, self.DATE_FORMAT).year
        except ValueError:
            if skip_bad_dates:
                return 0
            raise

        total = 0
        for item in records:
            try:
                item_year = year_of(item)
            except ValueError:
                if skip_bad_dates:
                    continue
                raise
            selected = keep is None or keep(item)
            if selected and 0 <= report_year - item_year <= self.RECENT_YEARS:
                total += 1
        return total

    def extract(self):
        """Call the risk-info endpoint, then count recent records per category.

        Only the HTTP status code of the response is inspected; the counts
        themselves are computed from ``DB_TYC``.  The result is stored in
        ``self.source`` as a dict keyed by category label.

        Raises:
            JustThrowError: if the endpoint does not answer with HTTP 200.
        """
        payload = {"企业名称": self.target.company_name, "数据范围": [], "时间范围": 30}
        # NOTE(review): no timeout is passed, so a stalled server blocks this
        # job indefinitely.  Add one once the endpoint's latency is known.
        response = requests.post(
            url=self.RISK_INFO_URL,
            headers={'token': self.target.token},
            data=json.dumps(payload)
        )
        if response.status_code != 200:
            raise JustThrowError(error_info="拉取风险信息失败")

        company_name = self.target.company_name

        def year_from_text(field):
            # Field holds a 'YYYY-MM-DD' string.
            return lambda item: datetime.strptime(item[field], self.DATE_FORMAT).year

        def year_from_millis(field):
            # Field holds a millisecond timestamp; the year is taken in local time.
            return lambda item: time.localtime(item[field] / 1000).tm_year

        self.source = {
            '失信被执行人': self._count_recent(
                self._risk_records("司法风险", "失信人"),
                year_from_text('publishdate')
            ),
            '被执行人': self._count_recent(
                self._risk_records("司法风险", "被执行人"),
                year_from_text('caseCreateTime')
            ),
            # Lawsuits count only when the company is among the defendants;
            # records with an unparsable submit date are skipped.
            '法律诉讼(被告)': self._count_recent(
                self._risk_records("司法风险", "诉讼"),
                year_from_text('submittime'),
                keep=lambda item: company_name in item['defendants'],
                skip_bad_dates=True
            ),
            '终本案件': self._count_recent(
                self._risk_records("司法风险", "终本案件", nested=True),
                year_from_millis('caseFinalTime')
            ),
            '限制消费令': self._count_recent(
                self._risk_records("司法风险", "限制消费令", nested=True),
                year_from_millis('publishDate')
            ),
            '行政处罚': self._count_recent(
                self._risk_records("经营风险", "行政处罚", nested=True),
                year_from_text('decisionDate')
            ),
        }

    def transform(self):
        """Copy the counts gathered by ``extract`` onto the target object."""
        source = self.source
        target = self.target
        field_map = (
            ('dishonest_executor', '失信被执行人'),
            ('person_to_be_executed', '被执行人'),
            ('final_case', '终本案件'),
            ('consumption_restriction_order', '限制消费令'),
            ('administrative_penalties', '行政处罚'),
            ('legal_action', '法律诉讼(被告)'),
        )
        for attribute, label in field_map:
            setattr(target, attribute, source[label])

    def load(self):
        """Upsert the target's field dump, keyed by company name and report period."""
        data = self.target.fields_toggle()
        DB_GUA.upsert_single_data(
            '企业数据',
            '风险信息',
            {"企业名称": data['企业名称'], "报告期": data['报告期']},
            data
        )
# 2022-06-09 11:22:56 +08:00 (timestamp left over from the repository web view)
class RiskInfoDataJob(RiskInfo):
    """``RiskInfo`` variant that can launch its own fetch job."""

    def drag(self):
        """Pull the company's risk information from the Tianyancha data API.

        Builds a ``DataJob`` bound to this instance, registers
        ``DragRiskInfoFromTYC`` as its only process and starts it.
        """
        job_spec = {
            'instance': self,
            'title': "拉取企业风险信息",
            'job_type': "处理准备",
            'status': "正常",
            'detail': "企业名称->{};".format(self.company_name),
        }
        job = DataJob(**job_spec)
        job.processes = [DragRiskInfoFromTYC]
        job.start()
# 2022-06-13 17:07:04 +08:00 (timestamp left over from the repository web view)
class ProcessBusinessInfo(DataProcess):
    """Business information.

    Reads a company's guarantee records from ``DB_GUA`` and derives four
    views from them: per-customer rows, per-region and per-industry yearly
    balances, and a yearly distribution of guarantees over amount buckets.
    """

    DATE_FORMAT = '%Y-%m-%d'
    # (inclusive upper bound, bucket label, attribute on the balance object),
    # checked in order; the first bound that is >= the amount wins.
    BALANCE_BUCKETS = (
        (100, '100万以下含100万', 'one_million'),
        (500, '500万以下含500万', 'five_million'),
        (1000, '500-1000万含1000万', 'ten_million'),
        (3000, '1000-3000万含3000万', 'thirty_million'),
        (5000, '3000-5000万含5000万', 'fifty_million'),
        (8000, '5000-8000万含8000万', 'eighty_million'),
        (10000, '8000-10000万含10000万', 'one_hundred_million'),
    )
    # (exclusive lower bound, bucket label, attribute) for everything above.
    OPEN_BUCKET = (10000, '10000万以上', 'more_than_one_hundred_million')

    def extract(self):
        """Load every guarantee record of the target company into ``self.source``.

        Raises:
            JustThrowError: if no guarantee record is found.
        """
        records = DB_GUA.find_all_data(
            '企业数据',
            '担保数据',
            {'企业ID': self.target.cid},
            []
        )
        if not records:
            raise JustThrowError(error_info="担保数据异常; ")
        self.source['担保信息'] = records

    @staticmethod
    def _year_span(records):
        """Return every year, as a string, from the earliest to the latest date seen."""
        bounds = [
            int(item[key][:4])
            for item in records
            for key in ('担保开始日期', '担保结束日期')
        ]
        return [str(year) for year in range(min(bounds), max(bounds) + 1)]

    @staticmethod
    def _distinct_values(records, key):
        """Return the distinct non-``None`` values of ``key`` in first-seen order.

        Records that lack the key are simply ignored for this key; they do
        not affect the collection of any other key.
        """
        seen = dict()
        for item in records:
            value = item.get(key)
            if value is not None:
                seen[value] = None
        return list(seen)

    @classmethod
    def _bucket_label(cls, amount):
        """Return the label of the amount bucket ``amount`` falls into.

        Returns ``None`` when the amount matches no bucket at all (only
        possible for values that do not compare, such as NaN).
        """
        for upper_bound, label, _attribute in cls.BALANCE_BUCKETS:
            if amount <= upper_bound:
                return label
        lower_bound, label, _attribute = cls.OPEN_BUCKET
        return label if amount > lower_bound else None

    @classmethod
    def _blank_balance_row(cls, cid, year):
        """Build the zeroed bucket row for one report year."""
        row = {'企业ID': cid, '报告期': year + '-12-31'}
        for _bound, label, _attribute in cls.BALANCE_BUCKETS + (cls.OPEN_BUCKET,):
            # Each bucket gets its own counter dict so rows never share state.
            row[label] = {'担保户数': 0, '担保笔数': 0, '担保责任余额': 0}
        row['合计'] = 0
        return row

    @classmethod
    def _accumulate(cls, row, item, year):
        """Add one guarantee record to a region/industry row for ``year``."""
        started = int(item['担保开始日期'][:4]) <= int(year)
        year_end = datetime.strptime(row['报告期'], cls.DATE_FORMAT)
        end_date = datetime.strptime(item['担保结束日期'], cls.DATE_FORMAT)
        outlives_year = (end_date - year_end).days > 0
        # Year-end balance: started in or before this year and still running
        # after 31 December.  Both hold for every year the caller passes in;
        # the check is kept as a guard (and still validates the end date).
        if started and outlives_year:
            row['年末在保余额'] += item['担保余额(万元)']
            row['年末在保责任余额'] += item['责任担保余额(万元)']
        # Newly added in the year the guarantee starts.
        if item['担保开始日期'][:4] == year:
            row['当年新增在保余额'] += item['担保余额(万元)']
            row['当年新增在保责任余额'] += item['责任担保余额(万元)']

    def transform(self):
        """Derive customer rows and the three distributions from the records."""
        records = self.source['担保信息']
        target = self.target

        target.customerinfo = list()
        target.regional_distribution = list()
        target.industry_distribution = list()
        target.balance_distribution = list()

        # Report years, regions and industries present in the data.
        years = self._year_span(records)
        areas = self._distinct_values(records, '地级市')
        industries = self._distinct_values(records, '行业')

        # One zeroed row per (region, year), (industry, year) and year,
        # keyed for direct lookup while aggregating.
        regional_rows = {
            (area, year): {
                '企业ID': target.cid,
                '地区': area,
                '报告期': year + '-12-31',
                '年末在保余额': 0,
                '年末在保责任余额': 0,
                '当年新增在保余额': 0,
                '当年新增在保责任余额': 0,
            }
            for area in areas
            for year in years
        }
        industry_rows = {
            (industry, year): {
                '企业ID': target.cid,
                '报告期': year + '-12-31',
                '所属行业': industry,
                '年末在保余额': 0,
                '年末在保责任余额': 0,
                '当年新增在保余额': 0,
                '当年新增在保责任余额': 0,
            }
            for industry in industries
            for year in years
        }
        balance_rows = {year: self._blank_balance_row(target.cid, year) for year in years}

        for item in records:
            # Customer information
            customer = target.CustomerInfo()
            customer.cid = target.cid
            customer.report_period = item['担保开始日期'][:4] + '-12-31'
            customer.client_name = item['被担保人']
            customer.guarantee_type = item['担保类型']
            customer.balance_under_insurance = item['担保余额(万元)']
            customer.financing_guarantee_balance = item['责任担保余额(万元)']
            customer.guarantee_start_date = item['担保开始日期']
            customer.guarantee_end_date = item['担保结束日期']
            target.customerinfo.append(customer)

            # Years the record contributes to: start year up to, but not
            # including, the end year.
            # NOTE(review): a guarantee that starts and ends in the same year
            # therefore contributes to no report period, not even to the
            # "newly added" figures -- confirm this is intended.
            first = years.index(item['担保开始日期'][:4])
            last = years.index(item['担保结束日期'][:4])
            active_years = years[first:last]

            # A record without a region or industry is left out of that view.
            city = item.get('地级市')
            industry = item.get('行业')
            amount = item['担保余额(万元)']
            bucket_label = self._bucket_label(amount)

            for year in active_years:
                regional_row = regional_rows.get((city, year))
                if regional_row is not None:
                    self._accumulate(regional_row, item, year)

                industry_row = industry_rows.get((industry, year))
                if industry_row is not None:
                    self._accumulate(industry_row, item, year)

                if bucket_label is not None:
                    bucket = balance_rows[year][bucket_label]
                    # NOTE(review): both counters advance once per record, so
                    # the household count is not de-duplicated per customer,
                    # and the liability balance sums '担保余额(万元)' rather
                    # than '责任担保余额(万元)' -- confirm with the business rule.
                    bucket['担保户数'] += 1
                    bucket['担保笔数'] += 1
                    bucket['担保责任余额'] += amount

        # Regional distribution
        for row in regional_rows.values():
            target.regional_distribution.append(
                SpecObject.set_specify_instance(
                    instance=target.RegionalDistribution,
                    data=row
                )
            )

        # Industry distribution
        for row in industry_rows.values():
            target.industry_distribution.append(
                SpecObject.set_specify_instance(
                    instance=target.IndustryDistribution,
                    data=row
                )
            )

        # Guarantee amount distribution
        for row in balance_rows.values():
            balance = target.GuaranteedBalanceDistribution()
            balance.report_period = row['报告期']
            balance.cid = target.cid
            for _bound, label, attribute in self.BALANCE_BUCKETS + (self.OPEN_BUCKET,):
                detail = SpecObject.set_specify_instance(
                    instance=balance.GuaranteeDetail,
                    data=row[label]
                )
                setattr(balance, attribute, detail)
            # Only the bucket dicts carry a balance; scalar fields are skipped.
            balance.total = sum(
                value['担保责任余额'] for value in row.values() if isinstance(value, dict)
            )
            target.balance_distribution.append(balance)

    def load(self):
        """Insert the four derived views into their ``DB_GUA`` collections."""
        data = self.target.fields_toggle()
        # Read every section first so a missing one fails before any insert.
        batches = [
            ('担保-客户信息', list(data['客户信息'])),
            ('担保-区域分布', list(data['区域分布'])),
            ('担保-行业分布', list(data['行业分布'])),
            ('担保-担保金额分布', list(data['担保金额分布'])),
        ]
        for collection, documents in batches:
            # Region/industry views are empty when no record carries that
            # field.  Presumably the helper forwards to a MongoDB bulk insert,
            # which rejects an empty list -- TODO confirm; skipping is a no-op
            # either way.
            if documents:
                DB_GUA.insert_many_data('企业数据', collection, documents)
# 2022-06-13 17:07:04 +08:00 (timestamp left over from the repository web view)
class BusinessInfoDataJob(BusinessInfo):
    """``BusinessInfo`` variant that can launch its own processing job."""

    def create(self):
        """Run the job that builds this company's business-information data.

        Builds a ``DataJob`` bound to this instance, registers
        ``ProcessBusinessInfo`` as its only process and starts it.
        """
        job_spec = {
            'instance': self,
            'title': "经营信息数据",
            'job_type': "初始数据",
            'status': "正常",
            'detail': "企业名称->{}; 企业ID->{};".format(self.company_name, self.cid),
        }
        job = DataJob(**job_spec)
        job.processes = [ProcessBusinessInfo]
        job.start()