用户模块重构

This commit is contained in:
王思川 2022-04-27 15:02:43 +08:00
parent cf59941688
commit 37fe78b690
7 changed files with 357 additions and 980 deletions

View File

@ -1,564 +0,0 @@
import json
import time
import random
import base64
import requests
from urllib.request import Request, urlopen
from werkzeug.security import check_password_hash
from itsdangerous import TimedJSONWebSignatureSerializer as Serializer
from DBHelper.MongoHelper import MongoHelper
from Utils.ErrorUtil import ReturnConditionCheckFailed
from CompanyUser.CompanyUserObj import CompanyUser, VerifyInfo, EmailVerifyCodeRecord, VerifyMaterial, \
CompanyVerifyThreeFactors
class CompanyUserImpl(CompanyUser):
"""企业用户实现类"""
db = MongoHelper("tfse_v0.21")
def check_email_registered(self):
"""检查邮箱已被注册"""
email = self.db.find_single_column(
"应用端",
"企业用户",
{"邮箱": self.email},
"邮箱"
)
if email is not None:
raise ReturnConditionCheckFailed("邮箱已被注册", 200)
def check_email_not_registered(self):
"""检查邮箱未被注册"""
email = self.db.find_single_column(
"应用端",
"企业用户",
{"邮箱": self.email},
"邮箱"
)
if email is None:
raise ReturnConditionCheckFailed("邮箱未被注册", 200)
def check_vcode_correct(self, code):
"""检查邮箱验证码是否正确"""
verify_data = self.db.find_single_data(
"应用端",
"邮箱验证码记录",
{"邮箱": self.email},
["验证码", "验证有效期"]
)
if not verify_data:
raise ReturnConditionCheckFailed("验证码错误", 200)
if verify_data['验证码'] != code:
raise ReturnConditionCheckFailed("验证码错误", 200)
if time.time() - verify_data['验证有效期'] > 300:
raise ReturnConditionCheckFailed('验证码过期', 200)
def login(self):
"""登录"""
company_user = self.db.find_single_data(
"应用端",
"企业用户",
{"邮箱": self.email},
["企业ID", "企业名称", "邮箱", "密码", "已认证"]
)
def check_user_existed():
"""检查用户是否存在"""
if company_user is None:
raise ReturnConditionCheckFailed("不存在该用户", 200)
def check_password_correct():
"""检查密码是否正确"""
if not check_password_hash(company_user['密码'], self.pwd):
raise ReturnConditionCheckFailed("登录密码错误", 200)
def create_token(param):
"""
创建token
param: 传入参数用于创建token
return: 用户token
"""
secret_key = '0FTuOi^#Afx1@2@F'
token_expire = 14400
s = Serializer(secret_key, expires_in=token_expire)
token = '' + s.dumps(param).decode('ascii')
return token
def __main__():
""""""
check_user_existed()
check_password_correct()
self.cid = company_user['企业ID']
self.name = company_user['企业名称']
self.verify_status = company_user['已认证']
return_body = self.dict_to_show(columns=["企业名称", "认证状态", "token"])
return_body['token'] = create_token({"cid": self.cid})
return return_body
return __main__()
def register(self, code):
"""注册"""
def make_new_cid():
"""生成新的企业ID如果该ID存在则重新生成"""
def random_cid(num):
"""随机企业ID"""
choices = 'ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789'
salt = ''
for i in range(num):
salt += random.choice(choices)
return salt
new_cid = random_cid(8)
case = self.db.find_single_column(
"应用端",
"企业用户",
{"企业ID": new_cid},
"企业ID"
) is not None
while case:
new_cid = random_cid(8)
return new_cid
def __main__():
""""""
self.check_email_registered()
self.check_vcode_correct(code)
self.cid = make_new_cid()
self.avatar_id = "623152edf36000004f00124e"
self.verify_status = ""
self.verify_info = VerifyInfo()
self.db.insert_single_data(
"应用端",
"企业用户",
self.dict_to_save()
)
self.db.delete_single_data(
"应用端",
"邮箱验证码记录",
{"邮箱": self.email}
)
__main__()
def get_company_user_info(self):
"""获取企业用户信息"""
company_user_data = self.db.find_single_data(
"应用端",
"企业用户",
{"企业ID": self.cid},
["企业ID", "企业名称", "邮箱", "已认证"]
)
self.name = company_user_data['企业名称']
self.email = company_user_data['邮箱']
self.verify_status = company_user_data['已认证']
return self.dict_to_show(columns=["企业ID", "企业名称", "邮箱", "认证状态"])
def get_avatar(self):
"""获取用户头像"""
self.avatar_id = self.db.find_single_column(
"应用端",
"企业用户",
{"企业ID": self.cid},
"头像fid"
)
avatar = self.db.find_file(
"文件",
"企业用户头像",
self.avatar_id
)
return avatar
def change_avatar(self, avatar_option):
"""修改企业用户头像"""
avatars = {
"avatar_01": "623152edf36000004f00124e",
"avatar_02": "623152edf36000004f001250",
"avatar_03": "623152edf36000004f00124f",
"avatar_04": "623152edf36000004f001251",
"avatar_05": "623152edf36000004f001252",
"avatar_06": "623152edf36000004f001253"
}
self.avatar_id = avatars[avatar_option]
self.db.update_single_data(
"应用端",
"企业用户",
{"企业ID": self.cid},
{"头像fid": self.avatar_id}
)
def change_password(self, code):
"""修改登录密码"""
self.check_email_not_registered()
self.check_vcode_correct(code)
self.db.update_single_data(
"应用端",
"企业用户",
{"邮箱": self.email},
self.dict_to_save(columns=["密码"])
)
def change_email(self, code):
"""更换邮箱"""
self.check_vcode_correct(code)
self.check_email_registered()
self.db.update_single_data(
"应用端",
"企业用户",
{"企业ID": self.cid},
self.dict_to_save(columns=["邮箱"])
)
class VerifyMaterialImpl(VerifyMaterial):
"""认证材料实现类"""
db = MongoHelper("tfse_v0.21")
@staticmethod
def read_img_by_base64(img_file):
"""读取base64编码格式的图片"""
data = img_file.read()
try:
encode_str = str(base64.b64encode(data), 'utf-8')
except TypeError:
encode_str = base64.b64encode(data)
return encode_str
def check_is_verified(self):
"""检查企业是否已认证"""
self.verify_status = self.db.find_single_column(
"应用端",
"企业用户",
{"企业ID": self.cid},
"已认证"
)
if self.verify_status == "":
raise ReturnConditionCheckFailed("企业已认证", 200)
def upload_id_card(self):
"""上传身份证"""
def trans_date_format(_time):
"""将解析出来的时间数据加上年月日"""
year = _time[:4]
month = _time[4:6]
day = _time[6:8]
return year + '.' + month + '.' + day
def id_card_recognition(side):
"""调用身份证图片识别接口"""
url = 'http://dm-51.data.aliyun.com/rest/160601/ocr/ocr_idcard.json'
APPCODE = "6d6c3146677c40bf9e3e1e2aa73404cd"
# 请求头
headers = {
'Authorization': 'APPCODE %s' % APPCODE,
'Content-Type': 'application/json; charset=UTF-8'
}
# 请求体
body = {
"configure": {'side': side},
"image": self.read_img_by_base64(self.image)
}
try:
params = json.dumps(body).encode(encoding='UTF8')
req = Request(url, params, headers)
res = urlopen(req)
html = res.read()
result = json.loads(html.decode("utf8"))
if result['is_fake']:
raise ReturnConditionCheckFailed("伪造身份证", 200)
id_card = dict()
if side == 'face':
id_card['姓名'] = result['name']
id_card['性别'] = result['sex']
id_card['民族'] = result['nationality']
id_card['出生'] = "{}{}{}".format(result['birth'][:4], result['birth'][4:6], result['birth'][6:])
id_card['住址'] = result['address']
id_card['身份证号码'] = result['num']
else:
id_card['签发机关'] = result['issue']
id_card['有效期限'] = '{}-{}'.format(trans_date_format(result['start_date']), trans_date_format(result['end_date']))
return id_card
except Exception:
raise ReturnConditionCheckFailed("身份证识别失败", 200)
def id_card_recognition_result():
"""
身份证文本识别
cid: 企业ID
image: 图片
return: 识别结果
"""
# 身份证正面文本识别
result = id_card_recognition(side='face')
# 重置image读取位置不然下一步读不出图片
self.image.seek(0)
# 更新企业认证中身份证的信息
self.db.update_single_data(
"应用端",
"企业用户",
{"企业ID": self.cid},
{"认证信息.法人姓名": result['姓名'], "认证信息.法人身份证": result['身份证号码']}
)
return {"姓名": result['姓名'], "身份证号码": result['身份证号码']}
def __main__():
""""""
self.check_is_verified()
return id_card_recognition_result()
return __main__()
def upload_business_license(self):
"""上传营业执照"""
def trans_date_format(_time):
"""将解析出来的时间数据加上年月日"""
year = _time[:4]
month = _time[4:6]
day = _time[6:8]
return year + '' + month + '' + day + ''
def business_license_recognition():
"""调用营业执照识别接口"""
url = 'https://dm-58.data.aliyun.com/rest/160601/ocr/ocr_business_license.json'
app_code = "6d6c3146677c40bf9e3e1e2aa73404cd"
headers = {
'Authorization': 'APPCODE %s' % app_code,
'Content-Type': 'application/json; charset=UTF-8'
}
body = {
"image": self.read_img_by_base64(self.image)
}
response = requests.post(url=url, data=json.dumps(body), headers=headers)
try:
res = json.loads(response.text)
data = dict()
data['统一社会信用代码'] = res['reg_num']
data['名称'] = res['name']
data['注册资本'] = res['capital']
data['类型'] = res['type']
data['成立日期'] = trans_date_format(res['establish_date'])
data['法定代表人'] = res['person']
data['经营范围'] = res['business']
data['住所'] = res['address']
return data
except Exception:
raise ReturnConditionCheckFailed("营业执照识别异常", 200)
def business_license_recognition_result():
"""营业执照识别结果"""
# 识别营业执照
result = business_license_recognition()
# 图片识别正确 更新企业认证中营业执照的信息
self.db.update_single_data(
"应用端",
"企业用户",
{"企业ID": self.cid},
{"认证信息.企业名称": result['名称'], "认证信息.统一社会信用代码": result['统一社会信用代码']}
)
return {"公司名称": result['名称'], "统一社会信用代码": result['统一社会信用代码']}
def _main__():
""""""
self.check_is_verified()
return business_license_recognition_result()
return _main__()
class CompanyVerifyThreeFactorsImpl(CompanyVerifyThreeFactors):
"""企业认证三要素实现类"""
db = MongoHelper("tfse_v0.21")
def company_verify(self):
"""企业认证"""
def get_verify_info():
"""获取企业认证信息"""
record = self.db.find_single_data(
"应用端",
"企业用户",
{"企业ID": self.cid},
["认证信息.企业名称", "认证信息.法人姓名", "认证信息.统一社会信用代码"]
)
if not record:
raise ReturnConditionCheckFailed("企业认证信息异常", 200)
verify_info = record['认证信息']
self.code = verify_info['统一社会信用代码']
self.name = verify_info['企业名称']
self.legal_person = verify_info['法人姓名']
def check_verify_info_complete():
"""检查认证信息是否完整"""
if None in [self.code, self.name, self.legal_person]:
raise ReturnConditionCheckFailed("企业认证信息异常", 200)
def check_three_elements_by_tyc():
"""调用天眼查企业三要素验证接口"""
token = "5407573b-e41e-4ab4-9f46-daa0032ad0a4"
base = "http://open.api.tianyancha.com/services/open/ic/verify/2.0?code={}&name={}&legalPersonName={}"
url = base.format(self.code, self.name, self.legal_person)
headers = {'Authorization': token}
res = json.loads(requests.get(url, headers=headers).text)
try:
if res['result']['result'] != 1:
raise ReturnConditionCheckFailed("企业认证未通过", 200)
except Exception:
raise ReturnConditionCheckFailed("认证接口异常", 200)
def init_company_data():
"""初始化企业主页数据"""
init_url = "http://api.fecribd.com/etl_tfse/company/init_company_data"
init_token = "dmfd7FshT!5Wng9^gcCcQV7T6FBnVgl4"
init_headers = {'token': init_token, "Content-Type": "application/json;charset=UTF-8"}
data = json.dumps({"cid": self.cid, "company_name": self.name})
response = requests.post(init_url, headers=init_headers, data=data)
res = json.loads(response.text)
if res['info'] == "企业数据初始化成功":
raise ReturnConditionCheckFailed("企业数据初始化异常", 200)
else:
company_user = CompanyUser()
company_user.cid = self.cid
company_user.name = self.name
company_user.verify_status = ""
self.db.upsert_single_data(
"应用端",
"企业用户",
{"企业ID": self.cid},
company_user.dict_to_save(columns=["企业名称", "已认证"])
)
def __main__():
get_verify_info()
check_verify_info_complete()
check_three_elements_by_tyc()
init_company_data()
return __main__()
class EmailVerifyCodeRecordImpl(EmailVerifyCodeRecord):
"""用户通知邮件实现类"""
db = MongoHelper("tfse_v0.21")
def make_verify_code(self):
"""生成邮箱验证码"""
def make_verify_code():
"""随机生成6位数字验证码"""
num_code = ""
for i in range(6):
ch = chr(random.randrange(ord('0'), ord('9') + 1))
num_code += ch
return num_code
self.v_code = make_verify_code()
self.v_period = time.time() + 300
self.db.upsert_single_data(
"应用端",
"邮箱验证码记录",
{"邮箱": self.email},
self.dict_to_save()
)
def transfer_email_api(self):
"""发送邮件"""
# 邮箱服务接口
email_api = 'http://116.63.130.34:30001'
v_types = {
"register": "注册账号",
"resetpwd": "修改密码",
"resetemail": "修改邮箱"
}
headers = {"Content-Type": "application/json;charset=UTF-8"}
data = {
"title": "【远东资信】{}".format(v_types[self.v_type]),
"sender": 'fecribd@fecr.com.cn',
"recipients": [self.email],
"msg_body": "{}的验证码为 【{}5分钟内有效。".format(v_types[self.v_type], self.v_code)
}
response = requests.post(url=email_api + '/send_mail', headers=headers, data=json.dumps(data))
return response.text
def check_email_registered(self):
"""检查邮箱已被注册"""
email = self.db.find_single_column(
"应用端",
"企业用户",
{"邮箱": self.email},
"邮箱"
)
if email is not None:
raise ReturnConditionCheckFailed("邮箱已被注册", 200)
def check_email_not_registered(self):
"""检查邮箱未被注册"""
email = self.db.find_single_column(
"应用端",
"企业用户",
{"邮箱": self.email},
"邮箱"
)
if email is None:
raise ReturnConditionCheckFailed("邮箱未被注册", 200)
def send_email(self):
""""""
if self.v_type == 'register' or self.v_type == 'resetemail':
self.check_email_registered()
self.make_verify_code()
self.transfer_email_api()
elif self.v_type == 'resetpwd':
self.check_email_not_registered()
self.make_verify_code()
self.transfer_email_api()

