565 lines
19 KiB
Python
565 lines
19 KiB
Python
import json
|
||
import time
|
||
import random
|
||
import base64
|
||
import requests
|
||
from urllib.request import Request, urlopen
|
||
|
||
from werkzeug.security import check_password_hash
|
||
from itsdangerous import TimedJSONWebSignatureSerializer as Serializer
|
||
|
||
from DBHelper.MongoHelper import MongoHelper
|
||
from Utils.ErrorUtil import ReturnConditionCheckFailed
|
||
from CompanyUser.CompanyUserObj import CompanyUser, VerifyInfo, EmailVerifyCodeRecord, VerifyMaterial, \
|
||
CompanyVerifyThreeFactors
|
||
|
||
|
||
class CompanyUserImpl(CompanyUser):
|
||
"""企业用户实现类"""
|
||
|
||
db = MongoHelper("tfse_v0.21")
|
||
|
||
def check_email_registered(self):
|
||
"""检查邮箱已被注册"""
|
||
email = self.db.find_single_column(
|
||
"应用端",
|
||
"企业用户",
|
||
{"邮箱": self.email},
|
||
"邮箱"
|
||
)
|
||
if email is not None:
|
||
raise ReturnConditionCheckFailed("邮箱已被注册", 200)
|
||
|
||
def check_email_not_registered(self):
|
||
"""检查邮箱未被注册"""
|
||
email = self.db.find_single_column(
|
||
"应用端",
|
||
"企业用户",
|
||
{"邮箱": self.email},
|
||
"邮箱"
|
||
)
|
||
if email is None:
|
||
raise ReturnConditionCheckFailed("邮箱未被注册", 200)
|
||
|
||
def check_vcode_correct(self, code):
|
||
"""检查邮箱验证码是否正确"""
|
||
verify_data = self.db.find_single_data(
|
||
"应用端",
|
||
"邮箱验证码记录",
|
||
{"邮箱": self.email},
|
||
["验证码", "验证有效期"]
|
||
)
|
||
|
||
if not verify_data:
|
||
raise ReturnConditionCheckFailed("验证码错误", 200)
|
||
|
||
if verify_data['验证码'] != code:
|
||
raise ReturnConditionCheckFailed("验证码错误", 200)
|
||
|
||
if time.time() - verify_data['验证有效期'] > 300:
|
||
raise ReturnConditionCheckFailed('验证码过期', 200)
|
||
|
||
def login(self):
|
||
"""登录"""
|
||
|
||
company_user = self.db.find_single_data(
|
||
"应用端",
|
||
"企业用户",
|
||
{"邮箱": self.email},
|
||
["企业ID", "企业名称", "邮箱", "密码", "已认证"]
|
||
)
|
||
|
||
def check_user_existed():
|
||
"""检查用户是否存在"""
|
||
if company_user is None:
|
||
raise ReturnConditionCheckFailed("不存在该用户", 200)
|
||
|
||
def check_password_correct():
|
||
"""检查密码是否正确"""
|
||
if not check_password_hash(company_user['密码'], self.pwd):
|
||
raise ReturnConditionCheckFailed("登录密码错误", 200)
|
||
|
||
def create_token(param):
|
||
"""
|
||
创建token
|
||
param: 传入参数,用于创建token
|
||
return: 用户token
|
||
"""
|
||
secret_key = '0FTuOi^#Afx1@2@F'
|
||
token_expire = 14400
|
||
s = Serializer(secret_key, expires_in=token_expire)
|
||
token = '' + s.dumps(param).decode('ascii')
|
||
return token
|
||
|
||
def __main__():
|
||
""""""
|
||
check_user_existed()
|
||
check_password_correct()
|
||
|
||
self.cid = company_user['企业ID']
|
||
self.name = company_user['企业名称']
|
||
self.verify_status = company_user['已认证']
|
||
|
||
return_body = self.dict_to_show(columns=["企业名称", "认证状态", "token"])
|
||
return_body['token'] = create_token({"cid": self.cid})
|
||
|
||
return return_body
|
||
|
||
return __main__()
|
||
|
||
def register(self, code):
|
||
"""注册"""
|
||
|
||
def make_new_cid():
|
||
"""生成新的企业ID,如果该ID存在,则重新生成"""
|
||
|
||
def random_cid(num):
|
||
"""随机企业ID"""
|
||
choices = 'ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789'
|
||
salt = ''
|
||
for i in range(num):
|
||
salt += random.choice(choices)
|
||
return salt
|
||
|
||
new_cid = random_cid(8)
|
||
|
||
case = self.db.find_single_column(
|
||
"应用端",
|
||
"企业用户",
|
||
{"企业ID": new_cid},
|
||
"企业ID"
|
||
) is not None
|
||
while case:
|
||
new_cid = random_cid(8)
|
||
return new_cid
|
||
|
||
def __main__():
|
||
""""""
|
||
self.check_email_registered()
|
||
self.check_vcode_correct(code)
|
||
|
||
self.cid = make_new_cid()
|
||
self.avatar_id = "623152edf36000004f00124e"
|
||
self.verify_status = "否"
|
||
self.verify_info = VerifyInfo()
|
||
|
||
self.db.insert_single_data(
|
||
"应用端",
|
||
"企业用户",
|
||
self.dict_to_save()
|
||
)
|
||
|
||
self.db.delete_single_data(
|
||
"应用端",
|
||
"邮箱验证码记录",
|
||
{"邮箱": self.email}
|
||
)
|
||
|
||
__main__()
|
||
|
||
def get_company_user_info(self):
|
||
"""获取企业用户信息"""
|
||
company_user_data = self.db.find_single_data(
|
||
"应用端",
|
||
"企业用户",
|
||
{"企业ID": self.cid},
|
||
["企业ID", "企业名称", "邮箱", "已认证"]
|
||
)
|
||
|
||
self.name = company_user_data['企业名称']
|
||
self.email = company_user_data['邮箱']
|
||
self.verify_status = company_user_data['已认证']
|
||
|
||
return self.dict_to_show(columns=["企业ID", "企业名称", "邮箱", "认证状态"])
|
||
|
||
def get_avatar(self):
|
||
"""获取用户头像"""
|
||
|
||
self.avatar_id = self.db.find_single_column(
|
||
"应用端",
|
||
"企业用户",
|
||
{"企业ID": self.cid},
|
||
"头像fid"
|
||
)
|
||
|
||
avatar = self.db.find_file(
|
||
"文件",
|
||
"企业用户头像",
|
||
self.avatar_id
|
||
)
|
||
|
||
return avatar
|
||
|
||
def change_avatar(self, avatar_option):
|
||
"""修改企业用户头像"""
|
||
avatars = {
|
||
"avatar_01": "623152edf36000004f00124e",
|
||
"avatar_02": "623152edf36000004f001250",
|
||
"avatar_03": "623152edf36000004f00124f",
|
||
"avatar_04": "623152edf36000004f001251",
|
||
"avatar_05": "623152edf36000004f001252",
|
||
"avatar_06": "623152edf36000004f001253"
|
||
}
|
||
self.avatar_id = avatars[avatar_option]
|
||
self.db.update_single_data(
|
||
"应用端",
|
||
"企业用户",
|
||
{"企业ID": self.cid},
|
||
{"头像fid": self.avatar_id}
|
||
)
|
||
|
||
def change_password(self, code):
|
||
"""修改登录密码"""
|
||
self.check_email_not_registered()
|
||
self.check_vcode_correct(code)
|
||
self.db.update_single_data(
|
||
"应用端",
|
||
"企业用户",
|
||
{"邮箱": self.email},
|
||
self.dict_to_save(columns=["密码"])
|
||
)
|
||
|
||
def change_email(self, code):
|
||
"""更换邮箱"""
|
||
self.check_vcode_correct(code)
|
||
self.check_email_registered()
|
||
self.db.update_single_data(
|
||
"应用端",
|
||
"企业用户",
|
||
{"企业ID": self.cid},
|
||
self.dict_to_save(columns=["邮箱"])
|
||
)
|
||
|
||
|
||
class VerifyMaterialImpl(VerifyMaterial):
|
||
"""认证材料实现类"""
|
||
|
||
db = MongoHelper("tfse_v0.21")
|
||
|
||
@staticmethod
|
||
def read_img_by_base64(img_file):
|
||
"""读取base64编码格式的图片"""
|
||
data = img_file.read()
|
||
try:
|
||
encode_str = str(base64.b64encode(data), 'utf-8')
|
||
except TypeError:
|
||
encode_str = base64.b64encode(data)
|
||
return encode_str
|
||
|
||
def check_is_verified(self):
|
||
"""检查企业是否已认证"""
|
||
self.verify_status = self.db.find_single_column(
|
||
"应用端",
|
||
"企业用户",
|
||
{"企业ID": self.cid},
|
||
"已认证"
|
||
)
|
||
if self.verify_status == "是":
|
||
raise ReturnConditionCheckFailed("企业已认证", 200)
|
||
|
||
def upload_id_card(self):
|
||
"""上传身份证"""
|
||
|
||
def trans_date_format(_time):
|
||
"""将解析出来的时间数据加上年月日"""
|
||
year = _time[:4]
|
||
month = _time[4:6]
|
||
day = _time[6:8]
|
||
return year + '.' + month + '.' + day
|
||
|
||
def id_card_recognition(side):
|
||
"""调用身份证图片识别接口"""
|
||
|
||
url = 'http://dm-51.data.aliyun.com/rest/160601/ocr/ocr_idcard.json'
|
||
APPCODE = "6d6c3146677c40bf9e3e1e2aa73404cd"
|
||
|
||
# 请求头
|
||
headers = {
|
||
'Authorization': 'APPCODE %s' % APPCODE,
|
||
'Content-Type': 'application/json; charset=UTF-8'
|
||
}
|
||
|
||
# 请求体
|
||
body = {
|
||
"configure": {'side': side},
|
||
"image": self.read_img_by_base64(self.image)
|
||
}
|
||
|
||
try:
|
||
params = json.dumps(body).encode(encoding='UTF8')
|
||
req = Request(url, params, headers)
|
||
res = urlopen(req)
|
||
html = res.read()
|
||
result = json.loads(html.decode("utf8"))
|
||
|
||
if result['is_fake']:
|
||
raise ReturnConditionCheckFailed("伪造身份证", 200)
|
||
|
||
id_card = dict()
|
||
if side == 'face':
|
||
id_card['姓名'] = result['name']
|
||
id_card['性别'] = result['sex']
|
||
id_card['民族'] = result['nationality']
|
||
id_card['出生'] = "{}年{}月{}日".format(result['birth'][:4], result['birth'][4:6], result['birth'][6:])
|
||
id_card['住址'] = result['address']
|
||
id_card['身份证号码'] = result['num']
|
||
else:
|
||
id_card['签发机关'] = result['issue']
|
||
id_card['有效期限'] = '{}-{}'.format(trans_date_format(result['start_date']), trans_date_format(result['end_date']))
|
||
return id_card
|
||
except Exception:
|
||
raise ReturnConditionCheckFailed("身份证识别失败", 200)
|
||
|
||
def id_card_recognition_result():
|
||
"""
|
||
身份证文本识别
|
||
cid: 企业ID
|
||
image: 图片
|
||
return: 识别结果
|
||
"""
|
||
|
||
# 身份证正面文本识别
|
||
result = id_card_recognition(side='face')
|
||
|
||
# 重置image读取位置,不然下一步读不出图片
|
||
self.image.seek(0)
|
||
|
||
# 更新企业认证中身份证的信息
|
||
self.db.update_single_data(
|
||
"应用端",
|
||
"企业用户",
|
||
{"企业ID": self.cid},
|
||
{"认证信息.法人姓名": result['姓名'], "认证信息.法人身份证": result['身份证号码']}
|
||
)
|
||
|
||
return {"姓名": result['姓名'], "身份证号码": result['身份证号码']}
|
||
|
||
def __main__():
|
||
""""""
|
||
self.check_is_verified()
|
||
return id_card_recognition_result()
|
||
|
||
return __main__()
|
||
|
||
def upload_business_license(self):
|
||
"""上传营业执照"""
|
||
|
||
def trans_date_format(_time):
|
||
"""将解析出来的时间数据加上年月日"""
|
||
year = _time[:4]
|
||
month = _time[4:6]
|
||
day = _time[6:8]
|
||
return year + '年' + month + '月' + day + '日'
|
||
|
||
def business_license_recognition():
|
||
"""调用营业执照识别接口"""
|
||
url = 'https://dm-58.data.aliyun.com/rest/160601/ocr/ocr_business_license.json'
|
||
app_code = "6d6c3146677c40bf9e3e1e2aa73404cd"
|
||
|
||
headers = {
|
||
'Authorization': 'APPCODE %s' % app_code,
|
||
'Content-Type': 'application/json; charset=UTF-8'
|
||
}
|
||
|
||
body = {
|
||
"image": self.read_img_by_base64(self.image)
|
||
}
|
||
|
||
response = requests.post(url=url, data=json.dumps(body), headers=headers)
|
||
|
||
try:
|
||
res = json.loads(response.text)
|
||
data = dict()
|
||
data['统一社会信用代码'] = res['reg_num']
|
||
data['名称'] = res['name']
|
||
data['注册资本'] = res['capital']
|
||
data['类型'] = res['type']
|
||
data['成立日期'] = trans_date_format(res['establish_date'])
|
||
data['法定代表人'] = res['person']
|
||
data['经营范围'] = res['business']
|
||
data['住所'] = res['address']
|
||
return data
|
||
except Exception:
|
||
raise ReturnConditionCheckFailed("营业执照识别异常", 200)
|
||
|
||
def business_license_recognition_result():
|
||
"""营业执照识别结果"""
|
||
|
||
# 识别营业执照
|
||
result = business_license_recognition()
|
||
|
||
# 图片识别正确 更新企业认证中营业执照的信息
|
||
self.db.update_single_data(
|
||
"应用端",
|
||
"企业用户",
|
||
{"企业ID": self.cid},
|
||
{"认证信息.企业名称": result['名称'], "认证信息.统一社会信用代码": result['统一社会信用代码']}
|
||
)
|
||
return {"公司名称": result['名称'], "统一社会信用代码": result['统一社会信用代码']}
|
||
|
||
def _main__():
|
||
""""""
|
||
self.check_is_verified()
|
||
return business_license_recognition_result()
|
||
|
||
return _main__()
|
||
|
||
|
||
class CompanyVerifyThreeFactorsImpl(CompanyVerifyThreeFactors):
|
||
"""企业认证三要素实现类"""
|
||
|
||
db = MongoHelper("tfse_v0.21")
|
||
|
||
def company_verify(self):
|
||
"""企业认证"""
|
||
|
||
def get_verify_info():
|
||
"""获取企业认证信息"""
|
||
record = self.db.find_single_data(
|
||
"应用端",
|
||
"企业用户",
|
||
{"企业ID": self.cid},
|
||
["认证信息.企业名称", "认证信息.法人姓名", "认证信息.统一社会信用代码"]
|
||
)
|
||
|
||
if not record:
|
||
raise ReturnConditionCheckFailed("企业认证信息异常", 200)
|
||
|
||
verify_info = record['认证信息']
|
||
self.code = verify_info['统一社会信用代码']
|
||
self.name = verify_info['企业名称']
|
||
self.legal_person = verify_info['法人姓名']
|
||
|
||
def check_verify_info_complete():
|
||
"""检查认证信息是否完整"""
|
||
if None in [self.code, self.name, self.legal_person]:
|
||
raise ReturnConditionCheckFailed("企业认证信息异常", 200)
|
||
|
||
def check_three_elements_by_tyc():
|
||
"""调用天眼查企业三要素验证接口"""
|
||
token = "5407573b-e41e-4ab4-9f46-daa0032ad0a4"
|
||
base = "http://open.api.tianyancha.com/services/open/ic/verify/2.0?code={}&name={}&legalPersonName={}"
|
||
url = base.format(self.code, self.name, self.legal_person)
|
||
headers = {'Authorization': token}
|
||
res = json.loads(requests.get(url, headers=headers).text)
|
||
try:
|
||
if res['result']['result'] != 1:
|
||
raise ReturnConditionCheckFailed("企业认证未通过", 200)
|
||
except Exception:
|
||
raise ReturnConditionCheckFailed("认证接口异常", 200)
|
||
|
||
def init_company_data():
|
||
"""初始化企业主页数据"""
|
||
init_url = "http://api.fecribd.com/etl_tfse/company/init_company_data"
|
||
init_token = "dmfd7FshT!5Wng9^gcCcQV7T6FBnVgl4"
|
||
init_headers = {'token': init_token, "Content-Type": "application/json;charset=UTF-8"}
|
||
data = json.dumps({"cid": self.cid, "company_name": self.name})
|
||
response = requests.post(init_url, headers=init_headers, data=data)
|
||
|
||
res = json.loads(response.text)
|
||
if res['info'] == "企业数据初始化成功":
|
||
raise ReturnConditionCheckFailed("企业数据初始化异常", 200)
|
||
else:
|
||
company_user = CompanyUser()
|
||
company_user.cid = self.cid
|
||
company_user.name = self.name
|
||
company_user.verify_status = "是"
|
||
|
||
self.db.upsert_single_data(
|
||
"应用端",
|
||
"企业用户",
|
||
{"企业ID": self.cid},
|
||
company_user.dict_to_save(columns=["企业名称", "已认证"])
|
||
)
|
||
|
||
def __main__():
|
||
get_verify_info()
|
||
check_verify_info_complete()
|
||
check_three_elements_by_tyc()
|
||
init_company_data()
|
||
|
||
return __main__()
|
||
|
||
|
||
class EmailVerifyCodeRecordImpl(EmailVerifyCodeRecord):
|
||
"""用户通知邮件实现类"""
|
||
|
||
db = MongoHelper("tfse_v0.21")
|
||
|
||
def make_verify_code(self):
|
||
"""生成邮箱验证码"""
|
||
|
||
def make_verify_code():
|
||
"""随机生成6位数字验证码"""
|
||
num_code = ""
|
||
for i in range(6):
|
||
ch = chr(random.randrange(ord('0'), ord('9') + 1))
|
||
num_code += ch
|
||
return num_code
|
||
|
||
self.v_code = make_verify_code()
|
||
self.v_period = time.time() + 300
|
||
self.db.upsert_single_data(
|
||
"应用端",
|
||
"邮箱验证码记录",
|
||
{"邮箱": self.email},
|
||
self.dict_to_save()
|
||
)
|
||
|
||
def transfer_email_api(self):
|
||
"""发送邮件"""
|
||
# 邮箱服务接口
|
||
email_api = 'http://116.63.130.34:30001'
|
||
|
||
v_types = {
|
||
"register": "注册账号",
|
||
"resetpwd": "修改密码",
|
||
"resetemail": "修改邮箱"
|
||
}
|
||
|
||
headers = {"Content-Type": "application/json;charset=UTF-8"}
|
||
data = {
|
||
"title": "【远东资信】{}".format(v_types[self.v_type]),
|
||
"sender": 'fecribd@fecr.com.cn',
|
||
"recipients": [self.email],
|
||
"msg_body": "您{}的验证码为 【{}】,5分钟内有效。".format(v_types[self.v_type], self.v_code)
|
||
}
|
||
|
||
response = requests.post(url=email_api + '/send_mail', headers=headers, data=json.dumps(data))
|
||
return response.text
|
||
|
||
def check_email_registered(self):
|
||
"""检查邮箱已被注册"""
|
||
email = self.db.find_single_column(
|
||
"应用端",
|
||
"企业用户",
|
||
{"邮箱": self.email},
|
||
"邮箱"
|
||
)
|
||
if email is not None:
|
||
raise ReturnConditionCheckFailed("邮箱已被注册", 200)
|
||
|
||
def check_email_not_registered(self):
|
||
"""检查邮箱未被注册"""
|
||
email = self.db.find_single_column(
|
||
"应用端",
|
||
"企业用户",
|
||
{"邮箱": self.email},
|
||
"邮箱"
|
||
)
|
||
if email is None:
|
||
raise ReturnConditionCheckFailed("邮箱未被注册", 200)
|
||
|
||
def send_email(self):
|
||
""""""
|
||
|
||
if self.v_type == 'register' or self.v_type == 'resetemail':
|
||
self.check_email_registered()
|
||
self.make_verify_code()
|
||
self.transfer_email_api()
|
||
|
||
elif self.v_type == 'resetpwd':
|
||
self.check_email_not_registered()
|
||
self.make_verify_code()
|
||
self.transfer_email_api()
|