guarantee-admin-api-v0.2/Modules/ETL/GuaranteeDataJob.py

276 lines
11 KiB
Python

import json
import time
from datetime import datetime
import requests
from DBHelper.MongoHelperInstance import DB_TYC, DB_GUA
from Modules.Company.CompanyObject import RiskInfo, BusinessInfo
from Modules.ETL.WideETL import DataProcess, DataJob
from Utils.ErrorUtil import JustThrowError
class DragRiskInfoFromTYC(DataProcess):
    """Count the target company's recent risk records stored in ``DB_TYC``.

    ``extract`` calls the remote risk-info endpoint and, on HTTP 200, counts
    per risk category the records whose year lies within ``_RECENT_YEARS``
    calendar years before the report year. ``transform`` copies those counts
    onto the target and ``load`` upserts the target into ``DB_GUA``.
    """

    # A record is counted when 0 <= report year - record year <= _RECENT_YEARS.
    _RECENT_YEARS = 3

    @staticmethod
    def _year_from_date(value):
        """Return the year of a ``YYYY-MM-DD`` string (ValueError if malformed)."""
        return datetime.strptime(value, '%Y-%m-%d').year

    @staticmethod
    def _year_from_millis(value):
        """Return the local-time year of an epoch timestamp given in milliseconds."""
        return time.localtime(value / 1000).tm_year

    def _records(self, category, collection, nested=False):
        """Return the raw records stored for the target company.

        Args:
            category: first argument passed to ``DB_TYC.find_single_data``.
            collection: second argument; also the field read from the result.
            nested: when true, the records sit under the field's ``'result'`` key.

        Returns:
            The stored records, or an empty tuple when the lookup returns
            nothing or the field is missing or empty.
        """
        document = DB_TYC.find_single_data(
            category,
            collection,
            {"企业名称": self.target.company_name},
            [collection]
        )
        # Guard both the document and the field: either may be empty.
        payload = document.get(collection) if document else None
        if not payload:
            return ()
        return payload['result'] if nested else payload

    def _count_recent(self, records, date_key, to_year, report_year):
        """Count the records whose ``date_key`` year falls inside the window."""
        return sum(
            1
            for item in records
            if 0 <= report_year - to_year(item[date_key]) <= self._RECENT_YEARS
        )

    def _count_recent_lawsuits_as_defendant(self, report_year):
        """Count recent lawsuits whose ``defendants`` contain the company name."""
        total = 0
        for item in self._records("司法风险", "诉讼"):
            try:
                filed_year = self._year_from_date(item['submittime'])
            except ValueError:
                # Lawsuits with an unparsable filing date are skipped, not fatal.
                continue
            is_defendant = self.target.company_name in item['defendants']
            if is_defendant and 0 <= report_year - filed_year <= self._RECENT_YEARS:
                total += 1
        return total

    def extract(self):
        """Fill ``self.source`` with one recent-record count per risk category.

        Raises:
            JustThrowError: if the endpoint does not answer with HTTP 200.
            ValueError: if ``self.target.report_date`` is not ``YYYY-MM-DD``.
        """
        url = "http://api.fecribd.com/api/tyc/guarantee_risk_info"
        headers = {'token': self.target.token}
        data = {"企业名称": self.target.company_name, "数据范围": [], "时间范围": 30}
        # NOTE(review): only the status code is used; presumably the call makes
        # the service refresh DB_TYC - confirm. No timeout is set, so a stalled
        # server blocks this job indefinitely.
        res = requests.post(url=url, headers=headers, data=json.dumps(data))
        if res.status_code != 200:
            raise JustThrowError(error_info="拉取风险信息失败")

        # Parsed once up front so a malformed report date fails fast instead of
        # being re-parsed (and, for lawsuits, silently swallowed) per record.
        report_year = self._year_from_date(self.target.report_date)
        by_date = self._year_from_date
        by_millis = self._year_from_millis

        self.source = {
            '失信被执行人': self._count_recent(
                self._records("司法风险", "失信人"),
                'publishdate', by_date, report_year),
            '被执行人': self._count_recent(
                self._records("司法风险", "被执行人"),
                'caseCreateTime', by_date, report_year),
            '法律诉讼(被告)': self._count_recent_lawsuits_as_defendant(report_year),
            '终本案件': self._count_recent(
                self._records("司法风险", "终本案件", nested=True),
                'caseFinalTime', by_millis, report_year),
            '限制消费令': self._count_recent(
                self._records("司法风险", "限制消费令", nested=True),
                'publishDate', by_millis, report_year),
            '行政处罚': self._count_recent(
                self._records("经营风险", "行政处罚", nested=True),
                'decisionDate', by_date, report_year),
        }

    def transform(self):
        """Copy the extracted counts onto the target's risk attributes."""
        source = self.source
        target = self.target
        target.dishonest_executor = source['失信被执行人']
        target.person_to_be_executed = source['被执行人']
        target.final_case = source['终本案件']
        target.consumption_restriction_order = source['限制消费令']
        target.administrative_penalties = source['行政处罚']
        target.legal_action = source['法律诉讼(被告)']

    def load(self):
        """Upsert the target into ``DB_GUA``, keyed by company name and report period."""
        data = self.target.fields_toggle()
        DB_GUA.upsert_single_data(
            '企业数据',
            '风险信息',
            {"企业名称": data['企业名称'], "报告期": data['报告期']},
            data
        )
