Merge branch 'master' into v0.21

# Conflicts:
#	CompanyUser/CompanyUserImpl.py
This commit is contained in:
王思川 2022-04-27 15:05:43 +08:00
commit 4b96e4bee5
10 changed files with 744 additions and 51 deletions

View File

@ -0,0 +1,565 @@
import json
import time
import random
import base64
import requests
from urllib.request import Request, urlopen
from werkzeug.security import check_password_hash
from itsdangerous import TimedJSONWebSignatureSerializer as Serializer
from DBHelper.MongoHelper import MongoHelper
from Utils.ErrorUtil import ReturnConditionCheckFailed, LogConditionCheckFailed
from CompanyUser.CompanyUserObj import CompanyUser, VerifyInfo, EmailVerifyCodeRecord, VerifyMaterial, \
CompanyVerifyThreeFactors
from Utils.ValidateUtil import ValidateAttr
class CompanyUserImpl(CompanyUser):
"""企业用户实现类"""
db = MongoHelper("tfse_v0.21")
def check_email_registered(self):
"""检查邮箱已被注册"""
email = self.db.find_single_column(
"应用端",
"企业用户",
{"邮箱": self.email},
"邮箱"
)
if email is not None:
raise ReturnConditionCheckFailed("邮箱已被注册", 200)
def check_email_not_registered(self):
"""检查邮箱未被注册"""
email = self.db.find_single_column(
"应用端",
"企业用户",
{"邮箱": self.email},
"邮箱"
)
if email is None:
raise ReturnConditionCheckFailed("邮箱未被注册", 200)
def check_vcode_correct(self, code):
"""检查邮箱验证码是否正确"""
verify_data = self.db.find_single_data(
"应用端",
"邮箱验证码记录",
{"邮箱": self.email},
["验证码", "验证有效期"]
)
if not verify_data:
raise ReturnConditionCheckFailed("验证码错误", 200)
if verify_data['验证码'] != code:
raise ReturnConditionCheckFailed("验证码错误", 200)
if time.time() - verify_data['验证有效期'] > 300:
raise ReturnConditionCheckFailed('验证码过期', 200)
def login(self):
"""登录"""
company_user = self.db.find_single_data(
"应用端",
"企业用户",
{"邮箱": self.email},
["企业ID", "企业名称", "邮箱", "密码", "已认证"]
)
def check_user_existed():
"""检查用户是否存在"""
if company_user is None:
raise ReturnConditionCheckFailed("不存在该用户", 200)
def check_password_correct():
"""检查密码是否正确"""
if not check_password_hash(company_user['密码'], self.pwd):
raise ReturnConditionCheckFailed("登录密码错误", 200)
def create_token(param):
"""
创建token
param: 传入参数用于创建token
return: 用户token
"""
secret_key = '0FTuOi^#Afx1@2@F'
token_expire = 14400
s = Serializer(secret_key, expires_in=token_expire)
token = '' + s.dumps(param).decode('ascii')
return token
def __main__():
""""""
check_user_existed()
check_password_correct()
self.cid = company_user['企业ID']
self.name = company_user['企业名称']
self.verify_status = company_user['已认证']
return_body = self.dict_to_show(columns=["企业名称", "认证状态", "token"])
return_body['token'] = create_token({"cid": self.cid})
return return_body
return __main__()
def register(self, code):
"""注册"""
def make_new_cid():
"""生成新的企业ID如果该ID存在则重新生成"""
def random_cid(num):
"""随机企业ID"""
choices = 'ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789'
salt = ''
for i in range(num):
salt += random.choice(choices)
return salt
new_cid = random_cid(8)
case = self.db.find_single_column(
"应用端",
"企业用户",
{"企业ID": new_cid},
"企业ID"
) is not None
while case:
new_cid = random_cid(8)
return new_cid
def __main__():
""""""
self.check_email_registered()
self.check_vcode_correct(code)
self.cid = make_new_cid()
self.avatar_id = "623152edf36000004f00124e"
self.verify_status = ""
self.verify_info = VerifyInfo()
self.db.insert_single_data(
"应用端",
"企业用户",
self.dict_to_save()
)
self.db.delete_single_data(
"应用端",
"邮箱验证码记录",
{"邮箱": self.email}
)
__main__()
def get_company_user_info(self):
"""获取企业用户信息"""
company_user_data = self.db.find_single_data(
"应用端",
"企业用户",
{"企业ID": self.cid},
["企业ID", "企业名称", "邮箱", "已认证"]
)
self.name = company_user_data['企业名称']
self.email = company_user_data['邮箱']
self.verify_status = company_user_data['已认证']
return self.dict_to_show(columns=["企业ID", "企业名称", "邮箱", "认证状态"])
def get_avatar(self):
"""获取用户头像"""
self.avatar_id = self.db.find_single_column(
"应用端",
"企业用户",
{"企业ID": self.cid},
"头像fid"
)
avatar = self.db.find_file(
"文件",
"企业用户头像",
self.avatar_id
)
return avatar
def change_avatar(self, avatar_option):
"""修改企业用户头像"""
avatars = {
"avatar_01": "623152edf36000004f00124e",
"avatar_02": "623152edf36000004f001250",
"avatar_03": "623152edf36000004f00124f",
"avatar_04": "623152edf36000004f001251",
"avatar_05": "623152edf36000004f001252",
"avatar_06": "623152edf36000004f001253"
}
self.avatar_id = avatars[avatar_option]
self.db.update_single_data(
"应用端",
"企业用户",
{"企业ID": self.cid},
{"头像fid": self.avatar_id}
)
def change_password(self, code):
"""修改登录密码"""
self.check_email_not_registered()
self.check_vcode_correct(code)
self.db.update_single_data(
"应用端",
"企业用户",
{"邮箱": self.email},
self.dict_to_save(columns=["密码"])
)
def change_email(self, code):
"""更换邮箱"""
self.check_vcode_correct(code)
self.check_email_registered()
self.db.update_single_data(
"应用端",
"企业用户",
{"企业ID": self.cid},
self.dict_to_save(columns=["邮箱"])
)
class VerifyMaterialImpl(VerifyMaterial):
"""认证材料实现类"""
db = MongoHelper("tfse_v0.21")
@staticmethod
def read_img_by_base64(img_file):
"""读取base64编码格式的图片"""
data = img_file.read()
try:
encode_str = str(base64.b64encode(data), 'utf-8')
except TypeError:
encode_str = base64.b64encode(data)
return encode_str
def check_is_verified(self):
"""检查企业是否已认证"""
self.verify_status = self.db.find_single_column(
"应用端",
"企业用户",
{"企业ID": self.cid},
"已认证"
)
if self.verify_status == "":
raise ReturnConditionCheckFailed("企业已认证", 200)
def upload_id_card(self):
"""上传身份证"""
def trans_date_format(_time):
"""将解析出来的时间数据加上年月日"""
year = _time[:4]
month = _time[4:6]
day = _time[6:8]
return year + '.' + month + '.' + day
def id_card_recognition(side):
"""调用身份证图片识别接口"""
url = 'http://dm-51.data.aliyun.com/rest/160601/ocr/ocr_idcard.json'
APPCODE = "6d6c3146677c40bf9e3e1e2aa73404cd"
# 请求头
headers = {
'Authorization': 'APPCODE %s' % APPCODE,
'Content-Type': 'application/json; charset=UTF-8'
}
# 请求体
body = {
"configure": {'side': side},
"image": self.read_img_by_base64(self.image)
}
try:
params = json.dumps(body).encode(encoding='UTF8')
req = Request(url, params, headers)
res = urlopen(req)
html = res.read()
result = json.loads(html.decode("utf8"))
if result['is_fake']:
raise ReturnConditionCheckFailed("伪造身份证", 200)
id_card = dict()
if side == 'face':
id_card['姓名'] = result['name']
id_card['性别'] = result['sex']
id_card['民族'] = result['nationality']
id_card['出生'] = "{}{}{}".format(result['birth'][:4], result['birth'][4:6], result['birth'][6:])
id_card['住址'] = result['address']
id_card['身份证号码'] = result['num']
else:
id_card['签发机关'] = result['issue']
id_card['有效期限'] = '{}-{}'.format(trans_date_format(result['start_date']), trans_date_format(result['end_date']))
return id_card
except Exception:
raise ReturnConditionCheckFailed("身份证识别失败", 200)
def id_card_recognition_result():
"""
身份证文本识别
cid: 企业ID
image: 图片
return: 识别结果
"""
# 身份证正面文本识别
result = id_card_recognition(side='face')
# 重置image读取位置不然下一步读不出图片
self.image.seek(0)
# 更新企业认证中身份证的信息
self.db.update_single_data(
"应用端",
"企业用户",
{"企业ID": self.cid},
{"认证信息.法人姓名": result['姓名'], "认证信息.法人身份证": result['身份证号码']}
)
return {"姓名": result['姓名'], "身份证号码": result['身份证号码']}
def __main__():
""""""
self.check_is_verified()
return id_card_recognition_result()
return __main__()
def upload_business_license(self):
"""上传营业执照"""
def trans_date_format(_time):
"""将解析出来的时间数据加上年月日"""
year = _time[:4]
month = _time[4:6]
day = _time[6:8]
return year + '' + month + '' + day + ''
def business_license_recognition():
"""调用营业执照识别接口"""
url = 'https://dm-58.data.aliyun.com/rest/160601/ocr/ocr_business_license.json'
app_code = "6d6c3146677c40bf9e3e1e2aa73404cd"
headers = {
'Authorization': 'APPCODE %s' % app_code,
'Content-Type': 'application/json; charset=UTF-8'
}
body = {
"image": self.read_img_by_base64(self.image)
}
response = requests.post(url=url, data=json.dumps(body), headers=headers)
try:
res = json.loads(response.text)
data = dict()
data['统一社会信用代码'] = res['reg_num']
data['名称'] = res['name']
data['注册资本'] = res['capital']
data['类型'] = res['type']
data['成立日期'] = trans_date_format(res['establish_date'])
data['法定代表人'] = res['person']
data['经营范围'] = res['business']
data['住所'] = res['address']
return data
except Exception:
raise ReturnConditionCheckFailed("营业执照识别异常", 200)
def business_license_recognition_result():
"""营业执照识别结果"""
# 识别营业执照
result = business_license_recognition()
# 图片识别正确 更新企业认证中营业执照的信息
self.db.update_single_data(
"应用端",
"企业用户",
{"企业ID": self.cid},
{"认证信息.企业名称": result['名称'], "认证信息.统一社会信用代码": result['统一社会信用代码']}
)
return {"公司名称": result['名称'], "统一社会信用代码": result['统一社会信用代码']}
def _main__():
""""""
self.check_is_verified()
return business_license_recognition_result()
return _main__()
class CompanyVerifyThreeFactorsImpl(CompanyVerifyThreeFactors):
"""企业认证三要素实现类"""
db = MongoHelper("tfse_v0.21")
def company_verify(self):
"""企业认证"""
def get_verify_info():
"""获取企业认证信息"""
record = self.db.find_single_data(
"应用端",
"企业用户",
{"企业ID": self.cid},
["认证信息.企业名称", "认证信息.法人姓名", "认证信息.统一社会信用代码"]
)
if not record:
raise ReturnConditionCheckFailed("企业认证信息异常", 200)
verify_info = record['认证信息']
self.code = verify_info['统一社会信用代码']
self.name = verify_info['企业名称']
self.legal_person = verify_info['法人姓名']
def check_verify_info_complete():
"""检查认证信息是否完整"""
if None in [self.code, self.name, self.legal_person]:
raise ReturnConditionCheckFailed("企业认证信息异常", 200)
def check_three_elements_by_tyc():
"""调用天眼查企业三要素验证接口"""
token = "5407573b-e41e-4ab4-9f46-daa0032ad0a4"
base = "http://open.api.tianyancha.com/services/open/ic/verify/2.0?code={}&name={}&legalPersonName={}"
url = base.format(self.code, self.name, self.legal_person)
headers = {'Authorization': token}
res = json.loads(requests.get(url, headers=headers).text)
try:
if res['result']['result'] != 1:
raise ReturnConditionCheckFailed("企业认证未通过", 200)
except Exception:
raise ReturnConditionCheckFailed("认证接口异常", 200)
def init_company_data():
"""初始化企业主页数据"""
init_url = "http://api.fecribd.com/etl_tfse/company/init_company_data"
init_token = "dmfd7FshT!5Wng9^gcCcQV7T6FBnVgl4"
init_headers = {'token': init_token, "Content-Type": "application/json;charset=UTF-8"}
data = json.dumps({"cid": self.cid, "company_name": self.name})
response = requests.post(init_url, headers=init_headers, data=data)
res = json.loads(response.text)
if res['info'] == "企业数据初始化成功":
raise ReturnConditionCheckFailed("企业数据初始化异常", 200)
else:
company_user = CompanyUser()
company_user.cid = self.cid
company_user.name = self.name
company_user.verify_status = ""
self.db.upsert_single_data(
"应用端",
"企业用户",
{"企业ID": self.cid},
company_user.dict_to_save(columns=["企业名称", "已认证"])
)
def __main__():
get_verify_info()
check_verify_info_complete()
check_three_elements_by_tyc()
init_company_data()
return __main__()
class EmailVerifyCodeRecordImpl(EmailVerifyCodeRecord):
"""用户通知邮件实现类"""
db = MongoHelper("tfse_v0.21")
def make_verify_code(self):
"""生成邮箱验证码"""
def make_verify_code():
"""随机生成6位数字验证码"""
num_code = ""
for i in range(6):
ch = chr(random.randrange(ord('0'), ord('9') + 1))
num_code += ch
return num_code
self.v_code = make_verify_code()
self.v_period = time.time() + 300
self.db.upsert_single_data(
"应用端",
"邮箱验证码记录",
{"邮箱": self.email},
self.dict_to_save()
)
def transfer_email_api(self):
"""发送邮件"""
# 邮箱服务接口
email_api = 'http://116.63.130.34:30001'
v_types = {
"register": "注册账号",
"resetpwd": "修改密码",
"resetemail": "修改邮箱"
}
headers = {"Content-Type": "application/json;charset=UTF-8"}
data = {
"title": "【远东资信】{}".format(v_types[self.v_type]),
"sender": 'fecribd@fecr.com.cn',
"recipients": [self.email],
"msg_body": "{}的验证码为 【{}5分钟内有效。".format(v_types[self.v_type], self.v_code)
}
response = requests.post(url=email_api + '/send_mail', headers=headers, data=json.dumps(data))
return response.text
def check_email_registered(self):
"""检查邮箱已被注册"""
email = self.db.find_single_column(
"应用端",
"企业用户",
{"邮箱": self.email},
"邮箱"
)
if email is not None:
raise ReturnConditionCheckFailed("邮箱已被注册", 200)
def check_email_not_registered(self):
"""检查邮箱未被注册"""
email = self.db.find_single_column(
"应用端",
"企业用户",
{"邮箱": self.email},
"邮箱"
)
if email is None:
raise ReturnConditionCheckFailed("邮箱未被注册", 200)
def send_email(self):
""""""
if self.v_type == 'register' or self.v_type == 'resetemail':
self.check_email_registered()
self.make_verify_code()
self.transfer_email_api()
elif self.v_type == 'resetpwd':
self.check_email_not_registered()
self.make_verify_code()
self.transfer_email_api()

