tfse-app-api-v0.2/user/scripts.py

426 lines
12 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import re
import time
import json
import requests
from werkzeug.security import check_password_hash, generate_password_hash
from user.user_utils import create_token
from common.text_recognition import id_card_recognition, business_license_recognition
from user.db import find_verify_record, find_user_info, update_verify_record, update_user_info_by_cid, insert_user_info, \
clean_verify_code, find_file, insert_file, delete_file
from common.rsa import decrypt_data
from common.scripts import make_id, make_verify_code, file_path, read_json_file
# 邮箱服务接口
EMAIL_API = 'http://116.63.130.34:30001'
def send_email(email, code, v_type):
"""
发送验证码到邮箱
Parameters:
email: 邮箱
code: 验证码
v_type: 验证类型
Returns:
邮件发送结果
"""
types = {
"register": "注册账号",
"resetpwd": "修改密码",
"resetemail": "修改邮箱"
}
headers = {"Content-Type": "application/json;charset=UTF-8"}
data = {"title": "【远东资信】{}".format(types[v_type]),
"sender": 'fecribd@fecr.com.cn',
"recipients": [email],
"msg_body": "{}的验证码为 【{}5分钟内有效。".format(types[v_type], code)}
response = requests.post(url=EMAIL_API + '/send_mail', headers=headers, data=json.dumps(data))
return response.text
def send_feedback_email(content):
"""
发送反馈邮件
Args:
content: 反馈内容
Returns:
info: 发送信息
"""
recipients = ["wangsichuan@fecr.com.cn"]
headers = {"Content-Type": "application/json;charset=UTF-8"}
data = {"title": '股交反馈邮件',
"sender": 'fecribd@fecr.com.cn',
"recipients": recipients,
"msg_body": content}
response = requests.post(url=EMAIL_API + '/send_mail', headers=headers, data=json.dumps(data))
return response.text
def check_mail_fmt(email):
"""
邮箱地址格式校验
Parameters:
email: 邮箱
Returns:
result: 邮箱校验结果正确返回True不正确返回False
"""
regex = "^.+\\@(\\[?)[a-zA-Z0-9\\-\\.]+\\.([a-zA-Z]{2,3}|[0-9]{1,3})(\\]?)$"
case = (len(email) > 7) and (re.match(regex, email) is not None)
result = True if case else False
return result
def check_pwd_fmt(pwd):
"""
密码强度校验
Parameters:
pwd: 密码(已加密)
Returns:
result: 密码强度校验结果正确返回True不正确返回False
"""
try:
password = decrypt_data(encrypt_msg=pwd)
except Exception:
return False
regex = "^(?![A-Za-z0-9]+$)(?![a-z0-9\\W]+$)(?![A-Za-z\\W]+$)(?![A-Z0-9\\W]+$)^.{8,}$"
case = (len(password) >= 8) and (re.match(regex, password) is not None)
result = True if case else False
return result
def check_registered(email):
"""
检查邮箱是否被注册
Parameters:
email: 邮箱
Returns:
邮箱未被注册 False
邮箱已被注册 True
"""
return False if find_user_info({"邮箱": email}) == [] else True
def check_verified(cid):
"""
检查企业是否已认证
Parameters:
cid:
Returns:
已认证 True
未认证 False
"""
return True if find_user_info({"企业ID": cid})[0]['已认证'] == "" else False
def check_verify_code(email, code):
"""
验证码是否正确
Parameters:
email: 邮箱
code: 验证码
Returns:
没有邮箱记录 False
验证码不正确 False
验证码过期 False
验证码正确 True
"""
record = find_verify_record({"邮箱": email})
if len(record) == 0:
return '验证码错误'
if record[0]['验证码'] != code:
return '验证码错误'
if time.time()-record[0]['验证有效期'] > 300:
return '验证码过期'
return True
def gen_new_cid():
"""
生成新的企业ID如果该ID存在则重新生成
Parameters:
-
Returns:
new_id: 生成的企业ID
"""
# 生成新ID
new_id = make_id()
# 检查新ID是否存在如果存在则继续生成新ID
case = find_user_info({"企业ID": new_id}) is []
while case:
new_id = make_id()
# 新ID可使用返回新ID
return new_id
def gen_new_user(email, pwd):
"""
生成新用户
Parameters:
email: 邮箱
pwd: 密码
Returns:
user: 新用户信息
"""
user = read_json_file(file_path('/user/static/template/user.json'))
user['企业ID'] = gen_new_cid()
user['邮箱'] = email
user['头像fid'] = "61a15b33e55c00003e0048e4"
user['密码'] = generate_password_hash(decrypt_data(encrypt_msg=pwd))
user['注册时间'] = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())
user['已认证'] = ''
# 插入新用户数据
insert_user_info(user)
# 清除验证码和时间戳
clean_verify_code(email)
def gen_verify_code(email):
"""
生成邮箱验证码
Parameters:
email: 邮箱
Returns:
code: 验证码
"""
code = make_verify_code()
timestamp = time.time() + 300
update_verify_record(email, {"验证码": code, "验证有效期": timestamp})
return code
def check_pwd(email, pwd):
"""
检查用户登录密码
Parameters:
email: 邮箱
pwd: 密码
Returns:
通过校验 True
未通过校验 False
"""
record = find_user_info({"邮箱": email})
if not record:
return False
try:
if not check_password_hash(record[0]['密码'], decrypt_data(encrypt_msg=pwd)):
return False
except Exception:
return False
result = dict()
result['企业名称'] = record[0]['企业名称']
result['邮箱'] = record[0]['邮箱']
result['已认证'] = record[0]['已认证']
result['token'] = create_token({"cid": record[0]['企业ID']})
return result
def reset_pwd(email, pwd):
"""
修改密码
Args:
email: 邮箱
pwd: 新密码
Returns:
-
"""
record = find_user_info({"邮箱": email})
cid = record[0]['企业ID']
update_user_info_by_cid(cid, {"密码": generate_password_hash(decrypt_data(encrypt_msg=pwd))})
clean_verify_code(email)
def reset_email(cid, email):
"""
更换邮箱
Args:
cid: 企业ID
email: 邮箱
Returns:
-
"""
update_user_info_by_cid(cid, {"邮箱": email})
clean_verify_code(email)
def avatar_by_cid(cid):
"""
根据企业ID获取用户头像
Args:
cid:
Returns:
filestream
"""
fid = find_user_info({"企业ID": cid})[0]['头像fid']
filestream = find_file('头像', fid)
return filestream
def refresh_user_info(cid):
"""
根据cid查看、刷新用户信息
Args:
cid: 企业ID
Returns:
result: 用户基本信息
"""
record = find_user_info({"企业ID": cid})
result = dict()
result['企业名称'] = record[0]['企业名称']
result['邮箱'] = record[0]['邮箱']
result['已认证'] = record[0]['已认证']
return result
def id_card_recognition_result(cid, image):
"""
身份证文本识别
Args:
cid: 企业ID
image: 图片
Returns:
"""
if check_verified(cid):
return "已认证不能上传"
# 身份证正面文本识别
result = id_card_recognition(side='face', image=image)
# 重置image读取位置不然下一步读不出图片
image.seek(0)
# 图片识别错误
if result == '识别错误':
return result
if result == '伪造身份证':
return result
else:
# 图片识别正确,保存身份证图片
new_file_id = insert_file('身份证', image.read(), image.content_type, result['姓名'])
# 删除原身份证图片
old_file_id = find_user_info({"企业ID": cid})[0]['认证信息']['身份证fid']
if old_file_id is not None:
delete_file('身份证', old_file_id)
# 更新企业认证中身份证的信息
update_user_info_by_cid(cid, {"认证信息.法人姓名": result['姓名'],
"认证信息.法人身份证": result['身份证号码'],
"认证信息.身份证fid": new_file_id})
return '识别正确', {"姓名": result['姓名'], "身份证号码": result['身份证号码']}
def business_license_recognition_result(cid, image):
"""
营业执照识别结果
Parameters:
cid: 企业ID
image: 图片
Returns:
info 识别结果
result 识别信息
"""
if check_verified(cid):
return "已认证不能上传"
# 识别营业执照
result = business_license_recognition(image=image)
# 重置image读取位置不然下一步读不出图片
image.seek(0)
# 图片识别错误
if result == '识别错误':
return result
else:
# 图片识别正确,保存图片
new_file_id = insert_file('营业执照', image.read(), image.content_type, result['名称'])
# 删除原图片
old_file_id = find_user_info({"企业ID": cid})[0]['认证信息']['营业执照fid']
if old_file_id is not None:
delete_file('营业执照', old_file_id)
# 更新企业认证中营业执照的信息
update_user_info_by_cid(cid, {"认证信息.企业名称": result['名称'],
"认证信息.统一社会信用代码": result['统一社会信用代码'],
"认证信息.营业执照fid": new_file_id})
return '识别正确', {"公司名称": result['名称'], "统一社会信用代码": result['统一社会信用代码']}
def tyc_company_verify(cid):
"""
天眼查企业认证三要素
Parameters:
code 统一社会信用代码
name 企业名称
legal_person 法人姓名
Returns:
通过 True
未通过 False
"""
# 根据cid获取企业上传的认证信息
v_info = find_user_info({"企业ID": cid})[0]['认证信息']
code = v_info['统一社会信用代码']
name = v_info['企业名称']
legal_person = v_info['法人姓名']
# 认证信息有缺失
if None in [code, name, legal_person]:
return '认证信息不完整'
# 调用天眼查企业三要素验证接口
token = "5407573b-e41e-4ab4-9f46-daa0032ad0a4"
base = "http://open.api.tianyancha.com/services/open/ic/verify/2.0?code={}&name={}&legalPersonName={}"
url = base.format(code, name, legal_person)
headers = {'Authorization': token}
res = json.loads(requests.get(url, headers=headers).text)
try:
if res['result']['result'] == 1:
# 初始化企业数据
init_url = "http://139.9.249.34:51011/etl_tfse/company/init_company_data"
init_token = "dmfd7FshT!5Wng9^gcCcQV7T6FBnVgl4"
init_headers = {'token': init_token}
data = json.dumps({"cid": cid, "company_name": name})
requests.post(init_url, headers=init_headers, data=data)
# 更新用户认证状态
update_user_info_by_cid(cid, {"企业名称": name, "已认证": ""})
return '认证成功'
else:
return '认证失败'
except Exception:
return '认证失败'
def choose_avatar(cid, avatar_id):
"""
选择头像
Args:
cid: 企业ID
avatar_id: 头像文件ID
Returns:
-
"""
avatars = {
"avatar_01": "61a15b33e55c00003e0048e4",
"avatar_02": "61a657081b4700006a004765",
"avatar_03": "61a657081b4700006a004767",
"avatar_04": "61a657081b4700006a004766",
"avatar_05": "61a657091b4700006a004768",
"avatar_06": "61a657091b4700006a004769"
}
if avatar_id not in avatars.keys():
pass
file_id = avatars[avatar_id]
update_user_info_by_cid(cid, {"头像fid": file_id})