tfse-app-api-v0.2/Implements/Rating/EsgImpl.py

638 lines
29 KiB
Python
Raw Normal View History

import json
import time
import random
import requests
from DBHelper.MongoHelper import MongoHelper
2022-05-05 16:20:53 +08:00
from Objects.Common.Rating.FECRRating import RatingRecord
from Objects.InProject.Rating.esg import EnvironmentQuestionnaire, ThreeYearDataByE, OtherTypeDataByE, \
2022-04-15 15:06:05 +08:00
ThreeYearDataByS, SocietyQuestionnaire, OtherTypeDataByS, ThreeYearDataByG, GovernanceQuestionnaire, \
OtherTypeDataByG, EsgInputData
from Objects.InProject.Rating.process import EsgNewProcess, EsgProcess
from Utils.ErrorUtil import ReturnConditionCheckFailed
class CreditUtils:
"""综合信用评价相关方法"""
db = MongoHelper('tfse_v0.21')
@staticmethod
def get_current_year():
"""获取当前年度"""
current_year = time.strftime("%Y", time.localtime()) + ""
return current_year
@staticmethod
def gen_new_rid():
"""生成新的评价ID如果该ID存在则重新生成"""
db = MongoHelper('tfse_v0.21')
def make_id(num):
"""
随机生成字符串
"""
choices = 'ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789'
salt = ''
for i in range(num):
salt += random.choice(choices)
return salt
# 生成新ID
new_id = make_id(8)
# 检查新ID是否存在如果存在则继续生成新ID
case = db.find_all_data_with_count(
'企业数据',
'评价记录',
{'评价ID': new_id}
)
while case > 0:
new_id = make_id(8)
# 新ID可使用返回新ID
return new_id
@staticmethod
def make_3_report_dates():
"""
生成最近三年报告期 日期
Parameters:
-
Returns:
report_dates 三年报告期
"""
current_year = int(time.strftime("%Y", time.localtime())) - 1
date_01 = str(current_year) + '-12-31'
date_02 = str(current_year - 1) + '-12-31'
date_03 = str(current_year - 2) + '-12-31'
report_dates = [date_01, date_02, date_03]
return report_dates
2022-04-15 15:06:05 +08:00
@staticmethod
def make_3_report_years():
"""
生成最近三年报告期
Parameters:
-
Returns:
report_dates 三年报告期
"""
current_year = int(time.strftime("%Y", time.localtime())) - 1
date_01 = str(current_year) + ''
date_02 = str(current_year - 1) + ''
date_03 = str(current_year - 2) + ''
report_dates = [date_01, date_02, date_03]
return report_dates
@staticmethod
def update_process_status(rid, update_data):
"""
调用更新流程接口
Parameters:
update_data dict 需要更新的数据
Returns:
result str 更新结果
"""
url = "http://api.fecribd.com/etl_tfse/company/update_process_data"
token = "dmfd7FshT!5Wng9^gcCcQV7T6FBnVgl4"
headers = {'token': token, 'Content-Type': 'application/json; charset=UTF-8'}
2022-04-15 15:06:05 +08:00
data = json.dumps({"rid": rid, "types": "ESG评价流程", "renew_data": update_data})
requests.post(url, headers=headers, data=data)
2022-04-15 15:06:05 +08:00
class EsgRating(EsgInputData, CreditUtils):
"""企业ESG评价"""
db = MongoHelper('tfse_v0.21')
def start_esg_rating_obj(self):
"""开始ESG评价"""
def check_input_data():
"""
input_data: 已存在的填报数据
根据企业ID+当前年度,查询是否存在填报数据
若存在填报数据则返回填报数据
若不存在填报数据则返回False
"""
input_data = self.db.find_single_data(
"ESG评价数据",
"填报数据",
{"企业ID": self.cid, "评价年度": self.get_current_year()},
2022-04-15 15:06:05 +08:00
["评价ID", "企业名称", "评价年度", "所属行业", "治理问卷", "环境问卷", "社会问卷", "公司当年收入(万元)"]
)
return input_data if input_data else False
def case_input_exist(param):
"""
param: dict 填报数据
根据评价ID查询评价进行状态
若评价已完成返回本年度已评价的提示且返回空对象
若评价进行中返回继续填报的提示且返回填报数据
"""
# rating_status: 评价服务状态
rating_status = self.db.find_single_data(
"企业数据",
"评价记录",
{"评价ID": param["评价ID"]},
["进行状态"]
)
# 把不需要返回的字段删掉
param.pop('评价ID')
# 方法返回结果
return ("本年度已评价", {}) if rating_status['进行状态'] == "完成" else ("继续填报", param)
def case_input_not_exist():
"""不存在填报数据"""
# new_rid: 新生成填报ID
new_rid = self.gen_new_rid()
# report_dates: 生成报告期
2022-04-15 15:06:05 +08:00
report_dates = self.make_3_report_years()
# company_name: 企业名称
company_name = self.db.find_single_data(
"应用端",
"企业用户",
{"企业ID": self.cid},
["企业名称"]
)
def prepare_input_template():
"""
根据模板生成新的填报数据
input_template: 填报数据模板
"""
def environment():
"""环境问卷"""
def three_year_data(i):
"""近三年数据"""
e = ThreeYearDataByE()
e.year = report_dates[i]
e.raw_coal = None
e.coke = None
e.crude = None
e.fuel_oil = None
e.gasoline = None
e.kerosene = None
e.diesel_fuel = None
e.liquefied_petroleum_gas = None
e.natural_gas = None
e.gas = None
e.electricity = None
e.water = None
e.green_income = None
return e.dict_to_save()
environment_questionnaire = EnvironmentQuestionnaire()
environment_questionnaire.e_three_year_data = list()
for num in range(3):
environment_questionnaire.e_three_year_data.append(three_year_data(num))
2022-04-15 15:06:05 +08:00
environment_questionnaire.e_other_type_data = list()
return environment_questionnaire
def society():
"""社会问卷"""
def three_year_data(i):
s = ThreeYearDataByS()
2022-04-15 15:06:05 +08:00
s.year = report_dates[i]
s.employee_total = None
s.leaving_total = None
s.salary_level = None
s.work_length = None
s.labor_dispute = None
s.security_incident = None
s.training_times = None
s.supplier = None
s.sales = None
s.return_sales = None
s.donate = None
return s.dict_to_save()
society_questionnaire = SocietyQuestionnaire()
society_questionnaire.s_three_year_data = list()
for num in range(3):
society_questionnaire.s_three_year_data.append(three_year_data(num))
2022-04-15 15:06:05 +08:00
society_questionnaire.s_other_type_data = list()
return society_questionnaire
def governance():
"""治理问卷"""
def three_year_data(i):
g = ThreeYearDataByG()
2022-04-15 15:06:05 +08:00
g.year = report_dates[i]
g.audit_report = None
g.net_assets = None
g.net_profit = None
return g.dict_to_save()
governance_questionnaire = GovernanceQuestionnaire()
governance_questionnaire.g_three_year_data = list()
for num in range(3):
governance_questionnaire.g_three_year_data.append(three_year_data(num))
2022-04-15 15:06:05 +08:00
governance_questionnaire.g_other_type_data = list()
return governance_questionnaire
self.rid = new_rid
self.cid = self.cid
self.company = company_name['企业名称']
self.year = self.get_current_year()
self.industry = list()
self.current_year_income = dict()
self.current_year_income[report_dates[0]] = None
self.current_year_income[report_dates[1]] = None
self.current_year_income[report_dates[2]] = None
self.environment = environment()
self.society = society()
self.governance = governance()
input_template = self.dict_to_save()
# 插入新的填报记录
self.db.insert_single_data(
2022-04-15 15:06:05 +08:00
'ESG评价数据',
'填报数据',
input_template
)
def prepare_rating_record_template():
"""根据模板生成新的评价记录"""
rating_record = RatingRecord()
rating_record.rid = new_rid
rating_record.cid = self.cid
rating_record.company = company_name['企业名称']
2022-04-15 15:06:05 +08:00
rating_record.project = 'ESG评价'
rating_record.methods = '企业申报'
rating_record.status = '进行'
rating_record.result = None
rating_record.report_fid = None
rating_record.certificate_fid = None
rating_record.begin_time = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())
rating_record.submit_time = None
rating_record.finish_time = None
rating_record.finish_date = None
rating_record_template = rating_record.dict_to_save()
self.db.insert_single_data(
'企业数据',
'评价记录',
rating_record_template
)
def prepare_process_template():
"""根据数据模板生成新的评价流程数据"""
credit_process = EsgProcess()
preparation = credit_process.EsgDataPreparation()
preparation.fill_in_data = 0
preparation.risk_data = 0
credit_process.data_preparation = preparation
score = credit_process.EsgModelScoring()
score.rating_result = 0
credit_process.model_scoring = score
document = credit_process.EsgDocumentGeneration()
document.report_data = 0
document.report = 0
document.certificate = 0
credit_process.document_generation = document
clean = credit_process.EsgDataCleaning()
clean.esg_evaluation_analysis = 0
credit_process.data_cleaning = clean
process_template = dict()
process_template['评价ID'] = new_rid
process_template['企业ID'] = self.cid
process_template['评价流程'] = credit_process.dict_to_save()
self.db.insert_single_data(
'ESG评价数据',
'评价流程',
process_template
)
def prepare_risk_data():
# 调用清洗数据接口,执行风险数据清洗程序
2022-04-15 15:06:05 +08:00
url = "http://api.fecribd.com/etl_tfse/rating/esg_risk_data"
token = "dmfd7FshT!5Wng9^gcCcQV7T6FBnVgl4"
headers = {'token': token, 'Content-Type': 'application/json; charset=UTF-8'}
data = json.dumps({"company_id": self.cid, "company_name": company_name, "evaluation_id": new_rid})
requests.post(url, headers=headers, data=data)
# 风险数据清洗完成后更新流程状态
2022-05-18 19:55:07 +08:00
self.update_process_status(new_rid, {"评价流程.数据准备.风险数据": 1})
def __main__():
prepare_input_template()
prepare_rating_record_template()
prepare_process_template()
prepare_risk_data()
__main__()
return "开始填报", check_input_data()
def execute_func():
"""判断是否有填报记录"""
input_data = check_input_data()
if input_data is not False:
return case_input_exist(input_data)
else:
return case_input_not_exist()
return execute_func()
2022-04-15 15:06:05 +08:00
def save_input_general(self, input_form):
"""保存填报数据"""
# 根据cid查询评价记录得到rid
find = {"企业ID": self.cid, "评价项目": "ESG评价", "进行状态": "进行"}
rid = self.db.find_single_data("企业数据", "评价记录", find, ['评价ID'])
if not rid:
return "没有正在进行的评价", 200
def handle_input_form():
"""处理表单数据"""
def environment():
"""环境问卷"""
three_year_data = input_form['环境问卷']['近三年公司数据']
for item in three_year_data:
for k, v in item.items():
if isinstance(v, float) or isinstance(v, int) or item == 0:
item[k] = float(v)
def society():
"""社会问卷"""
three_year_data = input_form['社会问卷']['近三年公司数据']
filter_list = ['员工总数(年底)', '当年离职人数', '劳资纠纷次数', '安全事故发生次数', '组织培训次数']
for item in three_year_data:
for k, v in item.items():
if k not in filter_list and (isinstance(v, float) or isinstance(v, int) or item == 0):
item[k] = float(v)
def governance():
"""治理问卷"""
three_year_data = input_form['治理问卷']['近三年公司数据']
for item in three_year_data:
for k, v in item.items():
if isinstance(v, float) or isinstance(v, int) or item == 0:
item[k] = float(v)
environment()
society()
governance()
def environment_script():
"""环境问卷校验"""
questionnaire = input_form['环境问卷']
def three_year_data_script():
"""近三年公司数据"""
three_year_data = ThreeYearDataByE()
for data in questionnaire['近三年公司数据']:
three_year_data.year = data['年份']
three_year_data.raw_coal = data['公司当年消耗的原煤(千克)']
three_year_data.coke = data['公司当年消耗的焦炭(千克)']
three_year_data.crude = data['公司当年消耗的原油(升)']
three_year_data.fuel_oil = data['公司当年消耗的燃料油(升)']
three_year_data.gasoline = data['公司当年消耗的汽油(升)']
three_year_data.kerosene = data['公司当年消耗的煤油(升)']
three_year_data.diesel_fuel = data['公司当年消耗的柴油(升)']
three_year_data.liquefied_petroleum_gas = data['公司当年消耗的液化石油气(千克)']
three_year_data.natural_gas = data['公司当年消耗的天然气(立方米)']
three_year_data.gas = data['公司当年消耗的煤气(立方米)']
three_year_data.electricity = data['公司当年消耗的电(千瓦时)']
three_year_data.water = data['公司当年耗水(吨)']
three_year_data.green_income = data['公司当年绿色业务收入(万元)——包括不限于清洁能源、清洁交通、绿色建筑、清洁技术等']
eq.e_three_year_data.append(three_year_data.dict_to_save())
def other_type_data_script():
"""其他类型问卷"""
2022-04-24 10:27:43 +08:00
if questionnaire['其他类型问卷']:
other_type_data = OtherTypeDataByE()
other_type_data.question_01 = questionnaire['其他类型问卷'][0][0]
other_type_data.question_02 = questionnaire['其他类型问卷'][1][0]
other_type_data.question_03 = questionnaire['其他类型问卷'][2][0]
other_type_data.question_04 = questionnaire['其他类型问卷'][3][0]
other_type_data.question_05 = questionnaire['其他类型问卷'][4][0]
other_type_data.question_06 = questionnaire['其他类型问卷'][5][0]
other_type_data.question_07 = questionnaire['其他类型问卷'][6][0]
other_type_data.question_08 = questionnaire['其他类型问卷'][7][0]
other_type_data.list_to_save()
2022-04-15 15:06:05 +08:00
return questionnaire['其他类型问卷']
eq = EnvironmentQuestionnaire()
eq.e_three_year_data = list()
three_year_data_script()
other_type_data_script()
return eq
def society_script():
"""社会问卷校验"""
questionnaire = input_form['社会问卷']
def three_year_data_script():
"""近三年公司数据"""
three_year_data = ThreeYearDataByS()
for data in questionnaire['近三年公司数据']:
three_year_data.year = data['年份']
three_year_data.employee_total = data['员工总数(年底)']
three_year_data.leaving_total = data['当年离职人数']
three_year_data.salary_level = data['人均薪酬水平(元/月)']
three_year_data.work_length = data['劳动合同要求工作时长(每周小时数)']
three_year_data.labor_dispute = data['劳资纠纷次数']
three_year_data.security_incident = data['安全事故发生次数']
three_year_data.training_times = data['组织培训次数']
three_year_data.supplier = data['公司从前3大供应商拿货占全部供应商比例%']
three_year_data.sales = data['公司前3大客户销售额占全部销售比例%']
three_year_data.return_sales = data['返修、退回、投诉产品对应销售额占全部销售比例(%']
three_year_data.donate = data['扶贫+捐赠规模(万元)']
sq.s_three_year_data.append(three_year_data.dict_to_save())
def other_type_data_script():
"""其他类型问卷"""
2022-04-24 10:27:43 +08:00
if questionnaire['其他类型问卷']:
other_type_data = OtherTypeDataByS()
other_type_data.question_01 = questionnaire['其他类型问卷'][0]
other_type_data.question_02 = questionnaire['其他类型问卷'][1]
other_type_data.question_03 = questionnaire['其他类型问卷'][2]
other_type_data.question_04 = questionnaire['其他类型问卷'][3]
other_type_data.list_to_save()
2022-04-15 15:06:05 +08:00
return questionnaire['其他类型问卷']
sq = SocietyQuestionnaire()
sq.s_three_year_data = list()
three_year_data_script()
sq.s_other_type_data = other_type_data_script()
return sq
def governance_script():
"""治理问卷校验"""
questionnaire = input_form['治理问卷']
def three_year_data_script():
"""近三年公司数据"""
three_year_data = ThreeYearDataByG()
for data in questionnaire['近三年公司数据']:
three_year_data.year = data['年份']
three_year_data.audit_report = data['公司是否有审计报告']
three_year_data.net_assets = data['公司净资产(万元)']
three_year_data.net_profit = data['公司净利润(万元)']
gq.g_three_year_data.append(three_year_data.dict_to_save())
def other_type_data_script():
"""其他类型问卷"""
2022-04-24 10:27:43 +08:00
if questionnaire['其他类型问卷']:
other_type_data = OtherTypeDataByG()
other_type_data.question_01 = questionnaire['其他类型问卷'][0]
other_type_data.question_02 = questionnaire['其他类型问卷'][1]
other_type_data.question_03 = int(questionnaire['其他类型问卷'][2])
other_type_data.question_04 = int(questionnaire['其他类型问卷'][3])
other_type_data.question_05 = questionnaire['其他类型问卷'][4]
other_type_data.question_06 = questionnaire['其他类型问卷'][5]
other_type_data.question_07 = questionnaire['其他类型问卷'][6]
other_type_data.question_08 = questionnaire['其他类型问卷'][7]
other_type_data.list_to_save()
2022-04-15 15:06:05 +08:00
return questionnaire['其他类型问卷']
gq = GovernanceQuestionnaire()
gq.g_three_year_data = list()
three_year_data_script()
gq.g_other_type_data = other_type_data_script()
return gq
def rating_input():
"""校验填报数据并保存"""
self.rid = rid['评价ID']
self.company = input_form['企业名称']
self.year = input_form['评价年度']
self.industry = input_form['所属行业']
self.current_year_income = input_form['公司当年收入(万元)']
self.environment = environment_script()
self.society = society_script()
self.governance = governance_script()
input_data = self.dict_to_save()
input_data['环境问卷']['其他类型问卷'] = input_form['环境问卷']['其他类型问卷']
self.db.update_single_data(
'ESG评价数据',
'填报数据',
rid,
input_data
)
2022-05-18 19:55:07 +08:00
self.update_process_status(rid['评价ID'], {"评价流程.数据准备.填报数据": 1})
2022-04-15 15:06:05 +08:00
def __main__():
handle_input_form()
rating_input()
__main__()
def exec_esg_rating(self):
"""提交数据开始esg评价"""
# 根据cid查询评价记录得到rid
find = {"企业ID": self.cid, "评价项目": "ESG评价", "进行状态": "进行"}
rid = self.db.find_single_data("企业数据", "评价记录", find, ['评价ID'])
if not rid:
return "没有正在进行的评价", 200
self.db.update_single_data(
'企业数据', '评价记录', rid,
{"提交填报时间": time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())})
def get_rating_data():
"""获取打分需要的数据"""
input_data = self.db.find_single_data(
'ESG评价数据',
'填报数据',
rid,
["评价ID", "企业ID", "企业名称", "评价年度", "所属行业", "公司当年收入(万元)", "环境问卷", "社会问卷", "治理问卷"]
)
risk_data = self.db.find_single_data(
'ESG评价数据',
'风险数据',
rid,
["严重违法", "失信人", "法律诉讼", "税收违法", "经营异常", "行政处罚", "被执行人"]
)
return {**input_data, **{"风险数据": risk_data}}
rating_input = get_rating_data()
def esg_model():
"""模型打分"""
url = "http://api.fecribd.com/tfse_rating/esg/esg_model"
token = "X0gSlC!YE8jmr2jJr&ilcjS83j!tsoh5"
headers = {'token': token, "Content-Type": "application/json;charset=UTF-8"}
data = json.dumps({"input_data": rating_input})
res = requests.post(url, headers=headers, data=data)
if res.status_code == 200:
update_data = {
"评价结果": json.loads(res.text)['result'],
}
self.db.update_single_data("企业数据", "评价记录", rid, update_data)
else:
raise ReturnConditionCheckFailed("模型打分失败", 200)
2022-04-15 15:06:05 +08:00
def gen_report():
"""报告生成"""
url = "http://api.fecribd.com/tfse_rating/report/generate_esg_report"
token = "X0gSlC!YE8jmr2jJr&ilcjS83j!tsoh5"
headers = {'token': token, "Content-Type": "application/json;charset=UTF-8"}
data = {
"company": rating_input['企业名称'],
"cid": rating_input['企业ID'],
"rid": rating_input['评价ID']
}
res = requests.post(url, headers=headers, data=json.dumps(data))
if res.status_code == 200:
update_data = {
"报告fid": json.loads(res.text)['result']['FileID']
}
self.db.update_single_data("企业数据", "评价记录", rid, update_data)
2022-05-18 19:55:07 +08:00
self.update_process_status(rid['评价ID'], {"评价流程.文档生成.ESG报告": 1})
2022-04-15 15:06:05 +08:00
else:
2022-05-18 19:55:07 +08:00
self.update_process_status(rid['评价ID'], {"评价流程.文档生成.ESG报告": -1})
raise ReturnConditionCheckFailed("报告生成失败", 200)
2022-04-15 15:06:05 +08:00
def gen_certified():
"""证书生成"""
url = "http://api.fecribd.com/tfse_rating/certificate/generate_certificate"
token = "X0gSlC!YE8jmr2jJr&ilcjS83j!tsoh5"
headers = {'token': token, "Content-Type": "application/json;charset=UTF-8"}
data = {
"company": rating_input['企业名称'],
"cid": rating_input['企业ID'],
"rid": rating_input['评价ID'],
"type": "esg"
}
res = requests.post(url, headers=headers, data=json.dumps(data))
if res.status_code == 200:
update_data = {
"证书fid": json.loads(res.text)['result']['FileID']
}
self.db.update_single_data("企业数据", "评价记录", rid, update_data)
2022-05-18 19:55:07 +08:00
self.update_process_status(rid['评价ID'], {"评价流程.文档生成.ESG证书": 1})
2022-04-15 15:06:05 +08:00
else:
2022-05-18 19:55:07 +08:00
self.update_process_status(rid['评价ID'], {"评价流程.文档生成.ESG证书": -1})
raise ReturnConditionCheckFailed("证书生成失败", 200)
2022-04-15 15:06:05 +08:00
def esg_index():
url = "http://api.fecribd.com/etl_tfse/company/esg_index_etl"
token = "dmfd7FshT!5Wng9^gcCcQV7T6FBnVgl4"
headers = {'token': token, "Content-Type": "application/json;charset=UTF-8"}
data = {
"rid": rating_input['评价ID']
}
res = requests.post(url, headers=headers, data=json.dumps(data))
if res.status_code == 200:
2022-05-18 19:55:07 +08:00
self.update_process_status(rid['评价ID'], {"评价流程.数据清洗.ESG评价分析": 1})
2022-04-15 15:06:05 +08:00
else:
2022-05-18 19:55:07 +08:00
self.update_process_status(rid['评价ID'], {"评价流程.数据清洗.ESG评价分析": -1})
raise ReturnConditionCheckFailed("主页ESG数据生成失败", 200)
2022-04-15 15:06:05 +08:00
def update_rating_records():
"""更新完成时间和进行状态"""
finish_time = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())
finish_date = time.strftime("%Y-%m-%d", time.localtime())
update_data = {"进行状态": "完成", "评价完成时间": finish_time, "评价完成日期": finish_date}
self.db.update_single_data('企业数据', '评价记录', rid, update_data)
def __main__():
esg_model()
gen_report()
gen_certified()
esg_index()
update_rating_records()
__main__()
2022-04-24 10:09:31 +08:00
return '执行成功', 200