class RiskInfoDataJob(RiskInfo):
    """``RiskInfo`` that can start the data job pulling its own risk figures."""

    def drag(self):
        """Pull the company's risk information via the TYC data interface."""
        job_detail = "企业名称->{};".format(self.company_name)
        job = DataJob(
            instance=self,
            title="拉取企业风险信息",
            job_type="处理准备",
            status="正常",
            detail=job_detail,
        )
        # The job runs a single process: DragRiskInfoFromTYC.
        job.processes = [DragRiskInfoFromTYC]
        job.start()
class ProcessBusinessInfo(DataProcess):
    """Business information: build customer rows from a company's guarantee records."""

    def _fetch_guarantee_records(self):
        """Read the company's guarantee records into ``self.source['担保信息']``.

        Raises:
            JustThrowError: if no guarantee records are found for the company.
        """
        data = DB_GUA.find_all_data(
            '企业数据',
            '担保数据',
            {'企业ID': self.target.cid},
            []
        )
        if not data:
            raise JustThrowError(error_info="担保数据异常; ")
        self.source['担保信息'] = data

    def extract(self):
        """Fetch the guarantee records that ``transform`` consumes.

        NOTE(review): the fetch used to live only in ``load`` while this method
        was a no-op, so ``transform`` had no input if it ran before ``load``.
        Confirm the call order used by ``DataProcess``.
        """
        self._fetch_guarantee_records()

    def transform(self):
        """Reset the target's distribution lists and fill ``customerinfo``.

        Totals per (province, start year) are also accumulated, but they are
        not yet written to the target.
        """
        source = self.source
        target = self.target
        records = source['担保信息']

        target.customerinfo = list()
        target.regional_distribution = list()
        target.industry_distribution = list()
        target.balance_distribution = list()

        # Collect years as flat strings: the earlier [start, end] pairs were
        # lists, which are unhashable and made set(...) raise TypeError.
        years = set()
        regions = set()
        for item in records:
            years.add(item['担保开始日期'][:4])
            years.add(item['担保结束日期'][:4])
            # NOTE(review): regions used to be collected from '地级市' but
            # matched against '省份'; aligned on '省份' here - confirm.
            regions.add(item['省份'])

        # One bucket per region and year, keyed for direct lookup.
        regional_totals = {
            (region, year): {
                '地区': region,
                '报告期': year,
                '年末在保余额': 0,
                '年末在保责任余额': 0,
                '当年新增在保余额': 0,
                '当年新增在保责任余额': 0,
            }
            for region in regions
            for year in years
        }

        for item in records:
            # Customer information
            customer = target.CustomerInfo()
            customer.report_period = item['担保开始日期']
            customer.client_name = item['被担保人']
            customer.guarantee_type = item['担保类型']
            customer.balance_under_insurance = item['担保余额(万元)']
            customer.financing_guarantee_balance = item['责任担保余额(万元)']
            customer.guarantee_start_date = item['担保开始日期']
            customer.guarantee_end_date = item['担保结束日期']
            target.customerinfo.append(customer)

            # Regional distribution: add the balances to the record's
            # (province, start year) bucket, which always exists because both
            # parts of the key were collected from these same records.
            bucket = regional_totals[(item['省份'], item['担保开始日期'][:4])]
            bucket['年末在保余额'] += item['担保余额(万元)']
            bucket['年末在保责任余额'] += item['责任担保余额(万元)']

        # TODO: publish regional_totals through target.regional_distribution.
        # A commented-out sketch mapped each bucket onto
        # target.RegionalDistribution() (report_period, region,
        # year_end_insured_balance, year_end_insured_liability_balance,
        # insured_balance_newly_added, insured_liability_newly_added) but left
        # the source fields blank; the two "newly added" totals are never
        # accumulated here.

    def load(self):
        """Fetch the guarantee records if no earlier step has done so.

        Kept so that callers invoking ``load`` before ``transform`` still work.
        """
        if '担保信息' not in self.source:
            self._fetch_guarantee_records()
class BusinessInfoDataJob(BusinessInfo):
    """``BusinessInfo`` that can start the data job building its business data."""

    def create(self):
        """Configure and start the business-information data job for this company."""
        job_detail = "企业名称->{}; 企业ID->{};".format(self.company_name, self.cid)
        job = DataJob(
            instance=self,
            title="经营信息数据",
            job_type="初始数据",
            status="正常",
            detail=job_detail,
        )
        # The job runs a single process: ProcessBusinessInfo.
        job.processes = [ProcessBusinessInfo]
        job.start()