View File

@ -8,7 +8,7 @@ from Rating.Credit.CreditObj import RatingRecord, CreditInputData, FinancialRepo
IncomeSheet, AppendixDataSheet
from Rating.Process.ProcessObj import CreditProcess
from Rating.Questionnaire.QuestionObj import SingleChoiceQuestion, MultipleChoiceQuestion, FillInTheBlank
from Utils.ErrorUtil import LogConditionCheckFailed
from Utils.ErrorUtil import ReturnConditionCheckFailed, ReturnConditionCheckFailed
class CreditUtils:
@ -525,7 +525,7 @@ class CreditRating(CreditInputData, CreditUtils):
self.update_process_status(rid['评价ID'], {"评价流程.模型打分": 1})
else:
self.update_process_status(rid['评价ID'], {"评价流程.模型打分": -1})
raise LogConditionCheckFailed("模型"
raise ReturnConditionCheckFailed("模型"
"打分失败", 200)
def gen_report():
@ -547,7 +547,7 @@ class CreditRating(CreditInputData, CreditUtils):
self.update_process_status(rid['评价ID'], {"评价流程.报告生成": 1})
else:
self.update_process_status(rid['评价ID'], {"评价流程.报告生成": -1})
raise LogConditionCheckFailed("生成报告失败", 200)
raise ReturnConditionCheckFailed("生成报告失败", 200)
def gen_certified():
"""生成证书"""
@ -568,7 +568,7 @@ class CreditRating(CreditInputData, CreditUtils):
self.update_process_status(rid['评价ID'], {"评价流程.证书生成": 1})
else:
self.update_process_status(rid['评价ID'], {"评价流程.证书生成": -1})
raise LogConditionCheckFailed("生成证书失败", 200)
raise ReturnConditionCheckFailed("生成证书失败", 200)
def company_index():
"""生成主页数据"""
@ -585,7 +585,7 @@ class CreditRating(CreditInputData, CreditUtils):
self.update_process_status(rid['评价ID'], {"评价流程.公司风险分析主页": 1})
else:
self.update_process_status(rid['评价ID'], {"评价流程.综合信用评价主页": -1})
raise LogConditionCheckFailed("生成主页数据失败", 200)
raise ReturnConditionCheckFailed("生成主页数据失败", 200)
def update_rating_records():
"""更新完成时间和进行状态"""
@ -602,20 +602,21 @@ class CreditRating(CreditInputData, CreditUtils):
update_rating_records()
__main__()
return '执行成功', 200
def choose_industry(self):
"""选择行业"""
name = self.db.find_single_data('企业数据', '企业数据_更新汇总', {"企业ID": self.cid}, ['一级行业'])['一级行业']
industry = self.db.find_single_data('模型端', '综信评价行业分类', {}, ['行业'])
new_industry = list()
for k, v in industry['行业'].items():
insert_dict = dict()
insert_dict['value'] = k
insert_dict['children'] = list()
for item in v:
_dict = dict()
_dict['value'] = item
insert_dict['children'].append(_dict)
new_industry.append(insert_dict)
insert_dict = dict()
insert_dict['value'] = name
insert_dict['children'] = list()
for item in industry['行业'][name]:
_dict = dict()
_dict['value'] = item
insert_dict['children'].append(_dict)
new_industry.append(insert_dict)
return new_industry
def questionnaire_obj(self):

View File

@ -81,7 +81,7 @@ class IncomeSheet(SpecObject):
"total_profit": "利润总额",
"net_profit": "净利润"
}
class AppendixDataSheet(SpecObject):
"""补充数据表"""
@ -119,6 +119,7 @@ class BusinessQuestionnaire(object):
"""列表保存"""
return self.questionnaire
class CreditInputData(SpecObject):
"""填报数据"""
rid = ValidateAttr(field='rid', type=str, length=8)