View File

@ -1,188 +0,0 @@
from werkzeug.security import generate_password_hash
from Utils.ValidateUtil import ValidateAttr, Validate
class VerifyInfo(object):
"""认证信息"""
name = ValidateAttr(field='name', default=None, type=str, error_info='企业名称异常', error_code=200)
code = ValidateAttr(field='code', default=None, type=str, error_info='统一社会信用代码异常', error_code=200)
legal_person = ValidateAttr(field='legal_person', default=None, type=str, error_info='法人姓名异常', error_code=200)
legal_person_id = ValidateAttr(field='legal_person_id', default=None, type=str, error_info='法人身份证号异常', error_code=200)
def dict_to_save(self, **kwargs):
""""""
_dict_ = {
"企业名称": self.name,
"统一社会信用代码": self.code,
"法人姓名": self.legal_person,
"法人身份证": self.legal_person_id,
}
if 'columns' in kwargs:
_dict_ = {key: _dict_[key] for key in kwargs['columns']}
return _dict_
class CompanyUser(object):
"""企业用户"""
cid = ValidateAttr(field='cid', type=str, length=8, error_info='企业ID异常', error_code=200)
name = ValidateAttr(field='name', type=str, default=None, error_info='企业名称异常', error_code=200)
email = ValidateAttr(field='email', func=Validate.email, error_info='邮箱格式异常', error_code=200)
pwd = ValidateAttr(field='pwd', func=Validate.password, error_info='密码强度异常', error_code=200)
avatar_id = ValidateAttr(field='avatar_id', type=str, length=24, error_info='头像ID异常', error_code=200)
verify_status = ValidateAttr(field='verify_status', type=str, in_list=["", ""], error_info='认证状态异常', error_code=200)
verify_info = ValidateAttr(field='verify_info', type=VerifyInfo, error_info='认证信息异常', error_code=200)
register_time = ValidateAttr(field='register_time', default=None, type=str, error_info='注册时间异常', error_code=200)
def dict_to_show(self, **kwargs):
""""""
_dict_ = {
"企业ID": self.cid,
"企业名称": '企业用户' if self.name is None else self.name,
"邮箱": self.email,
"认证状态": "已认证" if self.verify_status == "" else "未认证",
"token": None
}
if 'columns' in kwargs:
_dict_ = {key: _dict_[key] for key in kwargs['columns']}
return _dict_
def dict_to_save(self, **kwargs):
"""存储对象"""
# 对象字段与实例字段的映射
keys_map = {
"cid": "企业ID",
"name": "企业名称",
"email": "邮箱",
"avatar_id": "头像fid",
"pwd": "密码",
"register_time": "注册时间",
"verify_status": "已认证",
"verify_info": "认证信息"
}
# 实例一个存储对象
_dict_ = dict()
# 实例字段列表
obj_keys = [key for key in self.__dict__.keys()]
# 实例的值循环赋值给存储对象
for key in obj_keys:
_dict_[keys_map[key]] = self.__dict__[key]
# 存储对象子集
if 'columns' in kwargs:
_dict_ = {key: _dict_[key] for key in kwargs['columns']}
if '密码' in _dict_:
_dict_['密码'] = generate_password_hash(_dict_['密码'])
return _dict_
def login(self):
"""登录"""
def register(self, **kwargs):
"""注册"""
def get_company_user_info(self):
"""获取企业用户信息"""
def get_avatar(self):
"""获取用户头像"""
def change_avatar(self, **kwargs):
"""修改用户头像"""
def change_password(self, **kwargs):
"""修改登录密码"""
def change_email(self, **kwargs):
"""修改登录邮箱"""
class IDCardFront(object):
"""身份证正面"""
name = ValidateAttr()
gender = ValidateAttr()
nationality = ValidateAttr()
birth = ValidateAttr()
address = ValidateAttr()
id_num = ValidateAttr()
class IDCardBack(object):
"""身份证反面"""
issuing_authority = ValidateAttr()
validity_period = ValidateAttr()
class BusinessLicense(object):
"""营业执照"""
reg_num = ValidateAttr()
name = ValidateAttr()
capital = ValidateAttr()
type = ValidateAttr()
establish_date = ValidateAttr()
person = ValidateAttr()
business = ValidateAttr()
address = ValidateAttr()
class VerifyMaterial(object):
"""认证材料"""
cid = ValidateAttr(field='cid', type=str, length=8, error_info='企业ID异常', error_code=200)
image = ValidateAttr(field='image', func=Validate.image, error_info='图片格式异常', error_code=200)
verify_status = ValidateAttr(field='verify_status', type=str, in_list=["", ""], error_info='认证状态异常', error_code=200)
def upload_id_card(self):
"""上传身份证"""
def upload_business_license(self):
"""上传营业执照"""
class CompanyVerifyThreeFactors(object):
"""企业认证三要素"""
cid = ValidateAttr(field='cid', type=str, length=8)
code = ValidateAttr(field='code', type=str)
name = ValidateAttr(field='name', type=str)
legal_person = ValidateAttr(field='legal_person', type=str)
fields_map = {
"cid": "企业ID",
"code": "统一社会信用代码",
"name": "企业名称",
"legal_person": "法人姓名",
}
def company_verify(self):
"""企业认证"""
class EmailVerifyCodeRecord(object):
"""用户邮件通知"""
email = ValidateAttr(field='email', func=Validate.email, error_info='邮箱格式异常', error_code=200)
v_period = ValidateAttr(field='v_period', type=float, error_info='验证有效期格式异常', error_code=200)
v_code = ValidateAttr(field='v_code', type=str, length=6, error_info='验证码格式异常', error_code=200)
v_type = ValidateAttr(field="v_type", type=str, in_list=["register", "resetemail", "resetpwd"], error_info="邮件验证类型错误", error_code=200)
def dict_to_save(self):
""""""
_dict_ = {
"邮箱": self.email,
"验证有效期": self.v_period,
"验证码": self.v_code
}
return _dict_
def send_email(self):
"""发送邮件"""

