tfse-admin-api-v0.2/TestCenter/tc_impl.py

279 lines
8.8 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import json
import os
import random
import re
import time
import base64
import requests
from werkzeug.security import generate_password_hash
from Crypto.PublicKey import RSA
from Crypto.Cipher import PKCS1_v1_5 as PKCS1_cipher
from Board.board_db import FIND_DATA, INSERT_DATA
from TestCenter.tc_db import update_user_info_by_cid, find_user_info_by_cid, delete_data, find_data, update_data, \
delete_pdf
def delete_rid_cascade_scripts(rid):
"""
级联删除与评价ID有关的评价数据
1.删除评价报告
2.删除评价结果
3.删除综合评价填报
4.删除评价记录
5.删除风险数据
Parameters:
rid: 评价ID
Returns:
res: True 执行结果
"""
rids = find_data('评价', '评价记录', {'评价ID': rid})
if rids:
if rids[0]['进行状态'] == '完成':
# 删除报告/证书
report_id = rids[0]['报告fid']
delete_pdf('评价', '综合信用报告', report_id)
certificate_id = rids[0]['报告fid']
delete_pdf('评价', '评价证书', certificate_id)
# 更新此条数据
insert = {'进行状态': '进行', '评价结果': None, '评价时间': None, '报告fid': None, '证书fid': None}
res = update_data('评价', '评价记录', {'评价ID': rid}, insert)
if res:
delete_data('评价', '评价结果', {'评价ID': rid})
delete_data('评价', '报告数据', {'评价ID': rid})
return '删除成功'
else:
return '评价记录数据重置失败'
else:
return '不能删除正在进行的评价记录'
else:
return '未查询到此rid相关记录'
def delete_cid_cascade_scripts(cid):
"""
级联删除uid有关的所有数据
Parameters:
cid: 企业ID
Returns:
res: True 执行结果
"""
delete_data('用户', '用户信息', {"企业ID": cid})
delete_data('企业', '指标明细', {"企业ID": cid})
delete_data('企业', '综合评价分析', {"企业ID": cid})
delete_data('企业', '公司基本信息', {"企业ID": cid})
delete_data('企业', '财务要素分析', {"企业ID": cid})
delete_data('企业', '财务要素分析', {"企业ID": cid})
rids = find_data('评价', '评价记录', {"企业ID": cid})
if rids:
for rid in rids:
delete_rid_cascade_scripts(rid['评价ID'])
return True
def company_verify_script(company, code, legal_person, cid):
"""
输入企业名称、统一社会信用代码、法人姓名进行企业认证
Parameters:
company: 企业名称
code: 统一社会信用代码
legal_person: 法人姓名
cid: 企业ID
Returns:
result: 认证结果
"""
def check_verified():
# 是否认证
r = find_user_info_by_cid(cid)
if len(r) < 0:
return '未查到该用户信息'
else:
auth = r[0]['已认证']
if auth == '':
return '该用户已认证'
# 认证信息有缺失
if None in [company, code, legal_person]:
return '认证信息不完整'
def tyc_api():
"""
# 调用天眼查企业三要素验证接口
Parameters:
-
Returns:
result: 查询结果
"""
token = "5407573b-e41e-4ab4-9f46-daa0032ad0a4"
base = "http://open.api.tianyancha.com/services/open/ic/verify/2.0?code={}&name={}&legalPersonName={}"
url = base.format(code, company, legal_person)
headers = {'Authorization': token}
res = json.loads(requests.get(url, headers=headers).text)
return res
def main_process():
check_result = check_verified()
if check_result:
return check_result
try:
result = tyc_api()
if result['result']['result'] == 1:
# 初始化企业数据
init_url = "http://api.fecribd.com/etl_tfse/company/init_company_data"
init_token = "dmfd7FshT!5Wng9^gcCcQV7T6FBnVgl4"
init_headers = {'token': init_token}
data = json.dumps({"cid": cid, "company_name": company})
requests.post(init_url, headers=init_headers, data=data)
# 更新用户认证状态
update_user_info_by_cid(cid, {"企业名称": company, "已认证": ""})
return '认证成功'
else:
return '认证失败'
except Exception:
return '认证失败'
return main_process()
def change_verified_script(cid):
"""
将企业认证状态改为否
Parameters:
cid: 企业ID
Returns:
res: True 执行结果
"""
# 更新用户认证状态
insert_dict = dict()
insert_dict['企业名称'] = None
insert_dict['统一社会信用代码'] = None
insert_dict['法人姓名'] = None
insert_dict['法人身份证'] = None
insert_dict['营业执照fid'] = None
insert_dict['身份证fid'] = None
info = update_user_info_by_cid(cid, {"企业名称": None, "已认证": "", "认证信息": insert_dict})
return info
def new_test_company_impl(email, company, code, legal_person):
"""
新增一家测试企业
"""
def check_mail_fmt():
"""
邮箱地址格式校验
Parameters:
-
Returns:
result: 邮箱校验结果正确返回True不正确返回False
"""
regex = "^.+\\@(\\[?)[a-zA-Z0-9\\-\\.]+\\.([a-zA-Z]{2,3}|[0-9]{1,3})(\\]?)$"
case = (len(email) > 7) and (re.match(regex, email) is not None)
result = True if case else False
return result
def check_registered():
"""
检查邮箱是否被注册
Parameters:
email: 邮箱
Returns:
邮箱未被注册 False
邮箱已被注册 True
"""
return False if FIND_DATA("TFSE", "用户", "验证记录", {"邮箱": email}) == [] else True
def gen_new_cid():
"""
生成新的企业ID如果该ID存在则重新生成
Parameters:
-
Returns:
new_id: 生成的企业ID
"""
def make_id(num):
"""
随机生成字符串
"""
choices = 'ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789'
salt = ''
for i in range(num):
salt += random.choice(choices)
return salt
# 生成新ID
new_id = make_id(8)
# 检查新ID是否存在如果存在则继续生成新ID
case = FIND_DATA("TFSE", "用户", "用户信息", {"企业ID": new_id}) is []
while case:
new_id = make_id(8)
# 新ID可使用返回新ID
return new_id
cid = gen_new_cid()
def decrypt_data(**kwargs):
"""
用私钥解密
Parameters:
encrypt_msg str 加密信息
Returns:
执行正确 解密后结果
执行错误 False
"""
try:
encrypt_msg = kwargs['encrypt_msg'] # 加密信息
with open(os.path.abspath(os.path.dirname(__file__) + '/static/rsa_private_key.pem')) as f:
data = f.read()
private_key = RSA.importKey(data)
cipher = PKCS1_cipher.new(private_key)
back_text = cipher.decrypt(base64.b64decode(encrypt_msg), 0)
return back_text.decode('utf-8')
except Exception:
return False
def create_user_by_template():
file_rel_path = "/static/user.json"
with open(os.path.abspath(os.path.dirname(__file__)+file_rel_path), "r", encoding='utf-8') as f:
user = json.load(f)
user['企业ID'] = cid
user['邮箱'] = email
user['头像fid'] = "61a15b33e55c00003e0048e4"
user['密码'] = generate_password_hash(decrypt_data(encrypt_msg="Fecr1988."))
user['注册时间'] = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())
user['已认证'] = ''
# 插入新用户数据
INSERT_DATA("TFSE", "用户", "用户信息", user)
return True
def new_company_main_process():
# 校验邮箱格式
if not check_mail_fmt():
return "邮箱格式错误"
# 检验邮箱是否被注册
if check_registered():
return "邮箱已被注册"
# 通过以上校验,生成新用户信息,插入新用户数据
res = create_user_by_template()
if res is not True:
return '新增企业失败'
return company_verify_script(company, code, legal_person, cid)
return new_company_main_process()