View File

@ -80,7 +80,7 @@ def save_cc_rating_input_route(**kwargs):
@verify_token
def submit_general_rating_input(**kwargs):
"""
提交填报数据开始评
提交填报数据开始评
Parameters:
cid 企业ID
Returns:
@ -89,8 +89,8 @@ def submit_general_rating_input(**kwargs):
try:
credit = CreditRating()
credit.cid = kwargs['cid']
credit.exec_cc_rating()
return {"info": "执行成功"}, 200
info, status = credit.exec_cc_rating()
return {"info": info}, status
except ReturnConditionCheckFailed as e:
e.log_error()
return {"info": e.__str__()}, e.status_code
@ -110,6 +110,7 @@ def choose_industry_route(**kwargs):
"""
try:
credit = CreditRating()
credit.cid = kwargs['cid']
result = credit.choose_industry()
return {"info": "查询成功", "result": result}
except KeyError:

View File

@ -8,7 +8,7 @@ from Rating.Esg.EsgObj import RatingRecord, EnvironmentQuestionnaire, ThreeYearD
ThreeYearDataByS, SocietyQuestionnaire, OtherTypeDataByS, ThreeYearDataByG, GovernanceQuestionnaire, \
OtherTypeDataByG, EsgInputData
from Rating.Process.ProcessObj import EsgProcess
from Utils.ErrorUtil import LogConditionCheckFailed
from Utils.ErrorUtil import ReturnConditionCheckFailed
class CreditUtils:
@ -387,16 +387,17 @@ class EsgRating(EsgInputData, CreditUtils):
def other_type_data_script():
"""其他类型问卷"""
other_type_data = OtherTypeDataByE()
other_type_data.question_01 = questionnaire['其他类型问卷'][0][0]
other_type_data.question_02 = questionnaire['其他类型问卷'][1][0]
other_type_data.question_03 = questionnaire['其他类型问卷'][2][0]
other_type_data.question_04 = questionnaire['其他类型问卷'][3][0]
other_type_data.question_05 = questionnaire['其他类型问卷'][4][0]
other_type_data.question_06 = questionnaire['其他类型问卷'][5][0]
other_type_data.question_07 = questionnaire['其他类型问卷'][6][0]
other_type_data.question_08 = questionnaire['其他类型问卷'][7][0]
other_type_data.list_to_save()
if questionnaire['其他类型问卷']:
other_type_data = OtherTypeDataByE()
other_type_data.question_01 = questionnaire['其他类型问卷'][0][0]
other_type_data.question_02 = questionnaire['其他类型问卷'][1][0]
other_type_data.question_03 = questionnaire['其他类型问卷'][2][0]
other_type_data.question_04 = questionnaire['其他类型问卷'][3][0]
other_type_data.question_05 = questionnaire['其他类型问卷'][4][0]
other_type_data.question_06 = questionnaire['其他类型问卷'][5][0]
other_type_data.question_07 = questionnaire['其他类型问卷'][6][0]
other_type_data.question_08 = questionnaire['其他类型问卷'][7][0]
other_type_data.list_to_save()
return questionnaire['其他类型问卷']
eq = EnvironmentQuestionnaire()
@ -429,12 +430,13 @@ class EsgRating(EsgInputData, CreditUtils):
def other_type_data_script():
"""其他类型问卷"""
other_type_data = OtherTypeDataByS()
other_type_data.question_01 = questionnaire['其他类型问卷'][0]
other_type_data.question_02 = questionnaire['其他类型问卷'][1]
other_type_data.question_03 = questionnaire['其他类型问卷'][2]
other_type_data.question_04 = questionnaire['其他类型问卷'][3]
other_type_data.list_to_save()
if questionnaire['其他类型问卷']:
other_type_data = OtherTypeDataByS()
other_type_data.question_01 = questionnaire['其他类型问卷'][0]
other_type_data.question_02 = questionnaire['其他类型问卷'][1]
other_type_data.question_03 = questionnaire['其他类型问卷'][2]
other_type_data.question_04 = questionnaire['其他类型问卷'][3]
other_type_data.list_to_save()
return questionnaire['其他类型问卷']
sq = SocietyQuestionnaire()
@ -459,16 +461,17 @@ class EsgRating(EsgInputData, CreditUtils):
def other_type_data_script():
"""其他类型问卷"""
other_type_data = OtherTypeDataByG()
other_type_data.question_01 = questionnaire['其他类型问卷'][0]
other_type_data.question_02 = questionnaire['其他类型问卷'][1]
other_type_data.question_03 = int(questionnaire['其他类型问卷'][2])
other_type_data.question_04 = int(questionnaire['其他类型问卷'][3])
other_type_data.question_05 = questionnaire['其他类型问卷'][4]
other_type_data.question_06 = questionnaire['其他类型问卷'][5]
other_type_data.question_07 = questionnaire['其他类型问卷'][6]
other_type_data.question_08 = questionnaire['其他类型问卷'][7]
other_type_data.list_to_save()
if questionnaire['其他类型问卷']:
other_type_data = OtherTypeDataByG()
other_type_data.question_01 = questionnaire['其他类型问卷'][0]
other_type_data.question_02 = questionnaire['其他类型问卷'][1]
other_type_data.question_03 = int(questionnaire['其他类型问卷'][2])
other_type_data.question_04 = int(questionnaire['其他类型问卷'][3])
other_type_data.question_05 = questionnaire['其他类型问卷'][4]
other_type_data.question_06 = questionnaire['其他类型问卷'][5]
other_type_data.question_07 = questionnaire['其他类型问卷'][6]
other_type_data.question_08 = questionnaire['其他类型问卷'][7]
other_type_data.list_to_save()
return questionnaire['其他类型问卷']
gq = GovernanceQuestionnaire()
@ -549,7 +552,7 @@ class EsgRating(EsgInputData, CreditUtils):
self.update_process_status(rid['评价ID'], {"评价流程.模型打分": 1})
else:
self.update_process_status(rid['评价ID'], {"评价流程.模型打分": -1})
raise LogConditionCheckFailed("模型打分失败", 200)
raise ReturnConditionCheckFailed("模型打分失败", 200)
def gen_report():
"""报告生成"""
@ -570,7 +573,7 @@ class EsgRating(EsgInputData, CreditUtils):
self.update_process_status(rid['评价ID'], {"评价流程.报告生成": 1})
else:
self.update_process_status(rid['评价ID'], {"评价流程.报告生成": -1})
raise LogConditionCheckFailed("报告生成失败", 200)
raise ReturnConditionCheckFailed("报告生成失败", 200)
def gen_certified():
"""证书生成"""
@ -592,7 +595,7 @@ class EsgRating(EsgInputData, CreditUtils):
self.update_process_status(rid['评价ID'], {"评价流程.证书生成": 1})
else:
self.update_process_status(rid['评价ID'], {"评价流程.证书生成": -1})
raise LogConditionCheckFailed("证书生成失败", 200)
raise ReturnConditionCheckFailed("证书生成失败", 200)
def esg_index():
url = "http://api.fecribd.com/etl_tfse/company/esg_index_etl"
@ -606,7 +609,7 @@ class EsgRating(EsgInputData, CreditUtils):
self.update_process_status(rid['评价ID'], {"评价流程.主页ESG数据生成": 1})
else:
self.update_process_status(rid['评价ID'], {"评价流程.主页ESG数据生成": -1})
raise LogConditionCheckFailed("主页ESG数据生成失败", 200)
raise ReturnConditionCheckFailed("主页ESG数据生成失败", 200)
def update_rating_records():
"""更新完成时间和进行状态"""
@ -623,3 +626,4 @@ class EsgRating(EsgInputData, CreditUtils):
update_rating_records()
__main__()
return '执行成功', 200