View File

@ -1,203 +0,0 @@
from flask import Blueprint, request, Response
from common.rsa import decrypt_data
from Utils.AuthUtil import check_block, verify_token
from Utils.ErrorUtil import ReturnConditionCheckFailed
from CompanyUser.CompanyUserImpl import CompanyUserImpl, EmailVerifyCodeRecordImpl, VerifyMaterialImpl, \
CompanyVerifyThreeFactorsImpl
company_user_route = Blueprint('user', __name__)
@company_user_route.route('/login', methods=['POST'])
@check_block
def login():
"""企业用户登录"""
try:
company_user = CompanyUserImpl()
company_user.email = request.json['email']
try:
company_user.pwd = decrypt_data(encrypt_msg=request.json['pwd'])
except Exception:
return {"info": "登录密码错误"}, 200
result = company_user.login()
return {"info": "登陆成功", "result": result}, 200
except ReturnConditionCheckFailed as e:
e.log_error()
return {"info": e.failed_info}, e.status_code
except KeyError:
return {"info": "参数异常"}, 400
@company_user_route.route('/register', methods=['POST'])
@check_block
def register():
"""企业用户注册"""
try:
req = request.json
company_user = CompanyUserImpl()
company_user.email = req['email']
company_user.pwd = req['pwd']
company_user.register(req['code'])
response = {"info": "注册成功"}, 200
return response
except ReturnConditionCheckFailed as e:
e.log_error()
return {"info": e.failed_info}, e.status_code
except KeyError:
return {"info": "参数异常"}, 400
@company_user_route.route('/user_info', methods=['GET'])
@verify_token
def user_info(**kwargs):
"""获取企业用户信息"""
try:
company_user = CompanyUserImpl()
company_user.cid = kwargs['cid']
result = company_user.get_company_user_info()
return {"info": "查询结果", "result": result}
except ReturnConditionCheckFailed as e:
e.log_error()
return {"info": e.failed_info}, e.status_code
except KeyError:
return {"info": "参数异常"}, 400
@company_user_route.route('/get_avatar', methods=['GET'])
@verify_token
def get_avatar(**kwargs):
"""获取头像"""
try:
company_user = CompanyUserImpl()
company_user.cid = kwargs['cid']
response = Response(company_user.get_avatar(), content_type='image/jpeg')
return response
except ReturnConditionCheckFailed as e:
e.log_error()
return {"info": e.failed_info}, e.status_code
except KeyError:
return {"info": "参数异常"}, 400
@company_user_route.route('/change_avatar', methods=['POST'])
@verify_token
def change_avatar(**kwargs):
"""修改头像"""
try:
company_user = CompanyUserImpl()
company_user.cid = kwargs['cid']
company_user.change_avatar(request.json['avatar_id'])
return {"info": "修改成功"}, 200
except ReturnConditionCheckFailed as e:
e.log_error()
return {"info": e.failed_info}, e.status_code
except KeyError:
return {"info": "参数异常"}, 400
@company_user_route.route('/change_pwd', methods=['POST'])
@check_block
def change_pwd():
"""修改密码"""
try:
req = request.json
company_user = CompanyUserImpl()
company_user.email = req['email']
company_user.pwd = decrypt_data(encrypt_msg=req['pwd'])
company_user.change_password(req['code'])
return {"info": "修改成功"}, 200
except ReturnConditionCheckFailed as e:
e.log_error()
return {"info": e.failed_info}, e.status_code
except KeyError:
return {"info": "参数异常"}, 400
@company_user_route.route('/change_email', methods=['POST'])
@check_block
@verify_token
def change_email(**kwargs):
"""更换账号邮箱"""
try:
req = request.json
company_user = CompanyUserImpl()
company_user.cid = kwargs['cid']
company_user.email = req['email']
company_user.change_email(req['code'])
return {"info": "修改成功"}, 200
except ReturnConditionCheckFailed as e:
e.log_error()
return {"info": e.failed_info}, e.status_code
except KeyError:
return {"info": "参数异常"}, 400
@company_user_route.route('/verify_email', methods=['POST'])
@check_block
def verify_email():
"""发送邮箱验证码"""
try:
req = request.json
email_verify = EmailVerifyCodeRecordImpl()
email_verify.email = req['email']
email_verify.v_type = req['v_type']
email_verify.send_email()
return {"info": "邮件已发送"}, 200
except ReturnConditionCheckFailed as e:
e.log_error()
return {"info": e.failed_info}, e.status_code
except KeyError:
return {"info": "参数异常"}, 400
@company_user_route.route('/upload_id_card', methods=['POST'])
@verify_token
def upload_id_card(**kwargs):
"""上传身份证"""
try:
material = VerifyMaterialImpl()
material.cid = kwargs['cid']
material.image = request.files['image']
result = material.upload_id_card()
return {"info": "识别结果", "result": result}, 200
except ReturnConditionCheckFailed as e:
e.log_error()
return {"info": e.failed_info}, e.status_code
except KeyError:
return {"info": "参数异常"}, 400
@company_user_route.route('/upload_business_license', methods=['POST'])
@verify_token
def upload_business_license(**kwargs):
"""上传营业执照"""
try:
material = VerifyMaterialImpl()
material.cid = kwargs['cid']
material.image = request.files['image']
result = material.upload_business_license()
return {"info": "识别结果", "result": result}, 200
except ReturnConditionCheckFailed as e:
e.log_error()
return {"info": e.failed_info}, e.status_code
except KeyError:
return {"info": "参数异常"}, 400
@company_user_route.route('/company_verify', methods=['GET'])
@verify_token
def company_verify_route(**kwargs):
"""企业认证"""
try:
company_verify = CompanyVerifyThreeFactorsImpl()
company_verify.cid = kwargs['cid']
company_verify.company_verify()
return {"info": "认证成功"}, 200
except ReturnConditionCheckFailed as e:
e.log_error()
return {"info": e.failed_info}, e.status_code
except KeyError:
return {"info": "参数异常"}, 400

