import json import time import random import requests from DBHelper.MongoHelper import MongoHelper from Objects.Common.Rating.FECRRating import RatingRecord from Objects.InProject.Rating.esg import EnvironmentQuestionnaire, ThreeYearDataByE, OtherTypeDataByE, \ ThreeYearDataByS, SocietyQuestionnaire, OtherTypeDataByS, ThreeYearDataByG, GovernanceQuestionnaire, \ OtherTypeDataByG, EsgInputData from Objects.InProject.Rating.process import EsgNewProcess, EsgProcess from Utils.ErrorUtil import ReturnConditionCheckFailed class CreditUtils: """综合信用评价相关方法""" db = MongoHelper('tfse_v0.21') @staticmethod def get_current_year(): """获取当前年度""" current_year = time.strftime("%Y", time.localtime()) + "年" return current_year @staticmethod def gen_new_rid(): """生成新的评价ID,如果该ID存在,则重新生成""" db = MongoHelper('tfse_v0.21') def make_id(num): """ 随机生成字符串 """ choices = 'ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789' salt = '' for i in range(num): salt += random.choice(choices) return salt # 生成新ID new_id = make_id(8) # 检查新ID是否存在,如果存在则继续生成新ID case = db.find_all_data_with_count( '企业数据', '评价记录', {'评价ID': new_id} ) while case > 0: new_id = make_id(8) # 新ID可使用,返回新ID return new_id @staticmethod def make_3_report_dates(): """ 生成最近三年报告期 日期 Parameters: - Returns: report_dates 三年报告期 """ current_year = int(time.strftime("%Y", time.localtime())) - 1 date_01 = str(current_year) + '-12-31' date_02 = str(current_year - 1) + '-12-31' date_03 = str(current_year - 2) + '-12-31' report_dates = [date_01, date_02, date_03] return report_dates @staticmethod def make_3_report_years(): """ 生成最近三年报告期 Parameters: - Returns: report_dates 三年报告期 """ current_year = int(time.strftime("%Y", time.localtime())) - 1 date_01 = str(current_year) + '年' date_02 = str(current_year - 1) + '年' date_03 = str(current_year - 2) + '年' report_dates = [date_01, date_02, date_03] return report_dates @staticmethod def update_process_status(rid, update_data): """ 调用更新流程接口 Parameters: update_data dict 需要更新的数据 Returns: result str 更新结果 """ url = "http://api.fecribd.com/etl_tfse/company/update_process_data" token = "dmfd7FshT!5Wng9^gcCcQV7T6FBnVgl4" headers = {'token': token, 'Content-Type': 'application/json; charset=UTF-8'} data = json.dumps({"rid": rid, "types": "ESG评价流程", "renew_data": update_data}) requests.post(url, headers=headers, data=data) class EsgRating(EsgInputData, CreditUtils): """企业ESG评价""" db = MongoHelper('tfse_v0.21') def start_esg_rating_obj(self): """开始ESG评价""" def check_input_data(): """ input_data: 已存在的填报数据 根据企业ID+当前年度,查询是否存在填报数据 若存在填报数据,则返回填报数据 若不存在填报数据,则返回False """ input_data = self.db.find_single_data( "ESG评价数据", "填报数据", {"企业ID": self.cid, "评价年度": self.get_current_year()}, ["评价ID", "企业名称", "评价年度", "所属行业", "治理问卷", "环境问卷", "社会问卷", "公司当年收入(万元)"] ) return input_data if input_data else False def case_input_exist(param): """ param: dict 填报数据 根据评价ID查询评价进行状态 若评价已完成,返回‘本年度已评价’的提示,且返回空对象 若评价进行中,返回‘继续填报’的提示,且返回填报数据 """ # rating_status: 评价服务状态 rating_status = self.db.find_single_data( "企业数据", "评价记录", {"评价ID": param["评价ID"]}, ["进行状态"] ) # 把不需要返回的字段删掉 param.pop('评价ID') # 方法返回结果 return ("本年度已评价", {}) if rating_status['进行状态'] == "完成" else ("继续填报", param) def case_input_not_exist(): """不存在填报数据""" # new_rid: 新生成填报ID new_rid = self.gen_new_rid() # report_dates: 生成报告期 report_dates = self.make_3_report_years() # company_name: 企业名称 company_name = self.db.find_single_data( "应用端", "企业用户", {"企业ID": self.cid}, ["企业名称"] ) def prepare_input_template(): """ 根据模板生成新的填报数据 input_template: 填报数据模板 """ def environment(): """环境问卷""" def three_year_data(i): """近三年数据""" e = ThreeYearDataByE() e.year = report_dates[i] e.raw_coal = None e.coke = None e.crude = None e.fuel_oil = None e.gasoline = None e.kerosene = None e.diesel_fuel = None e.liquefied_petroleum_gas = None e.natural_gas = None e.gas = None e.electricity = None e.water = None e.green_income = None return e.dict_to_save() environment_questionnaire = EnvironmentQuestionnaire() environment_questionnaire.e_three_year_data = list() for num in range(3): environment_questionnaire.e_three_year_data.append(three_year_data(num)) environment_questionnaire.e_other_type_data = list() return environment_questionnaire def society(): """社会问卷""" def three_year_data(i): s = ThreeYearDataByS() s.year = report_dates[i] s.employee_total = None s.leaving_total = None s.salary_level = None s.work_length = None s.labor_dispute = None s.security_incident = None s.training_times = None s.supplier = None s.sales = None s.return_sales = None s.donate = None return s.dict_to_save() society_questionnaire = SocietyQuestionnaire() society_questionnaire.s_three_year_data = list() for num in range(3): society_questionnaire.s_three_year_data.append(three_year_data(num)) society_questionnaire.s_other_type_data = list() return society_questionnaire def governance(): """治理问卷""" def three_year_data(i): g = ThreeYearDataByG() g.year = report_dates[i] g.audit_report = None g.net_assets = None g.net_profit = None return g.dict_to_save() governance_questionnaire = GovernanceQuestionnaire() governance_questionnaire.g_three_year_data = list() for num in range(3): governance_questionnaire.g_three_year_data.append(three_year_data(num)) governance_questionnaire.g_other_type_data = list() return governance_questionnaire self.rid = new_rid self.cid = self.cid self.company = company_name['企业名称'] self.year = self.get_current_year() self.industry = list() self.current_year_income = dict() self.current_year_income[report_dates[0]] = None self.current_year_income[report_dates[1]] = None self.current_year_income[report_dates[2]] = None self.environment = environment() self.society = society() self.governance = governance() input_template = self.dict_to_save() # 插入新的填报记录 self.db.insert_single_data( 'ESG评价数据', '填报数据', input_template ) def prepare_rating_record_template(): """根据模板生成新的评价记录""" rating_record = RatingRecord() rating_record.rid = new_rid rating_record.cid = self.cid rating_record.company = company_name['企业名称'] rating_record.project = 'ESG评价' rating_record.methods = '企业申报' rating_record.status = '进行' rating_record.result = None rating_record.report_fid = None rating_record.certificate_fid = None rating_record.begin_time = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime()) rating_record.submit_time = None rating_record.finish_time = None rating_record.finish_date = None rating_record_template = rating_record.dict_to_save() self.db.insert_single_data( '企业数据', '评价记录', rating_record_template ) def prepare_process_template(): """根据数据模板生成新的评价流程数据""" credit_process = EsgProcess() preparation = credit_process.EsgDataPreparation() preparation.fill_in_data = 0 preparation.risk_data = 0 credit_process.data_preparation = preparation score = credit_process.EsgModelScoring() score.rating_result = 0 credit_process.model_scoring = score document = credit_process.EsgDocumentGeneration() document.report_data = 0 document.report = 0 document.certificate = 0 credit_process.document_generation = document clean = credit_process.EsgDataCleaning() clean.esg_evaluation_analysis = 0 credit_process.data_cleaning = clean process_template = dict() process_template['评价ID'] = new_rid process_template['企业ID'] = self.cid process_template['评价流程'] = credit_process.dict_to_save() self.db.insert_single_data( 'ESG评价数据', '评价流程', process_template ) def prepare_risk_data(): # 调用清洗数据接口,执行风险数据清洗程序 url = "http://api.fecribd.com/etl_tfse/rating/esg_risk_data" token = "dmfd7FshT!5Wng9^gcCcQV7T6FBnVgl4" headers = {'token': token, 'Content-Type': 'application/json; charset=UTF-8'} data = json.dumps({"company_id": self.cid, "company_name": company_name, "evaluation_id": new_rid}) requests.post(url, headers=headers, data=data) # 风险数据清洗完成后更新流程状态 self.update_process_status(new_rid, {"评价流程.数据准备.风险数据": 1}) def __main__(): prepare_input_template() prepare_rating_record_template() prepare_process_template() prepare_risk_data() __main__() return "开始填报", check_input_data() def execute_func(): """判断是否有填报记录""" input_data = check_input_data() if input_data is not False: return case_input_exist(input_data) else: return case_input_not_exist() return execute_func() def save_input_general(self, input_form): """保存填报数据""" # 根据cid查询评价记录得到rid find = {"企业ID": self.cid, "评价项目": "ESG评价", "进行状态": "进行"} rid = self.db.find_single_data("企业数据", "评价记录", find, ['评价ID']) if not rid: return "没有正在进行的评价", 200 def handle_input_form(): """处理表单数据""" def environment(): """环境问卷""" three_year_data = input_form['环境问卷']['近三年公司数据'] for item in three_year_data: for k, v in item.items(): if isinstance(v, float) or isinstance(v, int) or item == 0: item[k] = float(v) def society(): """社会问卷""" three_year_data = input_form['社会问卷']['近三年公司数据'] filter_list = ['员工总数(年底)', '当年离职人数', '劳资纠纷次数', '安全事故发生次数', '组织培训次数'] for item in three_year_data: for k, v in item.items(): if k not in filter_list and (isinstance(v, float) or isinstance(v, int) or item == 0): item[k] = float(v) def governance(): """治理问卷""" three_year_data = input_form['治理问卷']['近三年公司数据'] for item in three_year_data: for k, v in item.items(): if isinstance(v, float) or isinstance(v, int) or item == 0: item[k] = float(v) environment() society() governance() def environment_script(): """环境问卷校验""" questionnaire = input_form['环境问卷'] def three_year_data_script(): """近三年公司数据""" three_year_data = ThreeYearDataByE() for data in questionnaire['近三年公司数据']: three_year_data.year = data['年份'] three_year_data.raw_coal = data['公司当年消耗的原煤(千克)'] three_year_data.coke = data['公司当年消耗的焦炭(千克)'] three_year_data.crude = data['公司当年消耗的原油(升)'] three_year_data.fuel_oil = data['公司当年消耗的燃料油(升)'] three_year_data.gasoline = data['公司当年消耗的汽油(升)'] three_year_data.kerosene = data['公司当年消耗的煤油(升)'] three_year_data.diesel_fuel = data['公司当年消耗的柴油(升)'] three_year_data.liquefied_petroleum_gas = data['公司当年消耗的液化石油气(千克)'] three_year_data.natural_gas = data['公司当年消耗的天然气(立方米)'] three_year_data.gas = data['公司当年消耗的煤气(立方米)'] three_year_data.electricity = data['公司当年消耗的电(千瓦时)'] three_year_data.water = data['公司当年耗水(吨)'] three_year_data.green_income = data['公司当年绿色业务收入(万元)——包括不限于清洁能源、清洁交通、绿色建筑、清洁技术等'] eq.e_three_year_data.append(three_year_data.dict_to_save()) def other_type_data_script(): """其他类型问卷""" if questionnaire['其他类型问卷']: other_type_data = OtherTypeDataByE() other_type_data.question_01 = questionnaire['其他类型问卷'][0][0] other_type_data.question_02 = questionnaire['其他类型问卷'][1][0] other_type_data.question_03 = questionnaire['其他类型问卷'][2][0] other_type_data.question_04 = questionnaire['其他类型问卷'][3][0] other_type_data.question_05 = questionnaire['其他类型问卷'][4][0] other_type_data.question_06 = questionnaire['其他类型问卷'][5][0] other_type_data.question_07 = questionnaire['其他类型问卷'][6][0] other_type_data.question_08 = questionnaire['其他类型问卷'][7][0] other_type_data.list_to_save() return questionnaire['其他类型问卷'] eq = EnvironmentQuestionnaire() eq.e_three_year_data = list() three_year_data_script() other_type_data_script() return eq def society_script(): """社会问卷校验""" questionnaire = input_form['社会问卷'] def three_year_data_script(): """近三年公司数据""" three_year_data = ThreeYearDataByS() for data in questionnaire['近三年公司数据']: three_year_data.year = data['年份'] three_year_data.employee_total = data['员工总数(年底)'] three_year_data.leaving_total = data['当年离职人数'] three_year_data.salary_level = data['人均薪酬水平(元/月)'] three_year_data.work_length = data['劳动合同要求工作时长(每周小时数)'] three_year_data.labor_dispute = data['劳资纠纷次数'] three_year_data.security_incident = data['安全事故发生次数'] three_year_data.training_times = data['组织培训次数'] three_year_data.supplier = data['公司从前3大供应商拿货占全部供应商比例(%)'] three_year_data.sales = data['公司前3大客户销售额占全部销售比例(%)'] three_year_data.return_sales = data['返修、退回、投诉产品对应销售额占全部销售比例(%)'] three_year_data.donate = data['扶贫+捐赠规模(万元)'] sq.s_three_year_data.append(three_year_data.dict_to_save()) def other_type_data_script(): """其他类型问卷""" if questionnaire['其他类型问卷']: other_type_data = OtherTypeDataByS() other_type_data.question_01 = questionnaire['其他类型问卷'][0] other_type_data.question_02 = questionnaire['其他类型问卷'][1] other_type_data.question_03 = questionnaire['其他类型问卷'][2] other_type_data.question_04 = questionnaire['其他类型问卷'][3] other_type_data.list_to_save() return questionnaire['其他类型问卷'] sq = SocietyQuestionnaire() sq.s_three_year_data = list() three_year_data_script() sq.s_other_type_data = other_type_data_script() return sq def governance_script(): """治理问卷校验""" questionnaire = input_form['治理问卷'] def three_year_data_script(): """近三年公司数据""" three_year_data = ThreeYearDataByG() for data in questionnaire['近三年公司数据']: three_year_data.year = data['年份'] three_year_data.audit_report = data['公司是否有审计报告'] three_year_data.net_assets = data['公司净资产(万元)'] three_year_data.net_profit = data['公司净利润(万元)'] gq.g_three_year_data.append(three_year_data.dict_to_save()) def other_type_data_script(): """其他类型问卷""" if questionnaire['其他类型问卷']: other_type_data = OtherTypeDataByG() other_type_data.question_01 = questionnaire['其他类型问卷'][0] other_type_data.question_02 = questionnaire['其他类型问卷'][1] other_type_data.question_03 = int(questionnaire['其他类型问卷'][2]) other_type_data.question_04 = int(questionnaire['其他类型问卷'][3]) other_type_data.question_05 = questionnaire['其他类型问卷'][4] other_type_data.question_06 = questionnaire['其他类型问卷'][5] other_type_data.question_07 = questionnaire['其他类型问卷'][6] other_type_data.question_08 = questionnaire['其他类型问卷'][7] other_type_data.list_to_save() return questionnaire['其他类型问卷'] gq = GovernanceQuestionnaire() gq.g_three_year_data = list() three_year_data_script() gq.g_other_type_data = other_type_data_script() return gq def rating_input(): """校验填报数据并保存""" self.rid = rid['评价ID'] self.company = input_form['企业名称'] self.year = input_form['评价年度'] self.industry = input_form['所属行业'] self.current_year_income = input_form['公司当年收入(万元)'] self.environment = environment_script() self.society = society_script() self.governance = governance_script() input_data = self.dict_to_save() input_data['环境问卷']['其他类型问卷'] = input_form['环境问卷']['其他类型问卷'] self.db.update_single_data( 'ESG评价数据', '填报数据', rid, input_data ) self.update_process_status(rid['评价ID'], {"评价流程.数据准备.填报数据": 1}) def __main__(): handle_input_form() rating_input() __main__() def exec_esg_rating(self): """提交数据,开始esg评价""" # 根据cid查询评价记录得到rid find = {"企业ID": self.cid, "评价项目": "ESG评价", "进行状态": "进行"} rid = self.db.find_single_data("企业数据", "评价记录", find, ['评价ID']) if not rid: return "没有正在进行的评价", 200 self.db.update_single_data( '企业数据', '评价记录', rid, {"提交填报时间": time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())}) def get_rating_data(): """获取打分需要的数据""" input_data = self.db.find_single_data( 'ESG评价数据', '填报数据', rid, ["评价ID", "企业ID", "企业名称", "评价年度", "所属行业", "公司当年收入(万元)", "环境问卷", "社会问卷", "治理问卷"] ) risk_data = self.db.find_single_data( 'ESG评价数据', '风险数据', rid, ["严重违法", "失信人", "法律诉讼", "税收违法", "经营异常", "行政处罚", "被执行人"] ) return {**input_data, **{"风险数据": risk_data}} rating_input = get_rating_data() def esg_model(): """模型打分""" url = "http://api.fecribd.com/tfse_rating/esg/esg_model" token = "X0gSlC!YE8jmr2jJr&ilcjS83j!tsoh5" headers = {'token': token, "Content-Type": "application/json;charset=UTF-8"} data = json.dumps({"input_data": rating_input}) res = requests.post(url, headers=headers, data=data) if res.status_code == 200: update_data = { "评价结果": json.loads(res.text)['result'], } self.db.update_single_data("企业数据", "评价记录", rid, update_data) else: raise ReturnConditionCheckFailed("模型打分失败", 200) def gen_report(): """报告生成""" url = "http://api.fecribd.com/tfse_rating/report/generate_esg_report" token = "X0gSlC!YE8jmr2jJr&ilcjS83j!tsoh5" headers = {'token': token, "Content-Type": "application/json;charset=UTF-8"} data = { "company": rating_input['企业名称'], "cid": rating_input['企业ID'], "rid": rating_input['评价ID'] } res = requests.post(url, headers=headers, data=json.dumps(data)) if res.status_code == 200: update_data = { "报告fid": json.loads(res.text)['result']['FileID'] } self.db.update_single_data("企业数据", "评价记录", rid, update_data) self.update_process_status(rid['评价ID'], {"评价流程.文档生成.ESG报告": 1}) else: self.update_process_status(rid['评价ID'], {"评价流程.文档生成.ESG报告": -1}) raise ReturnConditionCheckFailed("报告生成失败", 200) def gen_certified(): """证书生成""" url = "http://api.fecribd.com/tfse_rating/certificate/generate_certificate" token = "X0gSlC!YE8jmr2jJr&ilcjS83j!tsoh5" headers = {'token': token, "Content-Type": "application/json;charset=UTF-8"} data = { "company": rating_input['企业名称'], "cid": rating_input['企业ID'], "rid": rating_input['评价ID'], "type": "esg" } res = requests.post(url, headers=headers, data=json.dumps(data)) if res.status_code == 200: update_data = { "证书fid": json.loads(res.text)['result']['FileID'] } self.db.update_single_data("企业数据", "评价记录", rid, update_data) self.update_process_status(rid['评价ID'], {"评价流程.文档生成.ESG证书": 1}) else: self.update_process_status(rid['评价ID'], {"评价流程.文档生成.ESG证书": -1}) raise ReturnConditionCheckFailed("证书生成失败", 200) def esg_index(): url = "http://api.fecribd.com/etl_tfse/company/esg_index_etl" token = "dmfd7FshT!5Wng9^gcCcQV7T6FBnVgl4" headers = {'token': token, "Content-Type": "application/json;charset=UTF-8"} data = { "rid": rating_input['评价ID'] } res = requests.post(url, headers=headers, data=json.dumps(data)) if res.status_code == 200: self.update_process_status(rid['评价ID'], {"评价流程.数据清洗.ESG评价分析": 1}) else: self.update_process_status(rid['评价ID'], {"评价流程.数据清洗.ESG评价分析": -1}) raise ReturnConditionCheckFailed("主页ESG数据生成失败", 200) def update_rating_records(): """更新完成时间和进行状态""" finish_time = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime()) finish_date = time.strftime("%Y-%m-%d", time.localtime()) update_data = {"进行状态": "完成", "评价完成时间": finish_time, "评价完成日期": finish_date} self.db.update_single_data('企业数据', '评价记录', rid, update_data) def __main__(): esg_model() gen_report() gen_certified() esg_index() update_rating_records() __main__() return '执行成功', 200