View File

@ -64,8 +64,8 @@ def submit_general_rating_input(**kwargs):
try:
esg = EsgRating()
esg.cid = kwargs['cid']
esg.exec_esg_rating()
return {"info": '执行成功'}, 200
info, status = esg.exec_esg_rating()
return {"info": info}, status
except ReturnConditionCheckFailed as e:
e.log_error()
return {"info": e.__str__()}, e.status_code

21
TestForAdmin/APIAuth.py Normal file
View File

@ -0,0 +1,21 @@
import functools
from flask import request
def api_secret(func):
"""
校验接口请求密钥
"""
secret = "EZgo9ykxrYuBMYnYmmKIh" # 接口密钥
@functools.wraps(func)
def internal(*args, **kwargs):
try:
token = request.headers.get('secret')
if token != secret:
return {"info": "接口密钥错误"}, 401
except Exception:
return {"info": "请求异常"}, 401
return func(*args, **kwargs)
return internal

0
TestForAdmin/__init__.py Normal file
View File

98
TestForAdmin/route.py Normal file
View File

@ -0,0 +1,98 @@
from flask import Blueprint, request, Response
from werkzeug.security import generate_password_hash
from CompanyUser.CompanyUserImpl import CompanyVerifyThreeFactorsImpl, CompanyUserImpl, EmailVerifyCodeRecordImpl
from Rating.Credit.CreditImpl import CreditRating
from Rating.Esg.EsgImpl import EsgRating
from TestForAdmin.APIAuth import api_secret
from Utils.ErrorUtil import ReturnConditionCheckFailed
test_admin_route = Blueprint('test_admin', __name__)
@test_admin_route.route('/company_verify', methods=['GET'])
@api_secret
def company_verify_route():
"""企业认证"""
try:
company_verify = CompanyVerifyThreeFactorsImpl()
company_verify.cid = request.args['cid']
company_verify.company_verify()
return {"info": "认证成功"}, 200
except ReturnConditionCheckFailed as e:
e.log_error()
return {"info": e.failed_info}, e.status_code
except KeyError:
return {"info": "参数异常"}, 400
@test_admin_route.route('/general_rating', methods=['POST'])
@api_secret
def general_rating():
"""评价打分"""
try:
project = request.json['评价项目']
cid = request.json['企业ID']
if project == '综合信用评价':
credit = CreditRating()
credit.cid = cid
info, status = credit.exec_cc_rating()
return {"info": info}, status
else:
esg = EsgRating()
esg.cid = cid
info, status = esg.exec_esg_rating()
return {"info": info}, status
except ReturnConditionCheckFailed as e:
e.log_error()
return {"info": e.failed_info}, e.status_code
except KeyError:
return {"info": "参数异常"}, 400
@test_admin_route.route('/verify_email', methods=['POST'])
@api_secret
def verify_email():
"""
新注册企业
1.调用发送注册邮箱获取验证码
2.使用邮箱密码验证码进行注册
3.使用认证接口进行认证初始化企业数据
"""
try:
req = request.json
# 发送验证码
email_verify = EmailVerifyCodeRecordImpl()
email_verify.email = req['email']
email_verify.v_type = 'register'
email_verify.send_email()
return {"info": "邮件已发送"}, 200
except ReturnConditionCheckFailed as e:
e.log_error()
return {"info": e.failed_info}, e.status_code
except KeyError:
return {"info": "参数异常"}, 400
@test_admin_route.route('/register', methods=['POST'])
@api_secret
def register():
"""
新注册企业
1.调用发送注册邮箱获取验证码
2.使用邮箱密码验证码进行注册
3.使用认证接口进行认证初始化企业数据
"""
try:
req = request.json
# 注册
company_user = CompanyUserImpl()
company_user.email = req['email']
company_user.pwd = generate_password_hash("Fecr1988.")
company_user.register(req['code'])
return {"info": "注册成功"}, 200
except ReturnConditionCheckFailed as e:
e.log_error()
return {"info": e.failed_info}, e.status_code
except KeyError:
return {"info": "参数异常"}, 400

2
app.py
View File

@ -9,6 +9,7 @@ from Routes.User.TFSECompanyUserRoute import company_user_route
from file.file_routes import file_route
from Rating.Credit.CreditRoutes import credit_route
from Rating.Esg.EsgRoutes import esg_route
from TestForAdmin.route import test_admin_route
app = Flask(__name__)
CORS(app, supports_credentials=True)
@ -21,6 +22,7 @@ app.register_blueprint(company_user_route, url_prefix='/app/user')
app.register_blueprint(email_route, url_prefix='/app/notice/email')
app.register_blueprint(credit_route, url_prefix='/app/input')
app.register_blueprint(esg_route, url_prefix='/app/esg')
app.register_blueprint(test_admin_route, url_prefix='/app/test')
@app.route('/app/version')