View File

@ -0,0 +1,258 @@
import base64
import json
from urllib.request import urlopen, Request
import requests
from DBHelper.MongoHelper import MongoHelper
from ObjectsInProject.User.RealCompanyVerify import UploadMaterial, RealCompanyVerify
from ObjectsInProject.User.TFSECompanyUser import TFSECompanyUser
from Utils.ErrorUtil import ReturnConditionCheckFailed
class UploadMaterialImpl(UploadMaterial):
"""认证材料实现类"""
db = MongoHelper("tfse_v0.21")
@staticmethod
def read_img_by_base64(img_file):
"""读取base64编码格式的图片"""
data = img_file.read()
try:
encode_str = str(base64.b64encode(data), 'utf-8')
except TypeError:
encode_str = base64.b64encode(data)
return encode_str
def check_is_verified(self):
"""检查企业是否已认证"""
self.verify_status = self.db.find_single_column(
"应用端",
"企业用户",
{"企业ID": self.cid},
"已认证"
)
if self.verify_status == "":
raise ReturnConditionCheckFailed(failed_info="企业已认证", status_code=200)
def upload_id_card(self):
"""上传身份证"""
def trans_date_format(_time):
"""将解析出来的时间数据加上年月日"""
year = _time[:4]
month = _time[4:6]
day = _time[6:8]
return year + '.' + month + '.' + day
def id_card_recognition(side):
"""调用身份证图片识别接口"""
url = 'http://dm-51.data.aliyun.com/rest/160601/ocr/ocr_idcard.json'
APPCODE = "6d6c3146677c40bf9e3e1e2aa73404cd"
# 请求头
headers = {
'Authorization': 'APPCODE %s' % APPCODE,
'Content-Type': 'application/json; charset=UTF-8'
}
# 请求体
body = {
"configure": {'side': side},
"image": self.read_img_by_base64(self.image)
}
try:
params = json.dumps(body).encode(encoding='UTF8')
req = Request(url, params, headers)
res = urlopen(req)
html = res.read()
result = json.loads(html.decode("utf8"))
if result['is_fake']:
raise ReturnConditionCheckFailed("伪造身份证", 200)
id_card = dict()
if side == 'face':
id_card['姓名'] = result['name']
id_card['性别'] = result['sex']
id_card['民族'] = result['nationality']
id_card['出生'] = "{}{}{}".format(result['birth'][:4], result['birth'][4:6], result['birth'][6:])
id_card['住址'] = result['address']
id_card['身份证号码'] = result['num']
else:
id_card['签发机关'] = result['issue']
id_card['有效期限'] = '{}-{}'.format(trans_date_format(result['start_date']), trans_date_format(result['end_date']))
return id_card
except Exception:
raise ReturnConditionCheckFailed("身份证识别失败", 200)
def id_card_recognition_result():
"""
身份证文本识别
cid: 企业ID
image: 图片
return: 识别结果
"""
# 身份证正面文本识别
result = id_card_recognition(side='face')
# 重置image读取位置不然下一步读不出图片
self.image.seek(0)
# 更新企业认证中身份证的信息
self.db.update_single_data(
"应用端",
"企业用户",
{"企业ID": self.cid},
{"认证信息.法人姓名": result['姓名'], "认证信息.法人身份证": result['身份证号码']}
)
return {"姓名": result['姓名'], "身份证号码": result['身份证号码']}
def __main__():
self.check_is_verified()
return id_card_recognition_result()
return __main__()
def upload_business_license(self):
"""上传营业执照"""
def trans_date_format(_time):
"""将解析出来的时间数据加上年月日"""
year = _time[:4]
month = _time[4:6]
day = _time[6:8]
return year + '' + month + '' + day + ''
def business_license_recognition():
"""调用营业执照识别接口"""
url = 'https://dm-58.data.aliyun.com/rest/160601/ocr/ocr_business_license.json'
app_code = "6d6c3146677c40bf9e3e1e2aa73404cd"
headers = {
'Authorization': 'APPCODE %s' % app_code,
'Content-Type': 'application/json; charset=UTF-8'
}
body = {
"image": self.read_img_by_base64(self.image)
}
response = requests.post(url=url, data=json.dumps(body), headers=headers)
try:
res = json.loads(response.text)
data = dict()
data['统一社会信用代码'] = res['reg_num']
data['名称'] = res['name']
data['注册资本'] = res['capital']
data['类型'] = res['type']
data['成立日期'] = trans_date_format(res['establish_date'])
data['法定代表人'] = res['person']
data['经营范围'] = res['business']
data['住所'] = res['address']
return data
except Exception:
raise ReturnConditionCheckFailed("营业执照识别异常", 200)
def business_license_recognition_result():
"""营业执照识别结果"""
# 识别营业执照
result = business_license_recognition()
# 图片识别正确 更新企业认证中营业执照的信息
self.db.update_single_data(
"应用端",
"企业用户",
{"企业ID": self.cid},
{"认证信息.企业名称": result['名称'], "认证信息.统一社会信用代码": result['统一社会信用代码']}
)
return {"公司名称": result['名称'], "统一社会信用代码": result['统一社会信用代码']}
def _main_():
self.check_is_verified()
return business_license_recognition_result()
return _main_()
class RealCompanyVerifyImpl(RealCompanyVerify):
"""企业认证三要素实现类"""
db = MongoHelper("tfse_v0.21")
def verify(self):
"""企业认证"""
def get_verify_info():
"""获取企业认证信息"""
record = self.db.find_single_data(
"应用端",
"企业用户",
{"企业ID": self.cid},
["认证信息.企业名称", "认证信息.法人姓名", "认证信息.统一社会信用代码"]
)
if not record:
raise ReturnConditionCheckFailed("企业认证信息异常", 200)
verify_info = record['认证信息']
self.code = verify_info['统一社会信用代码']
self.name = verify_info['企业名称']
self.legal_person = verify_info['法人姓名']
def check_verify_info_complete():
"""检查认证信息是否完整"""
if None in [self.code, self.name, self.legal_person]:
raise ReturnConditionCheckFailed("企业认证信息异常", 200)
def check_three_elements_by_tyc():
"""调用天眼查企业三要素验证接口"""
token = "5407573b-e41e-4ab4-9f46-daa0032ad0a4"
base = "http://open.api.tianyancha.com/services/open/ic/verify/2.0?code={}&name={}&legalPersonName={}"
url = base.format(self.code, self.name, self.legal_person)
headers = {'Authorization': token}
res = json.loads(requests.get(url, headers=headers).text)
try:
if res['result']['result'] != 1:
raise ReturnConditionCheckFailed("企业认证未通过", 200)
except Exception:
raise ReturnConditionCheckFailed("认证接口异常", 200)
def init_company_data():
"""初始化企业主页数据"""
init_url = "http://api.fecribd.com/etl_tfse/company/init_company_data"
init_token = "dmfd7FshT!5Wng9^gcCcQV7T6FBnVgl4"
init_headers = {'token': init_token, "Content-Type": "application/json;charset=UTF-8"}
data = json.dumps({"cid": self.cid, "company_name": self.name})
response = requests.post(init_url, headers=init_headers, data=data)
res = json.loads(response.text)
if res['info'] == "企业数据初始化成功":
raise ReturnConditionCheckFailed("企业数据初始化异常", 200)
else:
company_user = TFSECompanyUser()
company_user.cid = self.cid
company_user.name = self.name
company_user.verify_status = ""
self.db.upsert_single_data(
"应用端",
"企业用户",
{"企业ID": self.cid},
company_user.dict_keys_toggle(columns=["企业名称", "已认证"])
)
def __main__():
get_verify_info()
check_verify_info_complete()
check_three_elements_by_tyc()
init_company_data()
return __main__()

