tfse-etl-api-v0.2/company/scripts.py

549 lines
26 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import json
import time
import requests
import pandas as pd
from common.scripts import read_json_file, sub_dict, df_iterrows
from company.db import find_data_in_tyc, insert_data_to_tfse, find_data_in_tfse
from rating.scripts.risk_detail import associate_risk_detail, change_log_detail
def drag_company_data_request(company_name, timeout=60):
    """
    Ask the Tianyancha (天眼查) interface to pull the data of one company.

    Sends a POST request whose body is a JSON document carrying the company
    name, and maps the HTTP status code of the reply to a status message.

    Parameters:
        company_name: company name, sent under the "企业名称" key
        timeout: value forwarded to requests' ``timeout`` argument (seconds,
            a (connect, read) tuple, or None to wait indefinitely)
    Returns:
        "企业数据拉取成功" when the reply has HTTP status 200,
        otherwise "企业数据拉取失败"
    Raises:
        requests.exceptions.RequestException: connection errors and timeouts
            are not caught here and propagate to the caller
    """
    # NOTE(review): endpoint and token are hard-coded in source; consider
    # moving them to configuration.
    url = "http://139.9.249.34:51009/api/tyc/drag_data"
    headers = {'token': "uzdq51N4!I0%HY4sCaQ!aeCSIDIVIdAM"}
    data = {"企业名称": company_name}
    # Without an explicit timeout requests waits forever on a server that
    # accepts the connection but never answers.
    res = requests.post(url=url, headers=headers, data=json.dumps(data), timeout=timeout)
    if res.status_code == 200:
        return "企业数据拉取成功"
    return "企业数据拉取失败"
def basic_info_etl(cid, company_name):
    """
    Assemble one company's basic-information record and store it.

    Queries the Tianyancha database by company name for the basic
    information, shareholder and key-personnel documents, copies the
    selected fields into the basic-information template, and saves the
    record through insert_data_to_tfse('企业', '公司基本信息', ...).

    Parameters:
        cid: company ID, stored under "企业ID"
        company_name: company name used as the query filter
    Returns:
        -
    """
    # Source documents; only the first match of each query is used below.
    basic_info = find_data_in_tyc('公司背景', '基本信息', {"企业名称": company_name})
    holder_info = find_data_in_tyc('公司背景', '企业股东', {"企业名称": company_name})
    member_info = find_data_in_tyc('公司背景', '主要人员', {"企业名称": company_name})
    # Output record, starting from the JSON template.
    data = read_json_file('/company/static/template/基本信息.json')

    def blank_to_none(value):
        # Empty strings coming from the source are stored as None.
        return None if value == '' else value

    # ---- Business registration information ----
    data['企业ID'] = cid
    data['更新日期'] = time.strftime("%Y-%m-%d %H:%M:%S")
    data['企业名称'] = basic_info[0]['企业名称']
    # (target key in the template, source key in the Tianyancha document)
    registration_fields = (
        ('企业状态', 'regStatus'),
        ('法定代表人', 'legalPersonName'),
        ('企业类型', 'companyOrgType'),
        ('纳税人识别号', 'taxNumber'),
        ('经营范围', 'businessScope'),
        ('注册资本', 'regCapital'),
        ('实缴资本', 'actualCapital'),
        ('注册地址', 'regLocation'),
        ('登记机关', 'regInstitute'),
        ('行业', 'industry'),
        ('人员规模', 'staffNumRange'),
        ('参保人数', 'socialStaffNum'),
    )
    for target_key, source_key in registration_fields:
        data['工商信息'][target_key] = basic_info[0]['基本信息'][source_key]
    # NOTE(review): both branches store an empty string, so the regStatus
    # test has no effect on the stored value — confirm the intended labels.
    if basic_info[0]['基本信息']['regStatus'] == 1:
        data['工商信息']['小微企业'] = ""
    else:
        data['工商信息']['小微企业'] = ""

    # ---- Shareholder information ----
    shareholders = []
    for holder in holder_info[0]['企业股东']['result']:
        info = {'股东': holder['name']}
        if holder['type'] == 1:
            info['股东类型'] = "公司"
        elif holder['type'] == 2:
            info['股东类型'] = "个人"
        else:
            info['股东类型'] = "其他"
        if holder['capital']:
            # Only the first subscribed-capital entry is used.
            subscribed = holder['capital'][0]
            info['持股比例'] = blank_to_none(subscribed['percent'])
            info['认缴金额'] = blank_to_none(subscribed['amomon'])
            info['认缴日期'] = blank_to_none(subscribed['time'])
        else:
            info['持股比例'] = None
            info['认缴金额'] = None
            info['认缴日期'] = None
        # Paid-in capital: one list per attribute, empty when nothing is recorded.
        amounts, methods, dates = [], [], []
        for paid_in in holder['capitalActl'] or []:
            amounts.append(paid_in['amomon'])
            methods.append(paid_in['paymet'])
            dates.append(paid_in['time'])
        info['实缴金额'] = amounts
        info['实缴方式'] = methods
        info['实缴时间'] = dates
        shareholders.append(info)
    data['股东信息'] = shareholders

    # ---- Key members ----
    data['主要成员'] = [
        {'姓名': member['name'], '职务': member['typeJoin']}
        for member in member_info[0]['主要人员']['result']
    ]

    # ---- Save the processed record ----
    insert_data_to_tfse('企业', '公司基本信息', data)
