tfse-app-api-v0.2/Implements/Rating/EsgImpl.py

638 lines
29 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import json
import time
import random
import requests
from DBHelper.MongoHelper import MongoHelper
from Objects.Common.Rating.FECRRating import RatingRecord
from Objects.InProject.Rating.esg import EnvironmentQuestionnaire, ThreeYearDataByE, OtherTypeDataByE, \
ThreeYearDataByS, SocietyQuestionnaire, OtherTypeDataByS, ThreeYearDataByG, GovernanceQuestionnaire, \
OtherTypeDataByG, EsgInputData
from Objects.InProject.Rating.process import EsgNewProcess, EsgProcess
from Utils.ErrorUtil import ReturnConditionCheckFailed
class CreditUtils:
"""综合信用评价相关方法"""
db = MongoHelper('tfse_v0.21')
@staticmethod
def get_current_year():
"""获取当前年度"""
current_year = time.strftime("%Y", time.localtime()) + ""
return current_year
@staticmethod
def gen_new_rid():
"""生成新的评价ID如果该ID存在则重新生成"""
db = MongoHelper('tfse_v0.21')
def make_id(num):
"""
随机生成字符串
"""
choices = 'ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789'
salt = ''
for i in range(num):
salt += random.choice(choices)
return salt
# 生成新ID
new_id = make_id(8)
# 检查新ID是否存在如果存在则继续生成新ID
case = db.find_all_data_with_count(
'企业数据',
'评价记录',
{'评价ID': new_id}
)
while case > 0:
new_id = make_id(8)
# 新ID可使用返回新ID
return new_id
@staticmethod
def make_3_report_dates():
"""
生成最近三年报告期 日期
Parameters:
-
Returns:
report_dates 三年报告期
"""
current_year = int(time.strftime("%Y", time.localtime())) - 1
date_01 = str(current_year) + '-12-31'
date_02 = str(current_year - 1) + '-12-31'
date_03 = str(current_year - 2) + '-12-31'
report_dates = [date_01, date_02, date_03]
return report_dates
@staticmethod
def make_3_report_years():
"""
生成最近三年报告期
Parameters:
-
Returns:
report_dates 三年报告期
"""
current_year = int(time.strftime("%Y", time.localtime())) - 1
date_01 = str(current_year) + ''
date_02 = str(current_year - 1) + ''
date_03 = str(current_year - 2) + ''
report_dates = [date_01, date_02, date_03]
return report_dates
@staticmethod
def update_process_status(rid, update_data):
"""
调用更新流程接口
Parameters:
update_data dict 需要更新的数据
Returns:
result str 更新结果
"""
url = "http://api.fecribd.com/etl_tfse/company/update_process_data"
token = "dmfd7FshT!5Wng9^gcCcQV7T6FBnVgl4"
headers = {'token': token, 'Content-Type': 'application/json; charset=UTF-8'}
data = json.dumps({"rid": rid, "types": "ESG评价流程", "renew_data": update_data})
requests.post(url, headers=headers, data=data)
class EsgRating(EsgInputData, CreditUtils):
"""企业ESG评价"""
db = MongoHelper('tfse_v0.21')
def start_esg_rating_obj(self):
"""开始ESG评价"""
def check_input_data():
"""
input_data: 已存在的填报数据
根据企业ID+当前年度,查询是否存在填报数据
若存在填报数据,则返回填报数据
若不存在填报数据则返回False
"""
input_data = self.db.find_single_data(
"ESG评价数据",
"填报数据",
{"企业ID": self.cid, "评价年度": self.get_current_year()},
["评价ID", "企业名称", "评价年度", "所属行业", "治理问卷", "环境问卷", "社会问卷", "公司当年收入(万元)"]
)
return input_data if input_data else False
def case_input_exist(param):
"""
param: dict 填报数据
根据评价ID查询评价进行状态
若评价已完成,返回‘本年度已评价’的提示,且返回空对象
若评价进行中,返回‘继续填报’的提示,且返回填报数据
"""
# rating_status: 评价服务状态
rating_status = self.db.find_single_data(
"企业数据",
"评价记录",
{"评价ID": param["评价ID"]},
["进行状态"]
)
# 把不需要返回的字段删掉
param.pop('评价ID')
# 方法返回结果
return ("本年度已评价", {}) if rating_status['进行状态'] == "完成" else ("继续填报", param)
def case_input_not_exist():
"""不存在填报数据"""
# new_rid: 新生成填报ID
new_rid = self.gen_new_rid()
# report_dates: 生成报告期
report_dates = self.make_3_report_years()
# company_name: 企业名称
company_name = self.db.find_single_data(
"应用端",
"企业用户",
{"企业ID": self.cid},
["企业名称"]
)
def prepare_input_template():
"""
根据模板生成新的填报数据
input_template: 填报数据模板
"""
def environment():
"""环境问卷"""
def three_year_data(i):
"""近三年数据"""
e = ThreeYearDataByE()
e.year = report_dates[i]
e.raw_coal = None
e.coke = None
e.crude = None
e.fuel_oil = None
e.gasoline = None
e.kerosene = None
e.diesel_fuel = None
e.liquefied_petroleum_gas = None
e.natural_gas = None
e.gas = None
e.electricity = None
e.water = None
e.green_income = None
return e.dict_to_save()
environment_questionnaire = EnvironmentQuestionnaire()
environment_questionnaire.e_three_year_data = list()
for num in range(3):
environment_questionnaire.e_three_year_data.append(three_year_data(num))
environment_questionnaire.e_other_type_data = list()
return environment_questionnaire
def society():
"""社会问卷"""
def three_year_data(i):
s = ThreeYearDataByS()
s.year = report_dates[i]
s.employee_total = None
s.leaving_total = None
s.salary_level = None
s.work_length = None
s.labor_dispute = None
s.security_incident = None
s.training_times = None
s.supplier = None
s.sales = None
s.return_sales = None
s.donate = None
return s.dict_to_save()
society_questionnaire = SocietyQuestionnaire()
society_questionnaire.s_three_year_data = list()
for num in range(3):
society_questionnaire.s_three_year_data.append(three_year_data(num))
society_questionnaire.s_other_type_data = list()
return society_questionnaire
def governance():
"""治理问卷"""
def three_year_data(i):
g = ThreeYearDataByG()
g.year = report_dates[i]
g.audit_report = None
g.net_assets = None
g.net_profit = None
return g.dict_to_save()
governance_questionnaire = GovernanceQuestionnaire()
governance_questionnaire.g_three_year_data = list()
for num in range(3):
governance_questionnaire.g_three_year_data.append(three_year_data(num))
governance_questionnaire.g_other_type_data = list()
return governance_questionnaire
self.rid = new_rid
self.cid = self.cid
self.company = company_name['企业名称']
self.year = self.get_current_year()
self.industry = list()
self.current_year_income = dict()
self.current_year_income[report_dates[0]] = None
self.current_year_income[report_dates[1]] = None
self.current_year_income[report_dates[2]] = None
self.environment = environment()
self.society = society()
self.governance = governance()
input_template = self.dict_to_save()
# 插入新的填报记录
self.db.insert_single_data(
'ESG评价数据',
'填报数据',
input_template
)
def prepare_rating_record_template():
"""根据模板生成新的评价记录"""
rating_record = RatingRecord()
rating_record.rid = new_rid
rating_record.cid = self.cid
rating_record.company = company_name['企业名称']
rating_record.project = 'ESG评价'
rating_record.methods = '企业申报'
rating_record.status = '进行'
rating_record.result = None
rating_record.report_fid = None
rating_record.certificate_fid = None
rating_record.begin_time = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())
rating_record.submit_time = None
rating_record.finish_time = None
rating_record.finish_date = None
rating_record_template = rating_record.dict_to_save()
self.db.insert_single_data(
'企业数据',
'评价记录',
rating_record_template
)
def prepare_process_template():
"""根据数据模板生成新的评价流程数据"""
credit_process = EsgProcess()
preparation = credit_process.EsgDataPreparation()
preparation.fill_in_data = 0
preparation.risk_data = 0
credit_process.data_preparation = preparation
score = credit_process.EsgModelScoring()
score.rating_result = 0
credit_process.model_scoring = score
document = credit_process.EsgDocumentGeneration()
document.report_data = 0
document.report = 0
document.certificate = 0
credit_process.document_generation = document
clean = credit_process.EsgDataCleaning()
clean.esg_evaluation_analysis = 0
credit_process.data_cleaning = clean
process_template = dict()
process_template['评价ID'] = new_rid
process_template['企业ID'] = self.cid
process_template['评价流程'] = credit_process.dict_to_save()
self.db.insert_single_data(
'ESG评价数据',
'评价流程',
process_template
)
def prepare_risk_data():
# 调用清洗数据接口,执行风险数据清洗程序
url = "http://api.fecribd.com/etl_tfse/rating/esg_risk_data"
token = "dmfd7FshT!5Wng9^gcCcQV7T6FBnVgl4"
headers = {'token': token, 'Content-Type': 'application/json; charset=UTF-8'}
data = json.dumps({"company_id": self.cid, "company_name": company_name, "evaluation_id": new_rid})
requests.post(url, headers=headers, data=data)
# 风险数据清洗完成后更新流程状态
self.update_process_status(new_rid, {"评价流程.数据准备.风险数据": 1})
def __main__():
prepare_input_template()
prepare_rating_record_template()
prepare_process_template()
prepare_risk_data()
__main__()
return "开始填报", check_input_data()
def execute_func():
"""判断是否有填报记录"""
input_data = check_input_data()
if input_data is not False:
return case_input_exist(input_data)
else:
return case_input_not_exist()
return execute_func()
def save_input_general(self, input_form):
"""保存填报数据"""
# 根据cid查询评价记录得到rid
find = {"企业ID": self.cid, "评价项目": "ESG评价", "进行状态": "进行"}
rid = self.db.find_single_data("企业数据", "评价记录", find, ['评价ID'])
if not rid:
return "没有正在进行的评价", 200
def handle_input_form():
"""处理表单数据"""
def environment():
"""环境问卷"""
three_year_data = input_form['环境问卷']['近三年公司数据']
for item in three_year_data:
for k, v in item.items():
if isinstance(v, float) or isinstance(v, int) or item == 0:
item[k] = float(v)
def society():
"""社会问卷"""
three_year_data = input_form['社会问卷']['近三年公司数据']
filter_list = ['员工总数(年底)', '当年离职人数', '劳资纠纷次数', '安全事故发生次数', '组织培训次数']
for item in three_year_data:
for k, v in item.items():
if k not in filter_list and (isinstance(v, float) or isinstance(v, int) or item == 0):
item[k] = float(v)
def governance():
"""治理问卷"""
three_year_data = input_form['治理问卷']['近三年公司数据']
for item in three_year_data:
for k, v in item.items():
if isinstance(v, float) or isinstance(v, int) or item == 0:
item[k] = float(v)
environment()
society()
governance()
def environment_script():
"""环境问卷校验"""
questionnaire = input_form['环境问卷']
def three_year_data_script():
"""近三年公司数据"""
three_year_data = ThreeYearDataByE()
for data in questionnaire['近三年公司数据']:
three_year_data.year = data['年份']
three_year_data.raw_coal = data['公司当年消耗的原煤(千克)']
three_year_data.coke = data['公司当年消耗的焦炭(千克)']
three_year_data.crude = data['公司当年消耗的原油(升)']
three_year_data.fuel_oil = data['公司当年消耗的燃料油(升)']
three_year_data.gasoline = data['公司当年消耗的汽油(升)']
three_year_data.kerosene = data['公司当年消耗的煤油(升)']
three_year_data.diesel_fuel = data['公司当年消耗的柴油(升)']
three_year_data.liquefied_petroleum_gas = data['公司当年消耗的液化石油气(千克)']
three_year_data.natural_gas = data['公司当年消耗的天然气(立方米)']
three_year_data.gas = data['公司当年消耗的煤气(立方米)']
three_year_data.electricity = data['公司当年消耗的电(千瓦时)']
three_year_data.water = data['公司当年耗水(吨)']
three_year_data.green_income = data['公司当年绿色业务收入(万元)——包括不限于清洁能源、清洁交通、绿色建筑、清洁技术等']
eq.e_three_year_data.append(three_year_data.dict_to_save())
def other_type_data_script():
"""其他类型问卷"""
if questionnaire['其他类型问卷']:
other_type_data = OtherTypeDataByE()
other_type_data.question_01 = questionnaire['其他类型问卷'][0][0]
other_type_data.question_02 = questionnaire['其他类型问卷'][1][0]
other_type_data.question_03 = questionnaire['其他类型问卷'][2][0]
other_type_data.question_04 = questionnaire['其他类型问卷'][3][0]
other_type_data.question_05 = questionnaire['其他类型问卷'][4][0]
other_type_data.question_06 = questionnaire['其他类型问卷'][5][0]
other_type_data.question_07 = questionnaire['其他类型问卷'][6][0]
other_type_data.question_08 = questionnaire['其他类型问卷'][7][0]
other_type_data.list_to_save()
return questionnaire['其他类型问卷']
eq = EnvironmentQuestionnaire()
eq.e_three_year_data = list()
three_year_data_script()
other_type_data_script()
return eq
def society_script():
"""社会问卷校验"""
questionnaire = input_form['社会问卷']
def three_year_data_script():
"""近三年公司数据"""
three_year_data = ThreeYearDataByS()
for data in questionnaire['近三年公司数据']:
three_year_data.year = data['年份']
three_year_data.employee_total = data['员工总数(年底)']
three_year_data.leaving_total = data['当年离职人数']
three_year_data.salary_level = data['人均薪酬水平(元/月)']
three_year_data.work_length = data['劳动合同要求工作时长(每周小时数)']
three_year_data.labor_dispute = data['劳资纠纷次数']
three_year_data.security_incident = data['安全事故发生次数']
three_year_data.training_times = data['组织培训次数']
three_year_data.supplier = data['公司从前3大供应商拿货占全部供应商比例%']
three_year_data.sales = data['公司前3大客户销售额占全部销售比例%']
three_year_data.return_sales = data['返修、退回、投诉产品对应销售额占全部销售比例(%']
three_year_data.donate = data['扶贫+捐赠规模(万元)']
sq.s_three_year_data.append(three_year_data.dict_to_save())
def other_type_data_script():
"""其他类型问卷"""
if questionnaire['其他类型问卷']:
other_type_data = OtherTypeDataByS()
other_type_data.question_01 = questionnaire['其他类型问卷'][0]
other_type_data.question_02 = questionnaire['其他类型问卷'][1]
other_type_data.question_03 = questionnaire['其他类型问卷'][2]
other_type_data.question_04 = questionnaire['其他类型问卷'][3]
other_type_data.list_to_save()
return questionnaire['其他类型问卷']
sq = SocietyQuestionnaire()
sq.s_three_year_data = list()
three_year_data_script()
sq.s_other_type_data = other_type_data_script()
return sq
def governance_script():
"""治理问卷校验"""
questionnaire = input_form['治理问卷']
def three_year_data_script():
"""近三年公司数据"""
three_year_data = ThreeYearDataByG()
for data in questionnaire['近三年公司数据']:
three_year_data.year = data['年份']
three_year_data.audit_report = data['公司是否有审计报告']
three_year_data.net_assets = data['公司净资产(万元)']
three_year_data.net_profit = data['公司净利润(万元)']
gq.g_three_year_data.append(three_year_data.dict_to_save())
def other_type_data_script():
"""其他类型问卷"""
if questionnaire['其他类型问卷']:
other_type_data = OtherTypeDataByG()
other_type_data.question_01 = questionnaire['其他类型问卷'][0]
other_type_data.question_02 = questionnaire['其他类型问卷'][1]
other_type_data.question_03 = int(questionnaire['其他类型问卷'][2])
other_type_data.question_04 = int(questionnaire['其他类型问卷'][3])
other_type_data.question_05 = questionnaire['其他类型问卷'][4]
other_type_data.question_06 = questionnaire['其他类型问卷'][5]
other_type_data.question_07 = questionnaire['其他类型问卷'][6]
other_type_data.question_08 = questionnaire['其他类型问卷'][7]
other_type_data.list_to_save()
return questionnaire['其他类型问卷']
gq = GovernanceQuestionnaire()
gq.g_three_year_data = list()
three_year_data_script()
gq.g_other_type_data = other_type_data_script()
return gq
def rating_input():
"""校验填报数据并保存"""
self.rid = rid['评价ID']
self.company = input_form['企业名称']
self.year = input_form['评价年度']
self.industry = input_form['所属行业']
self.current_year_income = input_form['公司当年收入(万元)']
self.environment = environment_script()
self.society = society_script()
self.governance = governance_script()
input_data = self.dict_to_save()
input_data['环境问卷']['其他类型问卷'] = input_form['环境问卷']['其他类型问卷']
self.db.update_single_data(
'ESG评价数据',
'填报数据',
rid,
input_data
)
self.update_process_status(rid['评价ID'], {"评价流程.数据准备.填报数据": 1})
def __main__():
handle_input_form()
rating_input()
__main__()
def exec_esg_rating(self):
"""提交数据开始esg评价"""
# 根据cid查询评价记录得到rid
find = {"企业ID": self.cid, "评价项目": "ESG评价", "进行状态": "进行"}
rid = self.db.find_single_data("企业数据", "评价记录", find, ['评价ID'])
if not rid:
return "没有正在进行的评价", 200
self.db.update_single_data(
'企业数据', '评价记录', rid,
{"提交填报时间": time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())})
def get_rating_data():
"""获取打分需要的数据"""
input_data = self.db.find_single_data(
'ESG评价数据',
'填报数据',
rid,
["评价ID", "企业ID", "企业名称", "评价年度", "所属行业", "公司当年收入(万元)", "环境问卷", "社会问卷", "治理问卷"]
)
risk_data = self.db.find_single_data(
'ESG评价数据',
'风险数据',
rid,
["严重违法", "失信人", "法律诉讼", "税收违法", "经营异常", "行政处罚", "被执行人"]
)
return {**input_data, **{"风险数据": risk_data}}
rating_input = get_rating_data()
def esg_model():
"""模型打分"""
url = "http://api.fecribd.com/tfse_rating/esg/esg_model"
token = "X0gSlC!YE8jmr2jJr&ilcjS83j!tsoh5"
headers = {'token': token, "Content-Type": "application/json;charset=UTF-8"}
data = json.dumps({"input_data": rating_input})
res = requests.post(url, headers=headers, data=data)
if res.status_code == 200:
update_data = {
"评价结果": json.loads(res.text)['result'],
}
self.db.update_single_data("企业数据", "评价记录", rid, update_data)
else:
raise ReturnConditionCheckFailed("模型打分失败", 200)
def gen_report():
"""报告生成"""
url = "http://api.fecribd.com/tfse_rating/report/generate_esg_report"
token = "X0gSlC!YE8jmr2jJr&ilcjS83j!tsoh5"
headers = {'token': token, "Content-Type": "application/json;charset=UTF-8"}
data = {
"company": rating_input['企业名称'],
"cid": rating_input['企业ID'],
"rid": rating_input['评价ID']
}
res = requests.post(url, headers=headers, data=json.dumps(data))
if res.status_code == 200:
update_data = {
"报告fid": json.loads(res.text)['result']['FileID']
}
self.db.update_single_data("企业数据", "评价记录", rid, update_data)
self.update_process_status(rid['评价ID'], {"评价流程.文档生成.ESG报告": 1})
else:
self.update_process_status(rid['评价ID'], {"评价流程.文档生成.ESG报告": -1})
raise ReturnConditionCheckFailed("报告生成失败", 200)
def gen_certified():
"""证书生成"""
url = "http://api.fecribd.com/tfse_rating/certificate/generate_certificate"
token = "X0gSlC!YE8jmr2jJr&ilcjS83j!tsoh5"
headers = {'token': token, "Content-Type": "application/json;charset=UTF-8"}
data = {
"company": rating_input['企业名称'],
"cid": rating_input['企业ID'],
"rid": rating_input['评价ID'],
"type": "esg"
}
res = requests.post(url, headers=headers, data=json.dumps(data))
if res.status_code == 200:
update_data = {
"证书fid": json.loads(res.text)['result']['FileID']
}
self.db.update_single_data("企业数据", "评价记录", rid, update_data)
self.update_process_status(rid['评价ID'], {"评价流程.文档生成.ESG证书": 1})
else:
self.update_process_status(rid['评价ID'], {"评价流程.文档生成.ESG证书": -1})
raise ReturnConditionCheckFailed("证书生成失败", 200)
def esg_index():
url = "http://api.fecribd.com/etl_tfse/company/esg_index_etl"
token = "dmfd7FshT!5Wng9^gcCcQV7T6FBnVgl4"
headers = {'token': token, "Content-Type": "application/json;charset=UTF-8"}
data = {
"rid": rating_input['评价ID']
}
res = requests.post(url, headers=headers, data=json.dumps(data))
if res.status_code == 200:
self.update_process_status(rid['评价ID'], {"评价流程.数据清洗.ESG评价分析": 1})
else:
self.update_process_status(rid['评价ID'], {"评价流程.数据清洗.ESG评价分析": -1})
raise ReturnConditionCheckFailed("主页ESG数据生成失败", 200)
def update_rating_records():
"""更新完成时间和进行状态"""
finish_time = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())
finish_date = time.strftime("%Y-%m-%d", time.localtime())
update_data = {"进行状态": "完成", "评价完成时间": finish_time, "评价完成日期": finish_date}
self.db.update_single_data('企业数据', '评价记录', rid, update_data)
def __main__():
esg_model()
gen_report()
gen_certified()
esg_index()
update_rating_records()
__main__()
return '执行成功', 200