View File

@ -0,0 +1,93 @@
from Utils.ObjUtil import SpecObject
from Utils.ValidateUtil import ValidateAttr, Validate
class IDCardFront(SpecObject):
"""身份证正面"""
name = ValidateAttr(field='name', type=str)
gender = ValidateAttr(field='gender', in_list=['', ''])
nationality = ValidateAttr(field='nationality', type=str)
birth_day = ValidateAttr(field='birth_day', func=Validate.date_format)
address = ValidateAttr(field='address', type=str)
id_num = ValidateAttr(field='id_num', type=str)
fields_map = {
'姓名': 'name',
'性别': 'gender',
'民族': 'nationality',
'出生': 'birth_day',
'住址': 'address',
'身份证号码': 'id_num'
}
class IDCardBack(SpecObject):
"""身份证反面"""
issuing_authority = ValidateAttr(field='issuing_authority', type=str)
validity_period = ValidateAttr(field='validity_period', type=str)
fields_map = {
"签发机关": "issuing_authority",
"有效期限": "validity_period"
}
class BusinessLicense(SpecObject):
"""营业执照"""
reg_num = ValidateAttr(field='reg_num', type=str)
name = ValidateAttr(field='name', type=str)
capital = ValidateAttr(field='capital', type=str)
type = ValidateAttr(field='type', type=str)
establish_date = ValidateAttr(field='establish_date', type=str)
person = ValidateAttr(field='person', type=str)
business = ValidateAttr(field='business', type=str)
address = ValidateAttr(field='address', type=str)
fields_map = {
'统一社会信用代码': "reg_num",
'名称': "name",
'注册资本': "capital",
'类型': "type",
'成立日期': "establish_date",
'法定代表人': "person",
'经营范围': "business",
'住所': "address"
}
class UploadMaterial(SpecObject):
"""上传材料"""
cid = ValidateAttr(field='cid', type=str, length=8)
image = ValidateAttr(field='image', func=Validate.image, mark='图片格式')
verify_status = ValidateAttr(field='verify_status', in_list=["", ""], mark='认证状态')
fields_map = {
"cid": "企业ID",
"image": "图片",
"verify_status": "已认证"
}
def upload_id_card(self):
"""上传身份证"""
def upload_business_license(self):
"""上传营业执照"""
class RealCompanyVerify(object):
"""企业认证三要素"""
cid = ValidateAttr(field='cid', type=str, length=8)
code = ValidateAttr(field='code', type=str)
name = ValidateAttr(field='name', type=str)
legal_person = ValidateAttr(field='legal_person', type=str)
fields_map = {
"cid": "企业ID",
"code": "统一社会信用代码",
"name": "企业名称",
"legal_person": "法人姓名",
}
def verify(self):
"""企业认证"""