def general_rating_etl(rid):
    """
    Run the data-cleaning routine for the comprehensive credit evaluation.

    Loads the evaluation record, evaluation result, report text and the
    company's indicator details for the given evaluation ID, builds a
    summary document section by section (each nested function fills part of
    ``result``), and stores it with
    insert_data_to_tfse('企业', '综合评价分析', result).

    Parameters:
        rid: str, evaluation ID
    Returns:
        None; the assembled document is written through insert_data_to_tfse.
    """
    # Parameters
    rating_record = find_data_in_tfse('评价', '评价记录', {"评价ID": rid})[0]
    rating_result = find_data_in_tfse('评价', '评价结果', {"评价ID": rid})[0]
    # All evaluation results of the same company, newest evaluation first.
    rating_results = find_data_in_tfse('评价', '评价结果', {"企业ID": rating_result['企业ID']})
    text_model = find_data_in_tfse('评价', '报告数据', {"企业ID": rating_result['企业ID'], "评价ID": rid})[0]
    df_records = pd.DataFrame(rating_results).sort_values('评价时间', ascending=False)
    # Indicator details of the company, newest annual-report period first.
    rating_index = find_data_in_tfse('企业', '指标明细', {"企业ID": rating_result['企业ID']})
    df_index = pd.DataFrame(rating_index).sort_values('年报期', ascending=False)
    # Returns
    result = dict()

    def result_head():
        # Company identity plus evaluation time, credit rating and credit score for this rid
        result['企业ID'] = rating_result['企业ID']
        result['企业名称'] = rating_result['企业名称']
        result['更新时间'] = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())
        # Overall credit rating data
        result_general = result['综合信用等级'] = dict()
        result_general['评价时间'] = rating_result['评价时间']
        result_general['信用等级'] = rating_result['信用等级']
        result_general['信用评分'] = rating_result['信用评分']

    def make_radar_chart():
        """
        Radar-chart data for the comprehensive credit indicators.
        """
        # Build the radar chart data
        rating_score = dict()
        rating_score["环境"] = rating_result['经营评分']["环境"]
        rating_score["社会责任"] = rating_result['经营评分']["社会责任"]
        rating_score["公司治理"] = rating_result['经营评分']["公司治理"]
        rating_score["盈利能力"] = rating_result['财务评分']["盈利能力"]['合计']
        rating_score["资产质量"] = rating_result['财务评分']["资产质量"]['合计']
        rating_score["债务风险"] = rating_result['财务评分']["债务风险"]['合计']
        rating_score["经营增长"] = rating_result['财务评分']["经营增长"]['合计']
        # Risk scores are inverted against 43 / 10, the same numbers used as
        # '最大值' for these categories in rating_index_analysis below.
        rating_score["合规风险"] = round(43 - rating_result['风险评分']["合规风险"], 2)
        rating_score["经营风险"] = round(10 - rating_result['风险评分']["经营风险"]['合计'], 2)
        rating_score["关联风险"] = round(10 - rating_result['风险评分']["关联风险"]['合计'], 2)
        result['综合信用表现'] = dict()
        result['综合信用表现']['指标雷达'] = dict()
        result['综合信用表现']['指标雷达']['指标得分'] = rating_score
        result['综合信用表现']['指标雷达']['最大分数'] = read_json_file('/company/static/refers/general.json')['radar']

    def credit_analysis_content():
        """
        Credit-analysis texts of the comprehensive evaluation.

        Every text is taken from fixed positions inside
        text_model['报告内容'] (chapter -> section -> paragraph).
        """
        # Business analysis
        def business_analysis_content():
            describe = text_model['报告内容'][1]['章节内容'][0]['小节内容'][1]['段落']
            return describe

        # Financial analysis
        def financial_analysis_content():
            # When the first selected industry is '制造业', two paragraphs are
            # concatenated; otherwise a single paragraph is used.
            if text_model['行业选择'][0] == '制造业':
                describe = list()
                describe.append(text_model['报告内容'][3]['章节内容'][0]['小节内容'][1]['段落'])
                describe.append(text_model['报告内容'][3]['章节内容'][1]['小节内容'][2]['段落'])
                describe = ''.join(describe)
            else:
                describe = text_model['报告内容'][3]['章节内容'][0]['小节内容'][2]['段落']
            return describe

        # Risk analysis
        def risk_analysis_content():
            # NOTE(review): str.split('') raises "ValueError: empty separator";
            # the intended separator is not visible here and must be restored
            # before this function can run.
            risk_01 = text_model['报告内容'][4]['章节内容'][0]['小节内容'][0]['段落']
            list_01 = risk_01.split('')
            risk_02 = text_model['报告内容'][4]['章节内容'][1]['小节内容'][0]['段落']
            list_02 = risk_02.split('')
            risk_03 = text_model['报告内容'][4]['章节内容'][2]['小节内容'][0]['段落']
            list_03 = risk_03.split('')
            describe = list()
            # list[::-1][0] picks the last piece of each split paragraph.
            describe.append(list_01[::-1][0])
            describe.append(list_02[::-1][0])
            describe.append(list_03[::-1][0])
            describe = ','.join(describe)
            # NOTE(review): replacing '' with '' leaves the text unchanged;
            # the original arguments are presumably missing — TODO confirm.
            res = describe.replace('', '', 2)
            return res

        # Evaluation comments
        def evaluation_comments_content():
            describe = list()
            eva_01 = text_model['报告内容'][5]['章节内容'][0]['小节内容'][0]['段落']
            eva_02 = text_model['报告内容'][5]['章节内容'][0]['小节内容'][1]['段落']
            eva_03 = text_model['报告内容'][5]['章节内容'][0]['小节内容'][2]['段落']
            describe.append(eva_01)
            describe.append(eva_02)
            describe.append(eva_03)
            describe = ''.join(describe)
            return describe

        # Assemble the credit analysis data
        result['信用分析'] = dict()
        result['信用分析']['经营分析'] = business_analysis_content()
        result['信用分析']['财务分析'] = financial_analysis_content()
        result['信用分析']['风险分析'] = risk_analysis_content()
        result['信用分析']['评价意见'] = evaluation_comments_content()
        # Link to the report file, keyed by the report fid of the evaluation record.
        result['信用分析']['查看报告'] = '/file/get_company_report?file_id={}'.format(rating_record['报告fid'])

    def make_total_score():
        """
        Build the total-score data.

        ESG and financial scores are expressed as a percentage of their
        reference totals; the risk score is expressed as 100 * (1 - score / reference).
        """
        ref_score = read_json_file('/company/static/refers/general.json')['total']
        result['总得分'] = dict()
        result['总得分']['ESG'] = '{}%'.format(round(100*rating_result['经营评分']['合计']/ref_score['ESG'], 2))
        result['总得分']['财务指标'] = '{}%'.format(round(100*rating_result['财务评分']['合计']/ref_score['财务指标'], 2))
        result['总得分']['风险指标'] = '{}%'.format(round(100*(1-rating_result['风险评分']['合计']/ref_score['风险指标']), 2))

    def profitability():
        """
        Profitability data.
        """
        # Report period looked up: 31 December of the year before the evaluation year.
        # NOTE(review): split('') raises "ValueError: empty separator"; the
        # intended separator is not visible here.
        date = '{}-12-31'.format(int(rating_result['评价年度'].split('')[0])-1)
        df_profit = df_index[df_index['年报期'] == date]
        result['盈利能力'] = dict()
        # .values[0] raises IndexError when no row matches the period.
        result['盈利能力']['总资产报酬率'] = df_profit['总资产报酬率'].values[0]
        result['盈利能力']['净资产收益率'] = df_profit['净资产收益率'].values[0]

    def growth_ability():
        """
        Growth-ability data.
        """
        # The three most recent report periods, re-sorted oldest first and keyed by period.
        df_grow = df_index[:3].sort_values('年报期', ascending=True).set_index('年报期')
        result['成长能力'] = json.loads(df_grow[['营业增长率', '总资产增长率']].T.to_json())

    def assets_quality():
        """
        Compute the asset-quality data.
        """
        # Same report-period lookup as in profitability().
        # NOTE(review): split('') raises "ValueError: empty separator"; the
        # intended separator is not visible here.
        date = '{}-12-31'.format(int(rating_result['评价年度'].split('')[0]) - 1)
        df_assets = df_index[df_index['年报期'] == date]
        result['资产质量'] = dict()
        result['资产质量']['应收账款周转率'] = df_assets['应收账款周转率'].values[0]
        result['资产质量']['存货周转率'] = df_assets['存货周转率'].values[0]
        result['资产质量']['总资产周转率'] = df_assets['总资产周转率'].values[0]
        result['资产质量']['资产负债率'] = df_assets['资产负债率'].values[0]

    def rating_history():
        """
        Evaluation-history data.
        """
        # Rating history data: each row's score is compared with the next row
        # (the next older evaluation, since df_records is sorted newest first);
        # the last row has nothing to compare with and gets '-'.
        df_records['变化趋势'] = (df_records['信用评分'] - df_records['信用评分'].shift(-1)).apply(
            lambda x: '下降' if x < 0 else ('上升' if x > 0 else ('维持' if x == 0 else '-')))
        result['历史级别'] = list(json.loads(df_records[['信用等级', "信用评分", '评价时间', "评价方式", "变化趋势"]].T.to_json()).values())

    def rating_index_analysis():
        """
        Evaluation-indicator performance data.

        For each indicator group the actual score is divided by its maximum
        and the ratio is mapped to a level label by thresholds 1 / 0.75 / 0.5 / 0.25.
        """
        # Build the indicator table data
        result['指标表格'] = dict()
        result['指标表格']['财务指标'] = dict()
        result['指标表格']['风险指标'] = dict()
        df_operate = pd.DataFrame(
            {'实际值': sub_dict(rating_result['经营评分'], ['环境', '社会责任', '公司治理']), '最大值': {'环境': 10, '社会责任': 10, '公司治理': 10}})
        # NOTE(review): every branch of the level lambdas below yields '', so
        # all levels are stored as empty strings — the labels appear to be
        # missing; TODO restore them.
        result['指标表格']['经营指标'] = json.loads((df_operate['实际值'] / df_operate['最大值']).apply(lambda x: '' if x >= 1 else (
            '' if x >= 0.75 else ('' if x >= 0.5 else ('' if x >= 0.25 else '')))).to_json())
        financial = dict()
        financial['盈利能力'] = rating_result['财务评分']['盈利能力']['合计']
        financial['资产质量'] = rating_result['财务评分']['资产质量']['合计']
        financial['债务风险'] = rating_result['财务评分']['债务风险']['合计']
        financial['经营增长'] = rating_result['财务评分']['经营增长']['合计']
        df_financial = pd.DataFrame({'实际值': financial, '最大值': {"盈利能力": 16, "资产质量": 18, "债务风险": 18, "经营增长": 18}})
        result['指标表格']['财务指标'] = json.loads((df_financial['实际值'] / df_financial['最大值']).apply(
            lambda x: '' if x >= 1 else (
                '' if x >= 0.75 else ('' if x >= 0.5 else ('' if x >= 0.25 else '')))).to_json())
        risks = dict()
        # Risk scores are inverted against their maxima before the ratio is taken.
        risks['合规风险'] = 43 - rating_result['风险评分']['合规风险']
        risks['经营风险'] = 10 - rating_result['风险评分']['经营风险']['合计']
        risks['关联风险'] = 10 - rating_result['风险评分']['关联风险']['合计']
        df_risks = pd.DataFrame({'实际值': risks, '最大值': {"合规风险": 43, "经营风险": 10, "关联风险": 10}})
        result['指标表格']['风险指标'] = json.loads((df_risks['实际值'] / df_risks['最大值']).apply(lambda x: '' if x >= 1 else (
            '' if x >= 0.75 else ('' if x >= 0.5 else ('' if x >= 0.25 else '')))).to_json())

    # Fill every section of the result, then store it.
    result_head()
    make_radar_chart()
    credit_analysis_content()
    make_total_score()
    profitability()
    growth_ability()
    assets_quality()
    rating_history()
    rating_index_analysis()
    insert_data_to_tfse('企业', '综合评价分析', result)
