tfse-app-api-v0.2/user/user_impl.py

449 lines
13 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import os
import time
import json
import random
import requests
from werkzeug.security import check_password_hash, generate_password_hash
from user.user_auth import create_token
from user.user_utils import make_id, check_mail_fmt, check_pwd_fmt, check_registered, check_verify_code
from user.user_db import FIND_USER_INFO, UPDATE_VERIFY_CODE, UPDATE_USER_INFO_BY_CID, INSERT_USER_INFO, \
CLEAN_VERIFY_CODE, FIND_FILE, INSERT_FILE, DELETE_FILE
from common.rsa import decrypt_data
from common.text_recognition import id_card_recognition, business_license_recognition
def send_email(email, code, v_type):
"""
发送验证码到邮箱
Parameters:
email: 邮箱
code: 验证码
v_type: 验证类型
Returns:
邮件发送结果
"""
# 邮箱服务接口
email_api = 'http://116.63.130.34:30001'
types = {
"register": "注册账号",
"resetpwd": "修改密码",
"resetemail": "修改邮箱"
}
headers = {"Content-Type": "application/json;charset=UTF-8"}
data = {"title": "【远东资信】{}".format(types[v_type]),
"sender": 'fecribd@fecr.com.cn',
"recipients": [email],
"msg_body": "{}的验证码为 【{}5分钟内有效。".format(types[v_type], code)}
response = requests.post(url=email_api + '/send_mail', headers=headers, data=json.dumps(data))
return response.text
def send_feedback_email(content):
"""
发送反馈邮件
Args:
content: 反馈内容
Returns:
info: 发送信息
"""
# 邮箱服务接口
email_api = 'http://116.63.130.34:30001'
recipients = ["wangsichuan@fecr.com.cn"]
headers = {"Content-Type": "application/json;charset=UTF-8"}
data = {"title": '股交反馈邮件',
"sender": 'fecribd@fecr.com.cn',
"recipients": recipients,
"msg_body": content}
response = requests.post(url=email_api + '/send_mail', headers=headers, data=json.dumps(data))
return response.text
def check_verified(cid):
"""
检查企业是否已认证
Parameters:
cid:
Returns:
已认证 True
未认证 False
"""
return True if FIND_USER_INFO({"企业ID": cid})[0]['已认证'] == "" else False
def gen_new_cid():
"""
生成新的企业ID如果该ID存在则重新生成
Parameters:
-
Returns:
new_id: 生成的企业ID
"""
# 生成新ID
new_id = make_id(8)
# 检查新ID是否存在如果存在则继续生成新ID
case = FIND_USER_INFO({"企业ID": new_id}) is []
while case:
new_id = make_id(8)
# 新ID可使用返回新ID
return new_id
def register_impl(email, pwd, code):
"""
生成新用户
Parameters:
email: 邮箱
pwd: 密码
code: 校验码
Returns:
user: 新用户信息
"""
# 注册用户前检查
def check_before_register():
# 校验邮箱格式
if not check_mail_fmt(email):
return "邮箱格式错误"
# 校验密码强度
if not check_pwd_fmt(pwd):
return "密码强度太低"
# 检验邮箱是否被注册
if check_registered(email):
return "邮箱已被注册"
# 验证码是否正确
info = check_verify_code(email, code)
if info is not True:
return info
return True
# 根据模板创建用户
def create_user_by_template():
file_rel_path = "/static/template/user.json"
with open(os.path.abspath(os.path.dirname(__file__)+file_rel_path), "r", encoding='utf-8') as f:
user = json.load(f)
user['企业ID'] = gen_new_cid()
user['邮箱'] = email
user['头像fid'] = "61a15b33e55c00003e0048e4"
user['密码'] = generate_password_hash(decrypt_data(encrypt_msg=pwd))
user['注册时间'] = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())
user['已认证'] = ''
# 插入新用户数据
INSERT_USER_INFO(user)
# 清除验证码和时间戳
CLEAN_VERIFY_CODE(email)
return True
def main_process():
# 注册前检查
res = check_before_register()
if res is not True:
return res
# 通过以上校验,生成新用户信息,插入新用户数据,清除验证码和时间戳
res = create_user_by_template()
if res is not True:
return '注册失败'
else:
return '注册成功'
return main_process()
def gen_verify_code(email):
"""
生成邮箱验证码
Parameters:
email: 邮箱
Returns:
code: 验证码
"""
def make_verify_code():
"""
随机生成6位数字主要用于验证码
Parameters:
-
Returns:
num_code: 6位数字
"""
num_code = ""
for i in range(6):
ch = chr(random.randrange(ord('0'), ord('9') + 1))
num_code += ch
return num_code
code = make_verify_code()
timestamp = time.time() + 300
UPDATE_VERIFY_CODE(email, {"验证码": code, "验证有效期": timestamp})
return code
def login_impl(email, pwd):
"""
检查用户登录密码
Parameters:
email: 邮箱
pwd: 密码
Returns:
通过校验 True
未通过校验 False
"""
record = FIND_USER_INFO({"邮箱": email})
if not record:
return False
try:
if not check_password_hash(record[0]['密码'], decrypt_data(encrypt_msg=pwd)):
return False
except Exception:
return False
result = dict()
result['企业名称'] = record[0]['企业名称']
result['邮箱'] = record[0]['邮箱']
result['已认证'] = record[0]['已认证']
result['token'] = create_token({"cid": record[0]['企业ID']})
return result
def change_pwd_impl(email, pwd):
"""
修改密码
Args:
email: 邮箱
pwd: 新密码
Returns:
-
"""
record = FIND_USER_INFO({"邮箱": email})
cid = record[0]['企业ID']
UPDATE_USER_INFO_BY_CID(cid, {"密码": generate_password_hash(decrypt_data(encrypt_msg=pwd))})
CLEAN_VERIFY_CODE(email)
def change_email_impl(cid, email):
"""
更换邮箱
Args:
cid: 企业ID
email: 邮箱
Returns:
-
"""
UPDATE_USER_INFO_BY_CID(cid, {"邮箱": email})
CLEAN_VERIFY_CODE(email)
def get_avatar_impl(cid):
"""
根据企业ID获取用户头像
Args:
cid:
Returns:
filestream
"""
fid = FIND_USER_INFO({"企业ID": cid})[0]['头像fid']
filestream = FIND_FILE('头像', fid)
return filestream
def refresh_user_info(cid):
"""
根据cid查看、刷新用户信息
Args:
cid: 企业ID
Returns:
result: 用户基本信息
"""
record = FIND_USER_INFO({"企业ID": cid})
result = dict()
result['企业名称'] = record[0]['企业名称']
result['邮箱'] = record[0]['邮箱']
result['已认证'] = record[0]['已认证']
return result
def id_card_recognition_result(cid, image):
"""
身份证文本识别
Args:
cid: 企业ID
image: 图片
Returns:
"""
if check_verified(cid):
return "已认证不能上传"
# 身份证正面文本识别
result = id_card_recognition(side='face', image=image)
# 重置image读取位置不然下一步读不出图片
image.seek(0)
# 图片识别错误
if result == '识别错误':
return result
if result == '伪造身份证':
return result
else:
# 图片识别正确,保存身份证图片
new_file_id = INSERT_FILE('身份证', image.read(), image.content_type, result['姓名'])
# 删除原身份证图片
old_file_id = FIND_USER_INFO({"企业ID": cid})[0]['认证信息']['身份证fid']
if old_file_id is not None:
DELETE_FILE('身份证', old_file_id)
# 更新企业认证中身份证的信息
UPDATE_USER_INFO_BY_CID(cid, {"认证信息.法人姓名": result['姓名'],
"认证信息.法人身份证": result['身份证号码'],
"认证信息.身份证fid": new_file_id})
return '识别正确', {"姓名": result['姓名'], "身份证号码": result['身份证号码']}
def business_license_recognition_result(cid, image):
"""
营业执照识别结果
Parameters:
cid: 企业ID
image: 图片
Returns:
info 识别结果
result 识别信息
"""
if check_verified(cid):
return "已认证不能上传"
# 识别营业执照
result = business_license_recognition(image=image)
# 重置image读取位置不然下一步读不出图片
image.seek(0)
# 图片识别错误
if result == '识别错误':
return result
else:
# 图片识别正确,保存图片
new_file_id = INSERT_FILE('营业执照', image.read(), image.content_type, result['名称'])
# 删除原图片
old_file_id = FIND_USER_INFO({"企业ID": cid})[0]['认证信息']['营业执照fid']
if old_file_id is not None:
DELETE_FILE('营业执照', old_file_id)
# 更新企业认证中营业执照的信息
UPDATE_USER_INFO_BY_CID(cid, {"认证信息.企业名称": result['名称'],
"认证信息.统一社会信用代码": result['统一社会信用代码'],
"认证信息.营业执照fid": new_file_id})
return '识别正确', {"公司名称": result['名称'], "统一社会信用代码": result['统一社会信用代码']}
def tyc_company_verify(cid):
"""
天眼查企业认证三要素
Parameters:
code 统一社会信用代码
name 企业名称
legal_person 法人姓名
Returns:
通过 True
未通过 False
"""
# 根据cid获取企业上传的认证信息
records = FIND_USER_INFO({"企业ID": cid})
if not records:
return "数据异常: 没有该企业记录"
record = records[0]['认证信息']
code = record['统一社会信用代码']
name = record['企业名称']
legal_person = record['法人姓名']
# 检查认证信息是否完整
def check_verify_info_complete():
if None in [code, name, legal_person]:
return '认证信息不完整'
else:
return True
# 调用天眼查企业三要素验证接口
def check_three_elements_by_tyc():
token = "5407573b-e41e-4ab4-9f46-daa0032ad0a4"
base = "http://open.api.tianyancha.com/services/open/ic/verify/2.0?code={}&name={}&legalPersonName={}"
url = base.format(code, name, legal_person)
headers = {'Authorization': token}
res = json.loads(requests.get(url, headers=headers).text)
try:
if res['result']['result'] == 1:
return True
else:
return '企业三要素认证不通过'
except Exception:
return '接口异常: 认证接口异常'
# 初始化企业主页数据
def init_company_data():
init_url = "http://api.fecribd.com/etl_tfse/company/init_company_data"
init_token = "dmfd7FshT!5Wng9^gcCcQV7T6FBnVgl4"
init_headers = {'token': init_token}
data = json.dumps({"cid": cid, "company_name": name})
requests.post(init_url, headers=init_headers, data=data)
# 更新企业认证状态
def update_verify_status():
UPDATE_USER_INFO_BY_CID(cid, {"企业名称": name, "已认证": ""})
# 执行流程
def start_impl():
res = check_verify_info_complete()
if res is not True:
return res
res = check_three_elements_by_tyc()
if res is not True:
return res
init_company_data()
update_verify_status()
return True
return start_impl()
def choose_avatar(cid, avatar_id):
"""
选择头像
Args:
cid: 企业ID
avatar_id: 头像文件ID
Returns:
-
"""
avatars = {
"avatar_01": "61a15b33e55c00003e0048e4",
"avatar_02": "61a657081b4700006a004765",
"avatar_03": "61a657081b4700006a004767",
"avatar_04": "61a657081b4700006a004766",
"avatar_05": "61a657091b4700006a004768",
"avatar_06": "61a657091b4700006a004769"
}
if avatar_id not in avatars.keys():
pass
file_id = avatars[avatar_id]
UPDATE_USER_INFO_BY_CID(cid, {"头像fid": file_id})