import copy import json import time from datetime import datetime import requests from DBHelper.MongoHelperInstance import DB_TYC, DB_GUA from Modules.Company.CompanyObject import RiskInfo, BusinessInfo from Modules.ETL.WideETL import DataProcess, DataJob from Utils.ErrorUtil import JustThrowError from Utils.ObjUtil import SpecObject class DragRiskInfoFromTYC(DataProcess): def extract(self): url = "http://api.fecribd.com/api/tyc/guarantee_risk_info" headers = {'token': self.target.token} data = {"企业名称": self.target.company_name, "数据范围": [], "时间范围": 30} res = requests.post(url=url, headers=headers, data=json.dumps(data)) if res.status_code == 200: source_dict = dict() def dishonest_quantity(): dishonest = DB_TYC.find_single_data( "司法风险", "失信人", {"企业名称": self.target.company_name}, ['失信人'] ) total = 0 if dishonest['失信人']: for item in dishonest['失信人']: begin_date = datetime.strptime(item['publishdate'], '%Y-%m-%d').year curr_date = datetime.strptime(self.target.report_date, '%Y-%m-%d').year minus_date = curr_date - begin_date if 0 <= minus_date <= 3: total += 1 return total def executed_quantity(): executed = DB_TYC.find_single_data( "司法风险", "被执行人", {"企业名称": self.target.company_name}, ['被执行人'] ) total = 0 if executed['被执行人']: for item in executed['被执行人']: begin_date = datetime.strptime(item['caseCreateTime'], '%Y-%m-%d').year curr_date = datetime.strptime(self.target.report_date, '%Y-%m-%d').year minus_date = curr_date - begin_date if 0 <= minus_date <= 3: total += 1 return total def legal_action_quantity(): legal_action = DB_TYC.find_single_data( "司法风险", "诉讼", {"企业名称": self.target.company_name}, ['诉讼'] ) total = 0 if legal_action: for item in legal_action['诉讼']: try: begin_date = datetime.strptime(item['submittime'], '%Y-%m-%d').year curr_date = datetime.strptime(self.target.report_date, '%Y-%m-%d').year minus_date = curr_date - begin_date condition = self.target.company_name in item['defendants'] if 0 <= minus_date <= 3 and condition: total += 1 except ValueError: continue return total def final_case_quantity(): final_case = DB_TYC.find_single_data( "司法风险", "终本案件", {"企业名称": self.target.company_name}, ['终本案件'] ) total = 0 if final_case['终本案件']: for item in final_case['终本案件']['result']: tuptime_newest = time.localtime(item['caseFinalTime'] / 1000) begin_date = datetime.strptime(time.strftime("%Y-%m-%d", tuptime_newest), '%Y-%m-%d').year curr_date = datetime.strptime(self.target.report_date, '%Y-%m-%d').year minus_date = curr_date - begin_date if 0 <= minus_date <= 3: total += 1 return total def consumption_restriction_quantity(): executed = DB_TYC.find_single_data( "司法风险", "限制消费令", {"企业名称": self.target.company_name}, ['限制消费令'] ) total = 0 if executed['限制消费令']: for item in executed['限制消费令']['result']: tuptime_newest = time.localtime(item['publishDate'] / 1000) begin_date = datetime.strptime(time.strftime("%Y-%m-%d", tuptime_newest), '%Y-%m-%d').year curr_date = datetime.strptime(self.target.report_date, '%Y-%m-%d').year minus_date = curr_date - begin_date if 0 <= minus_date <= 3: total += 1 return total def penalties_quantity(): penalties = DB_TYC.find_single_data( "经营风险", "行政处罚", {"企业名称": self.target.company_name}, ['行政处罚'] ) total = 0 if penalties['行政处罚']: for item in penalties['行政处罚']['result']: begin_date = datetime.strptime(item['decisionDate'], '%Y-%m-%d').year curr_date = datetime.strptime(self.target.report_date, '%Y-%m-%d').year minus_date = curr_date - begin_date if 0 <= minus_date <= 3: total += 1 return total source_dict['失信被执行人'] = dishonest_quantity() source_dict['被执行人'] = executed_quantity() source_dict['法律诉讼(被告)'] = legal_action_quantity() source_dict['终本案件'] = final_case_quantity() source_dict['限制消费令'] = consumption_restriction_quantity() source_dict['行政处罚'] = penalties_quantity() self.source = source_dict else: raise JustThrowError(error_info="拉取风险信息失败") def transform(self): source = self.source target = self.target target.dishonest_executor = source['失信被执行人'] target.person_to_be_executed = source['被执行人'] target.final_case = source['终本案件'] target.consumption_restriction_order = source['限制消费令'] target.administrative_penalties = source['行政处罚'] target.legal_action = source['法律诉讼(被告)'] def load(self): target = self.target data = target.fields_toggle() DB_GUA.upsert_single_data( '企业数据', '风险信息', {"企业名称": data['企业名称'], "报告期": data['报告期']}, data ) class RiskInfoDataJob(RiskInfo): def drag(self): """拉去天眼查数据接口的企业风险信息""" data_job = DataJob( instance=self, title="拉取企业风险信息", job_type="处理准备", status="正常", detail="企业名称->{};".format(self.company_name) ) data_job.processes = [ DragRiskInfoFromTYC ] data_job.start() class ProcessBusinessInfo(DataProcess): """经营信息""" def extract(self): data = DB_GUA.find_all_data( '企业数据', '担保数据', {'企业ID': self.target.cid}, [] ) if not data: error_info = "担保数据异常; " raise JustThrowError(error_info=error_info) else: self.source['担保信息'] = data def transform(self): source = self.source target = self.target target.customerinfo = list() target.regional_distribution = list() target.industry_distribution = list() target.balance_distribution = list() # 获取数据日期、行业、区域 years = list() area = list() industry = list() for item in source['担保信息']: years.append(item['担保开始日期'][:4]) years.append(item['担保结束日期'][:4]) try: if item['地级市'] is not None: area.append(item['地级市']) except KeyError: continue try: if item['行业'] is not None: industry.append(item['行业']) except KeyError: continue # 数据去重 years = [int(y) for y in list(set(years))] years.sort() years = [str(y) for y in list(range(years[0], years[-1] + 1))] areas = list(set(area)) industrys = list(set(industry)) # 初始化数据返回格式(包含报告期、区域、行业) regional_list = list() industry_list = list() balance_list = list() for r in areas: for year in years: new_dict = dict() new_dict['企业ID'] = target.cid new_dict['地区'] = r new_dict['报告期'] = year + '-12-31' new_dict['年末在保余额'] = 0 new_dict['年末在保责任余额'] = 0 new_dict['当年新增在保余额'] = 0 new_dict['当年新增在保责任余额'] = 0 regional_list.append(new_dict) for d in industrys: for year in years: new_dict = dict() new_dict['企业ID'] = target.cid new_dict['报告期'] = year + '-12-31' new_dict['所属行业'] = d new_dict['年末在保余额'] = 0 new_dict['年末在保责任余额'] = 0 new_dict['当年新增在保余额'] = 0 new_dict['当年新增在保责任余额'] = 0 industry_list.append(new_dict) for b in years: new_dict = dict() insert_dict = dict() insert_dict['担保户数'] = 0 insert_dict['担保笔数'] = 0 insert_dict['担保责任余额'] = 0 new_dict['企业ID'] = target.cid new_dict['报告期'] = b + '-12-31' new_dict['100万以下(含100万)'] = copy.deepcopy(insert_dict) new_dict['500万以下(含500万)'] = copy.deepcopy(insert_dict) new_dict['500-1000万(含1000万)'] = copy.deepcopy(insert_dict) new_dict['1000-3000万(含3000万)'] = copy.deepcopy(insert_dict) new_dict['3000-5000万(含5000万)'] = copy.deepcopy(insert_dict) new_dict['5000-8000万(含8000万)'] = copy.deepcopy(insert_dict) new_dict['8000-10000万(含10000万)'] = copy.deepcopy(insert_dict) new_dict['10000万以上'] = copy.deepcopy(insert_dict) new_dict['合计'] = 0 balance_list.append(new_dict) # 遍历担保信息(赋值操作) for item in source['担保信息']: # 客户信息 customer = target.CustomerInfo() customer.cid = target.cid customer.report_period = item['担保开始日期'][:4] + '-12-31' customer.client_name = item['被担保人'] customer.guarantee_type = item['担保类型'] customer.balance_under_insurance = item['担保余额(万元)'] customer.financing_guarantee_balance = item['责任担保余额(万元)'] customer.guarantee_start_date = item['担保开始日期'] customer.guarantee_end_date = item['担保结束日期'] target.customerinfo.append(customer) # 计算担保起始年份 begin_index = years.index(item['担保开始日期'][:4]) end_index = years.index(item['担保结束日期'][:4]) year_index = years[begin_index:end_index] # 区域分布 for regional_ in regional_list: if item['地级市'] == regional_['地区']: for year_ in year_index: # 报告期 if year_ == regional_['报告期'][:4]: condition_01 = int(item['担保开始日期'][:4]) <= int(year_) year_date = datetime.strptime(regional_['报告期'], '%Y-%m-%d') current_date = datetime.strptime(item['担保结束日期'], '%Y-%m-%d') condition_02 = (current_date - year_date).days > 0 condition_03 = item['担保开始日期'][:4] == year_ if condition_01 and condition_02: # 年末余额:开始日期应小于等于此年度,结束日期应该小于此年度的12月31日 regional_['年末在保余额'] += item['担保余额(万元)'] regional_['年末在保责任余额'] += item['责任担保余额(万元)'] if condition_03: regional_['当年新增在保余额'] += item['担保余额(万元)'] regional_['当年新增在保责任余额'] += item['责任担保余额(万元)'] # 行业分布 for industry_ in industry_list: try: if item['行业'] == industry_['所属行业']: for year_ in year_index: if year_ == industry_['报告期'][:4]: condition_01 = int(item['担保开始日期'][:4]) <= int(year_) year_date = datetime.strptime(industry_['报告期'], '%Y-%m-%d') current_date = datetime.strptime(item['担保结束日期'], '%Y-%m-%d') condition_02 = (current_date - year_date).days > 0 condition_03 = item['担保开始日期'][:4] == year_ if condition_01 and condition_02: # 年末余额:开始日期应小于等于此年度,结束日期应该小于此年度的12月31日 industry_['年末在保余额'] += item['担保余额(万元)'] industry_['年末在保责任余额'] += item['责任担保余额(万元)'] if condition_03: industry_['当年新增在保余额'] += item['担保余额(万元)'] industry_['当年新增在保责任余额'] += item['责任担保余额(万元)'] except KeyError: continue # 担保金额分布 for balance_ in balance_list: for year_ in year_index: if year_ == balance_['报告期'][:4]: if item['担保余额(万元)'] <= 100: balance_['100万以下(含100万)']['担保户数'] += 1 balance_['100万以下(含100万)']['担保笔数'] += 1 balance_['100万以下(含100万)']['担保责任余额'] += item['担保余额(万元)'] elif item['担保余额(万元)'] <= 500: balance_['500万以下(含500万)']['担保户数'] += 1 balance_['500万以下(含500万)']['担保笔数'] += 1 balance_['500万以下(含500万)']['担保责任余额'] += item['担保余额(万元)'] elif item['担保余额(万元)'] <= 1000: balance_['500-1000万(含1000万)']['担保户数'] += 1 balance_['500-1000万(含1000万)']['担保笔数'] += 1 balance_['500-1000万(含1000万)']['担保责任余额'] += item['担保余额(万元)'] elif item['担保余额(万元)'] <= 3000: balance_['1000-3000万(含3000万)']['担保户数'] += 1 balance_['1000-3000万(含3000万)']['担保笔数'] += 1 balance_['1000-3000万(含3000万)']['担保责任余额'] += item['担保余额(万元)'] elif item['担保余额(万元)'] <= 5000: balance_['3000-5000万(含5000万)']['担保户数'] += 1 balance_['3000-5000万(含5000万)']['担保笔数'] += 1 balance_['3000-5000万(含5000万)']['担保责任余额'] += item['担保余额(万元)'] elif item['担保余额(万元)'] <= 8000: balance_['5000-8000万(含8000万)']['担保户数'] += 1 balance_['5000-8000万(含8000万)']['担保笔数'] += 1 balance_['5000-8000万(含8000万)']['担保责任余额'] += item['担保余额(万元)'] elif item['担保余额(万元)'] <= 10000: balance_['8000-10000万(含10000万)']['担保户数'] += 1 balance_['8000-10000万(含10000万)']['担保笔数'] += 1 balance_['8000-10000万(含10000万)']['担保责任余额'] += item['担保余额(万元)'] elif item['担保余额(万元)'] > 10000: balance_['10000万以上']['担保户数'] += 1 balance_['10000万以上']['担保笔数'] += 1 balance_['10000万以上']['担保责任余额'] += item['担保余额(万元)'] # 处理区域分布 for index in range(len(regional_list)): regional = SpecObject.set_specify_instance( instance=target.RegionalDistribution, data=regional_list[index] ) target.regional_distribution.append(regional) # 处理行业分布 for index in range(len(industry_list)): industry = SpecObject.set_specify_instance( instance=target.IndustryDistribution, data=industry_list[index] ) target.industry_distribution.append(industry) # 处理担保金额分布 for index in range(len(balance_list)): balance = target.GuaranteedBalanceDistribution() balance.report_period = balance_list[index]['报告期'] balance.cid = target.cid balance.one_million = SpecObject.set_specify_instance( instance=balance.GuaranteeDetail, data=balance_list[index]['100万以下(含100万)'] ) balance.five_million = SpecObject.set_specify_instance( instance=balance.GuaranteeDetail, data=balance_list[index]['500万以下(含500万)'] ) balance.ten_million = SpecObject.set_specify_instance( instance=balance.GuaranteeDetail, data=balance_list[index]['500-1000万(含1000万)'] ) balance.thirty_million = SpecObject.set_specify_instance( instance=balance.GuaranteeDetail, data=balance_list[index]['1000-3000万(含3000万)'] ) balance.fifty_million = SpecObject.set_specify_instance( instance=balance.GuaranteeDetail, data=balance_list[index]['3000-5000万(含5000万)'] ) balance.eighty_million = SpecObject.set_specify_instance( instance=balance.GuaranteeDetail, data=balance_list[index]['5000-8000万(含8000万)'] ) balance.one_hundred_million = SpecObject.set_specify_instance( instance=balance.GuaranteeDetail, data=balance_list[index]['8000-10000万(含10000万)'] ) balance.more_than_one_hundred_million = SpecObject.set_specify_instance( instance=balance.GuaranteeDetail, data=balance_list[index]['10000万以上'] ) balance.total = sum([value['担保责任余额'] for value in balance_list[index].values() if isinstance(value, dict)]) target.balance_distribution.append(balance) def load(self): target = self.target data = target.fields_toggle() customer_data = data['客户信息'] regional_data = data['区域分布'] industry_data = data['行业分布'] amount_data = data['担保金额分布'] DB_GUA.insert_many_data( '企业数据', '担保-客户信息', [item for item in customer_data] ) DB_GUA.insert_many_data( '企业数据', '担保-区域分布', [item for item in regional_data] ) DB_GUA.insert_many_data( '企业数据', '担保-行业分布', [item for item in industry_data] ) DB_GUA.insert_many_data( '企业数据', '担保-担保金额分布', [item for item in amount_data] ) class BusinessInfoDataJob(BusinessInfo): def create(self): data_job = DataJob( instance=self, title="经营信息数据", job_type="初始数据", status="正常", detail="企业名称->{}; 企业ID->{};".format(self.company_name, self.cid) ) data_job.processes = [ ProcessBusinessInfo, ] data_job.start()