def financial_analysis(rid, timeout=60):
    """
    Build the financial-factor analysis of one evaluation and store it.

    Compares the four financial category scores of the evaluation with
    hard-coded reference averages and with the scores the rating service
    computes from the previous period's indicators, lists the latest
    indicator values with their levels, and stores the document with
    insert_data_to_tfse('企业', '财务要素分析', result).

    Parameters:
        rid: evaluation ID
        timeout: value forwarded to requests' ``timeout`` argument for the
            scoring-service call (seconds, a (connect, read) tuple, or None
            to wait indefinitely)
    Returns:
        None; the assembled document is written through insert_data_to_tfse.
    Raises:
        requests.exceptions.RequestException: connection errors and timeouts
            of the scoring-service call propagate to the caller
        IndexError: when a query returns no document or fewer than two
            report periods are available (``periods[1]``)
    """
    # Parameters
    rating_result = find_data_in_tfse('评价', '评价结果', {"评价ID": rid})[0]
    rating_input = find_data_in_tfse('评价', '综合评价填报', {"评价ID": rid})[0]
    industry = rating_input['行业选择']
    # The two most recent report periods of the submitted balance sheets.
    periods = list(pd.DataFrame(rating_input['财务填报']['资产负债表']).sort_values('报告期', ascending=False)['报告期'][0:2].values)
    # The two most recent indicator rows: newest ("this") and the one before ("last").
    df_recent_2year = pd.DataFrame(find_data_in_tfse('企业', '指标明细', {'企业ID': rating_result['企业ID']})).sort_values('年报期', ascending=False)[0:2]
    df_this = df_recent_2year[0:1]
    df_last = df_recent_2year[1:2]
    # Returns
    result = dict()
    result['企业ID'] = rating_result['企业ID']
    # Financial report period
    result['财报期'] = periods[0]
    # Update date
    result['更新日期'] = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())
    # Comparison with the industry average (hard-coded reference values)
    score_avg = {
        "盈利能力": 8,
        "资产质量": 9,
        "债务风险": 9,
        "经营增长": 9
    }
    score_actl = {
        "盈利能力": rating_result['财务评分']['盈利能力']['合计'],
        "资产质量": rating_result['财务评分']['资产质量']['合计'],
        "债务风险": rating_result['财务评分']['债务风险']['合计'],
        "经营增长": rating_result['财务评分']['经营增长']['合计']
    }
    # Comparison with last year: last year's indicators are scored by the
    # rating service.
    data_last = list(json.loads(df_last[['净资产收益率', '存货周转率', '已获利息倍数', '应收账款周转率', '总资产周转率', '总资产增长率', '总资产报酬率', '技术投入比率', '营业增长率', '资产负债率', '速动比率']].T.to_json()).values())
    # NOTE(review): endpoint and token are hard-coded in source; consider
    # moving them to configuration.
    url = "http://139.9.249.34:51012/tfse_rating/rating/financial_score"
    headers = {'token': "X0gSlC!YE8jmr2jJr&ilcjS83j!tsoh5", "content-type": "application/json"}
    data = {"财务指标": data_last, "所属行业": industry}
    # Without an explicit timeout requests waits forever on a server that
    # accepts the connection but never answers.
    response = requests.post(url=url, headers=headers, data=json.dumps(data), timeout=timeout)
    score_last_origin = json.loads(response.text)['result']
    score_last = {
        "盈利能力": score_last_origin['盈利能力']['合计'],
        "资产质量": score_last_origin['资产质量']['合计'],
        "债务风险": score_last_origin['债务风险']['合计'],
        "经营增长": score_last_origin['经营增长']['合计']
    }
    df = pd.DataFrame({'今年值': score_actl, '去年值': score_last, "平均值": score_avg})
    # Peer comparison chart
    df1 = df[['今年值', '平均值']]
    df1 = df1.rename(columns={'今年值': '公司水平', '平均值': '平均水平'})
    result['同行对比图'] = json.loads(df1.to_json())
    # Last-year comparison chart, columns named after the two report periods
    df2 = df[['今年值', '去年值']]
    df2 = df2.rename(columns={'今年值': periods[0], '去年值': periods[1]})
    result['去年对比图'] = json.loads(df2.to_json())
    # Last-year comparison table: relative change as a percentage string
    result['去年对比表'] = json.loads((df['今年值']/df['去年值']-1).apply(lambda x: '{}%'.format(round(x*100, 2))).T.to_json())
    # Peer comparison table: relative difference to the reference average
    result['同行对比表'] = json.loads((df['今年值']/df['平均值']-1).apply(lambda x: '{}%'.format(round(x * 100, 2))).T.to_json())
    # Indicator details
    # NOTE(review): `'' in x.name` is true for every column name, so every
    # value gets a '%' suffix; the substring that was meant to select the
    # percentage indicators is not visible here — TODO restore it.
    index_value = json.loads(df_this[['净资产收益率', '总资产报酬率', '总资产周转率', '应收账款周转率', '存货周转率', '资产负债率', '已获利息倍数', '速动比率', '营业增长率', '总资产增长率', '技术投入比率']].apply(lambda x: '{}%'.format(x.values[0]) if '' in x.name else x.values[0]).T.to_json())
    # Each indicator score divided by a per-indicator constant
    # (presumably its maximum score — TODO confirm).
    rate = dict()
    rate['净资产收益率'] = rating_result['财务评分']['盈利能力']['净资产收益率']/8
    rate['总资产报酬率'] = rating_result['财务评分']['盈利能力']['总资产报酬率']/8
    rate['总资产周转率'] = rating_result['财务评分']['资产质量']['总资产周转率']/6
    rate['存货周转率'] = rating_result['财务评分']['资产质量']['存货周转率']/6
    rate['应收账款周转率'] = rating_result['财务评分']['资产质量']['应收账款周转率']/6
    rate['资产负债率'] = rating_result['财务评分']['债务风险']['资产负债率']/8
    rate['已获利息倍数'] = rating_result['财务评分']['债务风险']['已获利息倍数']/5
    rate['速动比率'] = rating_result['财务评分']['债务风险']['速动比率']/5
    rate['营业增长率'] = rating_result['财务评分']['经营增长']['营业增长率']/8
    rate['总资产增长率'] = rating_result['财务评分']['经营增长']['总资产增长率']/5
    rate['技术投入比率'] = rating_result['财务评分']['经营增长']['技术投入比率']/5
    df_rate = pd.DataFrame([rate])
    # NOTE(review): every branch of the level lambda yields '', so all levels
    # are stored as empty strings — the labels appear to be missing.
    index_level = json.loads(df_rate.apply(lambda x: '' if x.values[0] >= 1 else ('' if x.values[0] >= 0.75 else ('' if x.values[0] >= 0.5 else ('' if x.values[0] >= 0.25 else '')))).to_json())
    # NOTE(review): the value column is named with an empty string here.
    df_index = pd.DataFrame({'': index_value, "级别": index_level})
    result['指标详情'] = df_iterrows(df_index)
    insert_data_to_tfse('企业', '财务要素分析', result)
