tfse-app-api-v0.2/user/user_impl.py

420 lines
11 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import os
import re
import time
import json
import random
import requests
from werkzeug.security import check_password_hash, generate_password_hash
from user.user_utils import create_token, make_id
from user.user_db import find_verify_record, FIND_USER_INFO, update_verify_record, UPDATE_USER_INFO_BY_CID, insert_user_info, \
clean_verify_code, find_file, insert_file, delete_file
from common.rsa import decrypt_data
from common.text_recognition import id_card_recognition, business_license_recognition
def send_email(email, code, v_type):
"""
发送验证码到邮箱
Parameters:
email: 邮箱
code: 验证码
v_type: 验证类型
Returns:
邮件发送结果
"""
# 邮箱服务接口
email_api = 'http://116.63.130.34:30001'
types = {
"register": "注册账号",
"resetpwd": "修改密码",
"resetemail": "修改邮箱"
}
headers = {"Content-Type": "application/json;charset=UTF-8"}
data = {"title": "【远东资信】{}".format(types[v_type]),
"sender": 'fecribd@fecr.com.cn',
"recipients": [email],
"msg_body": "{}的验证码为 【{}5分钟内有效。".format(types[v_type], code)}
response = requests.post(url=email_api + '/send_mail', headers=headers, data=json.dumps(data))
return response.text
def send_feedback_email(content):
"""
发送反馈邮件
Args:
content: 反馈内容
Returns:
info: 发送信息
"""
# 邮箱服务接口
email_api = 'http://116.63.130.34:30001'
recipients = ["wangsichuan@fecr.com.cn"]
headers = {"Content-Type": "application/json;charset=UTF-8"}
data = {"title": '股交反馈邮件',
"sender": 'fecribd@fecr.com.cn',
"recipients": recipients,
"msg_body": content}
response = requests.post(url=email_api + '/send_mail', headers=headers, data=json.dumps(data))
return response.text
def check_verified(cid):
"""
检查企业是否已认证
Parameters:
cid:
Returns:
已认证 True
未认证 False
"""
return True if FIND_USER_INFO({"企业ID": cid})[0]['已认证'] == "" else False
def gen_new_cid():
"""
生成新的企业ID如果该ID存在则重新生成
Parameters:
-
Returns:
new_id: 生成的企业ID
"""
# 生成新ID
new_id = make_id(8)
# 检查新ID是否存在如果存在则继续生成新ID
case = FIND_USER_INFO({"企业ID": new_id}) is []
while case:
new_id = make_id(8)
# 新ID可使用返回新ID
return new_id
def register_impl(email, pwd):
"""
生成新用户
Parameters:
email: 邮箱
pwd: 密码
Returns:
user: 新用户信息
"""
file_rel_path = "/static/template/user.json"
with open(os.path.abspath(os.path.dirname(__file__)+file_rel_path), "r", encoding='utf-8') as f:
user = json.load(f)
user['企业ID'] = gen_new_cid()
user['邮箱'] = email
user['头像fid'] = "61a15b33e55c00003e0048e4"
user['密码'] = generate_password_hash(decrypt_data(encrypt_msg=pwd))
user['注册时间'] = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())
user['已认证'] = ''
# 插入新用户数据
insert_user_info(user)
# 清除验证码和时间戳
clean_verify_code(email)
def gen_verify_code(email):
"""
生成邮箱验证码
Parameters:
email: 邮箱
Returns:
code: 验证码
"""
def make_verify_code():
"""
随机生成6位数字主要用于验证码
Parameters:
-
Returns:
num_code: 6位数字
"""
num_code = ""
for i in range(6):
ch = chr(random.randrange(ord('0'), ord('9') + 1))
num_code += ch
return num_code
code = make_verify_code()
timestamp = time.time() + 300
update_verify_record(email, {"验证码": code, "验证有效期": timestamp})
return code
def check_pwd(email, pwd):
"""
检查用户登录密码
Parameters:
email: 邮箱
pwd: 密码
Returns:
通过校验 True
未通过校验 False
"""
record = FIND_USER_INFO({"邮箱": email})
if not record:
return False
try:
if not check_password_hash(record[0]['密码'], decrypt_data(encrypt_msg=pwd)):
return False
except Exception:
return False
result = dict()
result['企业名称'] = record[0]['企业名称']
result['邮箱'] = record[0]['邮箱']
result['已认证'] = record[0]['已认证']
result['token'] = create_token({"cid": record[0]['企业ID']})
return result
def reset_pwd(email, pwd):
"""
修改密码
Args:
email: 邮箱
pwd: 新密码
Returns:
-
"""
record = FIND_USER_INFO({"邮箱": email})
cid = record[0]['企业ID']
UPDATE_USER_INFO_BY_CID(cid, {"密码": generate_password_hash(decrypt_data(encrypt_msg=pwd))})
clean_verify_code(email)
def reset_email(cid, email):
"""
更换邮箱
Args:
cid: 企业ID
email: 邮箱
Returns:
-
"""
UPDATE_USER_INFO_BY_CID(cid, {"邮箱": email})
clean_verify_code(email)
def avatar_by_cid(cid):
"""
根据企业ID获取用户头像
Args:
cid:
Returns:
filestream
"""
fid = FIND_USER_INFO({"企业ID": cid})[0]['头像fid']
filestream = find_file('头像', fid)
return filestream
def refresh_user_info(cid):
"""
根据cid查看、刷新用户信息
Args:
cid: 企业ID
Returns:
result: 用户基本信息
"""
record = FIND_USER_INFO({"企业ID": cid})
result = dict()
result['企业名称'] = record[0]['企业名称']
result['邮箱'] = record[0]['邮箱']
result['已认证'] = record[0]['已认证']
return result
def id_card_recognition_result(cid, image):
"""
身份证文本识别
Args:
cid: 企业ID
image: 图片
Returns:
"""
if check_verified(cid):
return "已认证不能上传"
# 身份证正面文本识别
result = id_card_recognition(side='face', image=image)
# 重置image读取位置不然下一步读不出图片
image.seek(0)
# 图片识别错误
if result == '识别错误':
return result
if result == '伪造身份证':
return result
else:
# 图片识别正确,保存身份证图片
new_file_id = insert_file('身份证', image.read(), image.content_type, result['姓名'])
# 删除原身份证图片
old_file_id = FIND_USER_INFO({"企业ID": cid})[0]['认证信息']['身份证fid']
if old_file_id is not None:
delete_file('身份证', old_file_id)
# 更新企业认证中身份证的信息
UPDATE_USER_INFO_BY_CID(cid, {"认证信息.法人姓名": result['姓名'],
"认证信息.法人身份证": result['身份证号码'],
"认证信息.身份证fid": new_file_id})
return '识别正确', {"姓名": result['姓名'], "身份证号码": result['身份证号码']}
def business_license_recognition_result(cid, image):
"""
营业执照识别结果
Parameters:
cid: 企业ID
image: 图片
Returns:
info 识别结果
result 识别信息
"""
if check_verified(cid):
return "已认证不能上传"
# 识别营业执照
result = business_license_recognition(image=image)
# 重置image读取位置不然下一步读不出图片
image.seek(0)
# 图片识别错误
if result == '识别错误':
return result
else:
# 图片识别正确,保存图片
new_file_id = insert_file('营业执照', image.read(), image.content_type, result['名称'])
# 删除原图片
old_file_id = FIND_USER_INFO({"企业ID": cid})[0]['认证信息']['营业执照fid']
if old_file_id is not None:
delete_file('营业执照', old_file_id)
# 更新企业认证中营业执照的信息
UPDATE_USER_INFO_BY_CID(cid, {"认证信息.企业名称": result['名称'],
"认证信息.统一社会信用代码": result['统一社会信用代码'],
"认证信息.营业执照fid": new_file_id})
return '识别正确', {"公司名称": result['名称'], "统一社会信用代码": result['统一社会信用代码']}
def tyc_company_verify(cid):
"""
天眼查企业认证三要素
Parameters:
code 统一社会信用代码
name 企业名称
legal_person 法人姓名
Returns:
通过 True
未通过 False
"""
# 根据cid获取企业上传的认证信息
records = FIND_USER_INFO({"企业ID": cid})
if not records:
return "数据异常: 没有该企业记录"
record = records[0]['认证信息']
code = record['统一社会信用代码']
name = record['企业名称']
legal_person = record['法人姓名']
# 检查认证信息是否完整
def check_verify_info_complete():
if None in [code, name, legal_person]:
return '认证信息不完整'
else:
return True
# 调用天眼查企业三要素验证接口
def check_three_elements_by_tyc():
token = "5407573b-e41e-4ab4-9f46-daa0032ad0a4"
base = "http://open.api.tianyancha.com/services/open/ic/verify/2.0?code={}&name={}&legalPersonName={}"
url = base.format(code, name, legal_person)
headers = {'Authorization': token}
res = json.loads(requests.get(url, headers=headers).text)
try:
if res['result']['result'] == 1:
return True
else:
return '企业三要素认证不通过'
except Exception:
return '接口异常: 认证接口异常'
# 初始化企业主页数据
def init_company_data():
init_url = "http://139.9.249.34:51011/etl_tfse/company/init_company_data"
init_token = "dmfd7FshT!5Wng9^gcCcQV7T6FBnVgl4"
init_headers = {'token': init_token}
data = json.dumps({"cid": cid, "company_name": name})
requests.post(init_url, headers=init_headers, data=data)
# 更新企业认证状态
def update_verify_status():
UPDATE_USER_INFO_BY_CID(cid, {"企业名称": name, "已认证": ""})
# 执行流程
def start_impl():
res = check_verify_info_complete()
if res is not True:
return res
res = check_three_elements_by_tyc()
if res is not True:
return res
init_company_data()
update_verify_status()
return True
return start_impl()
def choose_avatar(cid, avatar_id):
"""
选择头像
Args:
cid: 企业ID
avatar_id: 头像文件ID
Returns:
-
"""
avatars = {
"avatar_01": "61a15b33e55c00003e0048e4",
"avatar_02": "61a657081b4700006a004765",
"avatar_03": "61a657081b4700006a004767",
"avatar_04": "61a657081b4700006a004766",
"avatar_05": "61a657091b4700006a004768",
"avatar_06": "61a657091b4700006a004769"
}
if avatar_id not in avatars.keys():
pass
file_id = avatars[avatar_id]
UPDATE_USER_INFO_BY_CID(cid, {"头像fid": file_id})