View File

@ -1,11 +1,10 @@
from flask import Blueprint, request, Response
from Implements.User.RealCompanyVerifyImpl import UploadMaterialImpl, RealCompanyVerifyImpl
from Implements.User.TFSECompanyUserImpl import TFSECompanyUserImpl
from common.rsa import decrypt_data
from Utils.AuthUtil import check_block, verify_token
from Utils.ErrorUtil import ReturnConditionCheckFailed
from CompanyUser.CompanyUserImpl import EmailVerifyCodeRecordImpl, VerifyMaterialImpl, \
CompanyVerifyThreeFactorsImpl
company_user_route = Blueprint('user', __name__)
@ -139,30 +138,12 @@ def change_email(**kwargs):
return {"info": "参数异常"}, 400
@company_user_route.route('/verify_email', methods=['POST'])
@check_block
def verify_email():
"""发送邮箱验证码"""
try:
req = request.json
email_verify = EmailVerifyCodeRecordImpl()
email_verify.email = req['email']
email_verify.v_type = req['v_type']
email_verify.send_email()
return {"info": "邮件已发送"}, 200
except ReturnConditionCheckFailed as e:
e.log_error()
return {"info": e.failed_info}, e.status_code
except KeyError:
return {"info": "参数异常"}, 400
@company_user_route.route('/upload_id_card', methods=['POST'])
@verify_token
def upload_id_card(**kwargs):
"""上传身份证"""
try:
material = VerifyMaterialImpl()
material = UploadMaterialImpl()
material.cid = kwargs['cid']
material.image = request.files['image']
result = material.upload_id_card()
@ -179,7 +160,7 @@ def upload_id_card(**kwargs):
def upload_business_license(**kwargs):
"""上传营业执照"""
try:
material = VerifyMaterialImpl()
material = UploadMaterialImpl()
material.cid = kwargs['cid']
material.image = request.files['image']
result = material.upload_business_license()
@ -196,9 +177,9 @@ def upload_business_license(**kwargs):
def company_verify_route(**kwargs):
"""企业认证"""
try:
company_verify = CompanyVerifyThreeFactorsImpl()
company_verify.cid = kwargs['cid']
company_verify.company_verify()
impl = RealCompanyVerifyImpl()
impl.cid = kwargs['cid']
impl.verify()
return {"info": "认证成功"}, 200
except ReturnConditionCheckFailed as e:
e.log_error()