def risk_analysis_etl(rid):
    """
    Build the risk-factor analysis of one evaluation and store it.

    Reads the evaluation result and the risk data of the evaluation,
    derives a relative risk score, a risk level, the number of risk items
    and four detail tables, and stores the document with
    insert_data_to_tfse('企业', '风险要素分析', result).

    Parameters:
        rid: str, evaluation ID
    Returns:
        -
    """
    # Inputs
    rating_result = find_data_in_tfse('评价', '评价结果', {"评价ID": rid})[0]
    risk_data = find_data_in_tfse('评价', '风险数据', {"评价ID": rid})[0]

    def graded_impact(limits_by_name):
        # Build a row classifier: for a row whose label has an entry, the
        # first lower bound reached by the row's first value picks the label;
        # everything else is '轻微'.
        def classify(row):
            limits = limits_by_name.get(row.name)
            if limits is not None:
                count = row.values[0]
                for limit, label in zip(limits, ('异常', '中等', '警示')):
                    if count >= limit:
                        return label
            return '轻微'
        return classify

    def nonzero_table(column, classify):
        # One row per risk item with a non-zero count, plus its impact level.
        frame = pd.DataFrame({column: risk_data[column]})
        zero_rows = frame[frame[column] == 0].index
        frame = frame.drop(zero_rows)
        frame['影响程度'] = frame.T.apply(classify)
        return df_iterrows(frame)

    # Risk score relative to 43, capped at 100
    def risk_relative_score():
        ratio = rating_result['风险评分']['合计'] / 43
        if ratio >= 1:
            return 100
        return round(ratio * 100, 2)

    # Risk level by score thresholds
    def risk_level():
        total = rating_result['风险评分']['合计']
        # NOTE(review): the labels for the top band (>= 43) and for the
        # fallback are empty strings — confirm the intended labels.
        for floor, label in ((43, ''), (33, '较高'), (23, '中等'), (10, '警示')):
            if total >= floor:
                return label
        return ''

    # Number of compliance and operating risk items
    def stat_risk_num():
        compliance_total = sum(risk_data['合规风险'].values())
        operating_total = sum(risk_data['经营风险'].values())
        return compliance_total + operating_total

    # Dishonest-debtor flag
    def is_break_trust():
        # NOTE(review): both outcomes are empty strings — confirm the
        # intended labels.
        if risk_data['合规风险']['失信人'] >= 1:
            return ''
        return ''

    # Compliance risk table
    def eligibility_risk():
        severity_by_name = {
            '失信人': '严重',
            '严重违法': '严重',
            '经营异常': '异常',
            '欠税公告': '异常',
            '税收违法': '异常',
            '立案信息': '中等',
            '行政处罚': '中等',
            '环保处罚': '中等',
        }

        def degree_of_impact(row):
            # NOTE(review): items outside the table get an empty string.
            return severity_by_name.get(row.name, '')

        return nonzero_table('合规风险', degree_of_impact)

    # Operating risk table
    def operating_risk():
        litigation = (50, 25, 10)
        limits_by_name = {
            '开庭公告': litigation,
            '法院公告': litigation,
            '诉讼': litigation,
            '送达公告': litigation,
            '被执行人': (30, 15, 6),
            '股权出质': (10, 5, 3),
        }
        return nonzero_table('经营风险', graded_impact(limits_by_name))

    # Associated risk table
    def associate_risk():
        records = associate_risk_detail(rating_result['企业名称'])
        frame = pd.DataFrame(records)
        frame = frame.sort_values('total', ascending=False)
        return df_iterrows(frame.set_index('title'))

    # Change log table
    def change_log():
        records = change_log_detail(rating_result['企业名称'])
        limits_by_name = {
            '法定代表人变更': (20, 10, 4),
            '主要人员变更': (20, 10, 4),
        }
        frame = pd.DataFrame(records).sort_values('total', ascending=False).set_index('title')
        frame['影响程度'] = frame.T.apply(graded_impact(limits_by_name))
        return df_iterrows(frame[['total', '影响程度']])

    # Collect the processed data (evaluated top to bottom) and store it
    result = {
        '企业ID': rating_result['企业ID'],
        '更新日期': time.strftime("%Y-%m-%d %H:%M:%S"),
        '风险分数': risk_relative_score(),
        '风险级别': risk_level(),
        '风险数量': stat_risk_num(),
        '列入失信名单': is_break_trust(),
        '合规风险': eligibility_risk(),
        '经营风险': operating_risk(),
        '周边风险': associate_risk(),
        '变更记录': change_log(),
    }
    insert_data_to_tfse('企业', '风险要素分析', result)