guarantee-admin-api-v0.2/Modules/ETL/GuaranteeDataJob.py

487 lines
22 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import copy
import json
import time
from datetime import datetime
import requests
from DBHelper.MongoHelperInstance import DB_TYC, DB_GUA
from Modules.Company.CompanyObject import RiskInfo, BusinessInfo
from Modules.ETL.WideETL import DataProcess, DataJob
from Utils.ErrorUtil import JustThrowError
from Utils.ObjUtil import SpecObject
class DragRiskInfoFromTYC(DataProcess):
def extract(self):
url = "http://api.fecribd.com/api/tyc/guarantee_risk_info"
headers = {'token': self.target.token}
data = {"企业名称": self.target.company_name, "数据范围": [], "时间范围": 30}
res = requests.post(url=url, headers=headers, data=json.dumps(data))
if res.status_code == 200:
source_dict = dict()
def dishonest_quantity():
dishonest = DB_TYC.find_single_data(
"司法风险",
"失信人",
{"企业名称": self.target.company_name},
['失信人']
)
total = 0
if dishonest['失信人']:
for item in dishonest['失信人']:
begin_date = datetime.strptime(item['publishdate'], '%Y-%m-%d').year
curr_date = datetime.strptime(self.target.report_date, '%Y-%m-%d').year
minus_date = curr_date - begin_date
if 0 <= minus_date <= 3:
total += 1
return total
def executed_quantity():
executed = DB_TYC.find_single_data(
"司法风险",
"被执行人",
{"企业名称": self.target.company_name},
['被执行人']
)
total = 0
if executed['被执行人']:
for item in executed['被执行人']:
begin_date = datetime.strptime(item['caseCreateTime'], '%Y-%m-%d').year
curr_date = datetime.strptime(self.target.report_date, '%Y-%m-%d').year
minus_date = curr_date - begin_date
if 0 <= minus_date <= 3:
total += 1
return total
def legal_action_quantity():
legal_action = DB_TYC.find_single_data(
"司法风险",
"诉讼",
{"企业名称": self.target.company_name},
['诉讼']
)
total = 0
if legal_action:
for item in legal_action['诉讼']:
try:
begin_date = datetime.strptime(item['submittime'], '%Y-%m-%d').year
curr_date = datetime.strptime(self.target.report_date, '%Y-%m-%d').year
minus_date = curr_date - begin_date
condition = self.target.company_name in item['defendants']
if 0 <= minus_date <= 3 and condition:
total += 1
except ValueError:
continue
return total
def final_case_quantity():
final_case = DB_TYC.find_single_data(
"司法风险",
"终本案件",
{"企业名称": self.target.company_name},
['终本案件']
)
total = 0
if final_case['终本案件']:
for item in final_case['终本案件']['result']:
tuptime_newest = time.localtime(item['caseFinalTime'] / 1000)
begin_date = datetime.strptime(time.strftime("%Y-%m-%d", tuptime_newest), '%Y-%m-%d').year
curr_date = datetime.strptime(self.target.report_date, '%Y-%m-%d').year
minus_date = curr_date - begin_date
if 0 <= minus_date <= 3:
total += 1
return total
def consumption_restriction_quantity():
executed = DB_TYC.find_single_data(
"司法风险",
"限制消费令",
{"企业名称": self.target.company_name},
['限制消费令']
)
total = 0
if executed['限制消费令']:
for item in executed['限制消费令']['result']:
tuptime_newest = time.localtime(item['publishDate'] / 1000)
begin_date = datetime.strptime(time.strftime("%Y-%m-%d", tuptime_newest), '%Y-%m-%d').year
curr_date = datetime.strptime(self.target.report_date, '%Y-%m-%d').year
minus_date = curr_date - begin_date
if 0 <= minus_date <= 3:
total += 1
return total
def penalties_quantity():
penalties = DB_TYC.find_single_data(
"经营风险",
"行政处罚",
{"企业名称": self.target.company_name},
['行政处罚']
)
total = 0
if penalties['行政处罚']:
for item in penalties['行政处罚']['result']:
begin_date = datetime.strptime(item['decisionDate'], '%Y-%m-%d').year
curr_date = datetime.strptime(self.target.report_date, '%Y-%m-%d').year
minus_date = curr_date - begin_date
if 0 <= minus_date <= 3:
total += 1
return total
source_dict['失信被执行人'] = dishonest_quantity()
source_dict['被执行人'] = executed_quantity()
source_dict['法律诉讼(被告)'] = legal_action_quantity()
source_dict['终本案件'] = final_case_quantity()
source_dict['限制消费令'] = consumption_restriction_quantity()
source_dict['行政处罚'] = penalties_quantity()
self.source = source_dict
else:
raise JustThrowError(error_info="拉取风险信息失败")
def transform(self):
source = self.source
target = self.target
target.dishonest_executor = source['失信被执行人']
target.person_to_be_executed = source['被执行人']
target.final_case = source['终本案件']
target.consumption_restriction_order = source['限制消费令']
target.administrative_penalties = source['行政处罚']
target.legal_action = source['法律诉讼(被告)']
def load(self):
target = self.target
data = target.fields_toggle()
DB_GUA.upsert_single_data(
'企业数据',
'风险信息',
{"企业名称": data['企业名称'], "报告期": data['报告期']},
data
)
class RiskInfoDataJob(RiskInfo):
def drag(self):
"""拉去天眼查数据接口的企业风险信息"""
data_job = DataJob(
instance=self,
title="拉取企业风险信息",
job_type="处理准备",
status="正常",
detail="企业名称->{};".format(self.company_name)
)
data_job.processes = [
DragRiskInfoFromTYC
]
data_job.start()
class ProcessBusinessInfo(DataProcess):
"""经营信息"""
def extract(self):
data = DB_GUA.find_all_data(
'企业数据',
'担保数据',
{'企业ID': self.target.cid},
[]
)
if not data:
error_info = "担保数据异常; "
raise JustThrowError(error_info=error_info)
else:
self.source['担保信息'] = data
def transform(self):
source = self.source
target = self.target
target.customerinfo = list()
target.regional_distribution = list()
target.industry_distribution = list()
target.balance_distribution = list()
# 获取数据日期、行业、区域
years = list()
area = list()
industry = list()
for item in source['担保信息']:
years.append(item['担保开始日期'][:4])
years.append(item['担保结束日期'][:4])
try:
if item['地级市'] is not None:
area.append(item['地级市'])
except KeyError:
continue
try:
if item['行业'] is not None:
industry.append(item['行业'])
except KeyError:
continue
# 数据去重
years = [int(y) for y in list(set(years))]
years.sort()
years = [str(y) for y in list(range(years[0], years[-1] + 1))]
areas = list(set(area))
industrys = list(set(industry))
# 初始化数据返回格式(包含报告期、区域、行业)
regional_list = list()
industry_list = list()
balance_list = list()
for r in areas:
for year in years:
new_dict = dict()
new_dict['企业ID'] = target.cid
new_dict['地区'] = r
new_dict['报告期'] = year + '-12-31'
new_dict['年末在保余额'] = 0
new_dict['年末在保责任余额'] = 0
new_dict['当年新增在保余额'] = 0
new_dict['当年新增在保责任余额'] = 0
regional_list.append(new_dict)
for d in industrys:
for year in years:
new_dict = dict()
new_dict['企业ID'] = target.cid
new_dict['报告期'] = year + '-12-31'
new_dict['所属行业'] = d
new_dict['年末在保余额'] = 0
new_dict['年末在保责任余额'] = 0
new_dict['当年新增在保余额'] = 0
new_dict['当年新增在保责任余额'] = 0
industry_list.append(new_dict)
for b in years:
new_dict = dict()
insert_dict = dict()
insert_dict['担保户数'] = 0
insert_dict['担保笔数'] = 0
insert_dict['担保责任余额'] = 0
new_dict['企业ID'] = target.cid
new_dict['报告期'] = b + '-12-31'
new_dict['100万以下含100万'] = copy.deepcopy(insert_dict)
new_dict['500万以下含500万'] = copy.deepcopy(insert_dict)
new_dict['500-1000万含1000万'] = copy.deepcopy(insert_dict)
new_dict['1000-3000万含3000万'] = copy.deepcopy(insert_dict)
new_dict['3000-5000万含5000万'] = copy.deepcopy(insert_dict)
new_dict['5000-8000万含8000万'] = copy.deepcopy(insert_dict)
new_dict['8000-10000万含10000万'] = copy.deepcopy(insert_dict)
new_dict['10000万以上'] = copy.deepcopy(insert_dict)
new_dict['合计'] = 0
balance_list.append(new_dict)
# 遍历担保信息(赋值操作)
for item in source['担保信息']:
# 客户信息
customer = target.CustomerInfo()
customer.cid = target.cid
customer.report_period = item['担保开始日期'][:4] + '-12-31'
customer.client_name = item['被担保人']
customer.guarantee_type = item['担保类型']
customer.balance_under_insurance = item['担保余额(万元)']
customer.financing_guarantee_balance = item['责任担保余额(万元)']
customer.guarantee_start_date = item['担保开始日期']
customer.guarantee_end_date = item['担保结束日期']
target.customerinfo.append(customer)
# 计算担保起始年份
begin_index = years.index(item['担保开始日期'][:4])
end_index = years.index(item['担保结束日期'][:4])
year_index = years[begin_index:end_index]
# 区域分布
for regional_ in regional_list:
if item['地级市'] == regional_['地区']:
for year_ in year_index:
# 报告期
if year_ == regional_['报告期'][:4]:
condition_01 = int(item['担保开始日期'][:4]) <= int(year_)
year_date = datetime.strptime(regional_['报告期'], '%Y-%m-%d')
current_date = datetime.strptime(item['担保结束日期'], '%Y-%m-%d')
condition_02 = (current_date - year_date).days > 0
condition_03 = item['担保开始日期'][:4] == year_
if condition_01 and condition_02:
# 年末余额开始日期应小于等于此年度结束日期应该小于此年度的12月31日
regional_['年末在保余额'] += item['担保余额(万元)']
regional_['年末在保责任余额'] += item['责任担保余额(万元)']
if condition_03:
regional_['当年新增在保余额'] += item['担保余额(万元)']
regional_['当年新增在保责任余额'] += item['责任担保余额(万元)']
# 行业分布
for industry_ in industry_list:
try:
if item['行业'] == industry_['所属行业']:
for year_ in year_index:
if year_ == industry_['报告期'][:4]:
condition_01 = int(item['担保开始日期'][:4]) <= int(year_)
year_date = datetime.strptime(industry_['报告期'], '%Y-%m-%d')
current_date = datetime.strptime(item['担保结束日期'], '%Y-%m-%d')
condition_02 = (current_date - year_date).days > 0
condition_03 = item['担保开始日期'][:4] == year_
if condition_01 and condition_02:
# 年末余额开始日期应小于等于此年度结束日期应该小于此年度的12月31日
industry_['年末在保余额'] += item['担保余额(万元)']
industry_['年末在保责任余额'] += item['责任担保余额(万元)']
if condition_03:
industry_['当年新增在保余额'] += item['担保余额(万元)']
industry_['当年新增在保责任余额'] += item['责任担保余额(万元)']
except KeyError:
continue
# 担保金额分布
for balance_ in balance_list:
for year_ in year_index:
if year_ == balance_['报告期'][:4]:
if item['担保余额(万元)'] <= 100:
balance_['100万以下含100万']['担保户数'] += 1
balance_['100万以下含100万']['担保笔数'] += 1
balance_['100万以下含100万']['担保责任余额'] += item['担保余额(万元)']
elif item['担保余额(万元)'] <= 500:
balance_['500万以下含500万']['担保户数'] += 1
balance_['500万以下含500万']['担保笔数'] += 1
balance_['500万以下含500万']['担保责任余额'] += item['担保余额(万元)']
elif item['担保余额(万元)'] <= 1000:
balance_['500-1000万含1000万']['担保户数'] += 1
balance_['500-1000万含1000万']['担保笔数'] += 1
balance_['500-1000万含1000万']['担保责任余额'] += item['担保余额(万元)']
elif item['担保余额(万元)'] <= 3000:
balance_['1000-3000万含3000万']['担保户数'] += 1
balance_['1000-3000万含3000万']['担保笔数'] += 1
balance_['1000-3000万含3000万']['担保责任余额'] += item['担保余额(万元)']
elif item['担保余额(万元)'] <= 5000:
balance_['3000-5000万含5000万']['担保户数'] += 1
balance_['3000-5000万含5000万']['担保笔数'] += 1
balance_['3000-5000万含5000万']['担保责任余额'] += item['担保余额(万元)']
elif item['担保余额(万元)'] <= 8000:
balance_['5000-8000万含8000万']['担保户数'] += 1
balance_['5000-8000万含8000万']['担保笔数'] += 1
balance_['5000-8000万含8000万']['担保责任余额'] += item['担保余额(万元)']
elif item['担保余额(万元)'] <= 10000:
balance_['8000-10000万含10000万']['担保户数'] += 1
balance_['8000-10000万含10000万']['担保笔数'] += 1
balance_['8000-10000万含10000万']['担保责任余额'] += item['担保余额(万元)']
elif item['担保余额(万元)'] > 10000:
balance_['10000万以上']['担保户数'] += 1
balance_['10000万以上']['担保笔数'] += 1
balance_['10000万以上']['担保责任余额'] += item['担保余额(万元)']
# 处理区域分布
for index in range(len(regional_list)):
regional = SpecObject.set_specify_instance(
instance=target.RegionalDistribution,
data=regional_list[index]
)
target.regional_distribution.append(regional)
# 处理行业分布
for index in range(len(industry_list)):
industry = SpecObject.set_specify_instance(
instance=target.IndustryDistribution,
data=industry_list[index]
)
target.industry_distribution.append(industry)
# 处理担保金额分布
for index in range(len(balance_list)):
balance = target.GuaranteedBalanceDistribution()
balance.report_period = balance_list[index]['报告期']
balance.cid = target.cid
balance.one_million = SpecObject.set_specify_instance(
instance=balance.GuaranteeDetail,
data=balance_list[index]['100万以下含100万']
)
balance.five_million = SpecObject.set_specify_instance(
instance=balance.GuaranteeDetail,
data=balance_list[index]['500万以下含500万']
)
balance.ten_million = SpecObject.set_specify_instance(
instance=balance.GuaranteeDetail,
data=balance_list[index]['500-1000万含1000万']
)
balance.thirty_million = SpecObject.set_specify_instance(
instance=balance.GuaranteeDetail,
data=balance_list[index]['1000-3000万含3000万']
)
balance.fifty_million = SpecObject.set_specify_instance(
instance=balance.GuaranteeDetail,
data=balance_list[index]['3000-5000万含5000万']
)
balance.eighty_million = SpecObject.set_specify_instance(
instance=balance.GuaranteeDetail,
data=balance_list[index]['5000-8000万含8000万']
)
balance.one_hundred_million = SpecObject.set_specify_instance(
instance=balance.GuaranteeDetail,
data=balance_list[index]['8000-10000万含10000万']
)
balance.more_than_one_hundred_million = SpecObject.set_specify_instance(
instance=balance.GuaranteeDetail,
data=balance_list[index]['10000万以上']
)
balance.total = sum([value['担保责任余额'] for value in balance_list[index].values() if isinstance(value, dict)])
target.balance_distribution.append(balance)
def load(self):
target = self.target
data = target.fields_toggle()
customer_data = data['客户信息']
regional_data = data['区域分布']
industry_data = data['行业分布']
amount_data = data['担保金额分布']
DB_GUA.insert_many_data(
'企业数据',
'担保数据-客户信息',
[item for item in customer_data]
)
DB_GUA.insert_many_data(
'企业数据',
'担保数据-区域分布',
[item for item in regional_data]
)
DB_GUA.insert_many_data(
'企业数据',
'担保数据-行业分布',
[item for item in industry_data]
)
DB_GUA.insert_many_data(
'企业数据',
'担保数据-担保金额分布',
[item for item in amount_data]
)
class BusinessInfoDataJob(BusinessInfo):
def create(self):
data_job = DataJob(
instance=self,
title="经营信息数据",
job_type="初始数据",
status="正常",
detail="企业名称->{}; 企业ID->{};".format(self.company_name, self.cid)
)
data_job.processes = [
ProcessBusinessInfo,
]
data_job.start()