tfse-app-api-v0.2/CompanyUser/CompanyUserImpl.py

566 lines
19 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import json
import time
import random
import base64
import requests
from urllib.request import Request, urlopen
from werkzeug.security import check_password_hash
from itsdangerous import TimedJSONWebSignatureSerializer as Serializer
from DBHelper.MongoHelper import MongoHelper
from Utils.ErrorUtil import ReturnConditionCheckFailed, LogConditionCheckFailed
from CompanyUser.CompanyUserObj import CompanyUser, VerifyInfo, EmailVerifyCodeRecord, VerifyMaterial, \
CompanyVerifyThreeFactors
from Utils.ValidateUtil import ValidateAttr
class CompanyUserImpl(CompanyUser):
"""企业用户实现类"""
db = MongoHelper("tfse_v0.21")
def check_email_registered(self):
"""检查邮箱已被注册"""
email = self.db.find_single_column(
"应用端",
"企业用户",
{"邮箱": self.email},
"邮箱"
)
if email is not None:
raise ReturnConditionCheckFailed("邮箱已被注册", 200)
def check_email_not_registered(self):
"""检查邮箱未被注册"""
email = self.db.find_single_column(
"应用端",
"企业用户",
{"邮箱": self.email},
"邮箱"
)
if email is None:
raise ReturnConditionCheckFailed("邮箱未被注册", 200)
def check_vcode_correct(self, code):
"""检查邮箱验证码是否正确"""
verify_data = self.db.find_single_data(
"应用端",
"邮箱验证码记录",
{"邮箱": self.email},
["验证码", "验证有效期"]
)
if not verify_data:
raise ReturnConditionCheckFailed("验证码错误", 200)
if verify_data['验证码'] != code:
raise ReturnConditionCheckFailed("验证码错误", 200)
if time.time() - verify_data['验证有效期'] > 300:
raise ReturnConditionCheckFailed('验证码过期', 200)
def login(self):
"""登录"""
company_user = self.db.find_single_data(
"应用端",
"企业用户",
{"邮箱": self.email},
["企业ID", "企业名称", "邮箱", "密码", "已认证"]
)
def check_user_existed():
"""检查用户是否存在"""
if company_user is None:
raise ReturnConditionCheckFailed("不存在该用户", 200)
def check_password_correct():
"""检查密码是否正确"""
if not check_password_hash(company_user['密码'], self.pwd):
raise ReturnConditionCheckFailed("登录密码错误", 200)
def create_token(param):
"""
创建token
param: 传入参数用于创建token
return: 用户token
"""
secret_key = '0FTuOi^#Afx1@2@F'
token_expire = 14400
s = Serializer(secret_key, expires_in=token_expire)
token = '' + s.dumps(param).decode('ascii')
return token
def __main__():
""""""
check_user_existed()
check_password_correct()
self.cid = company_user['企业ID']
self.name = company_user['企业名称']
self.verify_status = company_user['已认证']
return_body = self.dict_to_show(columns=["企业名称", "认证状态", "token"])
return_body['token'] = create_token({"cid": self.cid})
return return_body
return __main__()
def register(self, code):
"""注册"""
def make_new_cid():
"""生成新的企业ID如果该ID存在则重新生成"""
def random_cid(num):
"""随机企业ID"""
choices = 'ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789'
salt = ''
for i in range(num):
salt += random.choice(choices)
return salt
new_cid = random_cid(8)
case = self.db.find_single_column(
"应用端",
"企业用户",
{"企业ID": new_cid},
"企业ID"
) is not None
while case:
new_cid = random_cid(8)
return new_cid
def __main__():
""""""
self.check_email_registered()
self.check_vcode_correct(code)
self.cid = make_new_cid()
self.avatar_id = "623152edf36000004f00124e"
self.verify_status = ""
self.verify_info = VerifyInfo()
self.db.insert_single_data(
"应用端",
"企业用户",
self.dict_to_save()
)
self.db.delete_single_data(
"应用端",
"邮箱验证码记录",
{"邮箱": self.email}
)
__main__()
def get_company_user_info(self):
"""获取企业用户信息"""
company_user_data = self.db.find_single_data(
"应用端",
"企业用户",
{"企业ID": self.cid},
["企业ID", "企业名称", "邮箱", "已认证"]
)
self.name = company_user_data['企业名称']
self.email = company_user_data['邮箱']
self.verify_status = company_user_data['已认证']
return self.dict_to_show(columns=["企业ID", "企业名称", "邮箱", "认证状态"])
def get_avatar(self):
"""获取用户头像"""
self.avatar_id = self.db.find_single_column(
"应用端",
"企业用户",
{"企业ID": self.cid},
"头像fid"
)
avatar = self.db.find_file(
"文件",
"企业用户头像",
self.avatar_id
)
return avatar
def change_avatar(self, avatar_option):
"""修改企业用户头像"""
avatars = {
"avatar_01": "623152edf36000004f00124e",
"avatar_02": "623152edf36000004f001250",
"avatar_03": "623152edf36000004f00124f",
"avatar_04": "623152edf36000004f001251",
"avatar_05": "623152edf36000004f001252",
"avatar_06": "623152edf36000004f001253"
}
self.avatar_id = avatars[avatar_option]
self.db.update_single_data(
"应用端",
"企业用户",
{"企业ID": self.cid},
{"头像fid": self.avatar_id}
)
def change_password(self, code):
"""修改登录密码"""
self.check_email_not_registered()
self.check_vcode_correct(code)
self.db.update_single_data(
"应用端",
"企业用户",
{"邮箱": self.email},
self.dict_to_save(columns=["密码"])
)
def change_email(self, code):
"""更换邮箱"""
self.check_vcode_correct(code)
self.check_email_registered()
self.db.update_single_data(
"应用端",
"企业用户",
{"企业ID": self.cid},
self.dict_to_save(columns=["邮箱"])
)
class VerifyMaterialImpl(VerifyMaterial):
"""认证材料实现类"""
db = MongoHelper("tfse_v0.21")
@staticmethod
def read_img_by_base64(img_file):
"""读取base64编码格式的图片"""
data = img_file.read()
try:
encode_str = str(base64.b64encode(data), 'utf-8')
except TypeError:
encode_str = base64.b64encode(data)
return encode_str
def check_is_verified(self):
"""检查企业是否已认证"""
self.verify_status = self.db.find_single_column(
"应用端",
"企业用户",
{"企业ID": self.cid},
"已认证"
)
if self.verify_status == "":
raise ReturnConditionCheckFailed("企业已认证", 200)
def upload_id_card(self):
"""上传身份证"""
def trans_date_format(_time):
"""将解析出来的时间数据加上年月日"""
year = _time[:4]
month = _time[4:6]
day = _time[6:8]
return year + '.' + month + '.' + day
def id_card_recognition(side):
"""调用身份证图片识别接口"""
url = 'http://dm-51.data.aliyun.com/rest/160601/ocr/ocr_idcard.json'
APPCODE = "6d6c3146677c40bf9e3e1e2aa73404cd"
# 请求头
headers = {
'Authorization': 'APPCODE %s' % APPCODE,
'Content-Type': 'application/json; charset=UTF-8'
}
# 请求体
body = {
"configure": {'side': side},
"image": self.read_img_by_base64(self.image)
}
try:
params = json.dumps(body).encode(encoding='UTF8')
req = Request(url, params, headers)
res = urlopen(req)
html = res.read()
result = json.loads(html.decode("utf8"))
if result['is_fake']:
raise ReturnConditionCheckFailed("伪造身份证", 200)
id_card = dict()
if side == 'face':
id_card['姓名'] = result['name']
id_card['性别'] = result['sex']
id_card['民族'] = result['nationality']
id_card['出生'] = "{}{}{}".format(result['birth'][:4], result['birth'][4:6], result['birth'][6:])
id_card['住址'] = result['address']
id_card['身份证号码'] = result['num']
else:
id_card['签发机关'] = result['issue']
id_card['有效期限'] = '{}-{}'.format(trans_date_format(result['start_date']), trans_date_format(result['end_date']))
return id_card
except Exception:
raise ReturnConditionCheckFailed("身份证识别失败", 200)
def id_card_recognition_result():
"""
身份证文本识别
cid: 企业ID
image: 图片
return: 识别结果
"""
# 身份证正面文本识别
result = id_card_recognition(side='face')
# 重置image读取位置不然下一步读不出图片
self.image.seek(0)
# 更新企业认证中身份证的信息
self.db.update_single_data(
"应用端",
"企业用户",
{"企业ID": self.cid},
{"认证信息.法人姓名": result['姓名'], "认证信息.法人身份证": result['身份证号码']}
)
return {"姓名": result['姓名'], "身份证号码": result['身份证号码']}
def __main__():
""""""
self.check_is_verified()
return id_card_recognition_result()
return __main__()
def upload_business_license(self):
"""上传营业执照"""
def trans_date_format(_time):
"""将解析出来的时间数据加上年月日"""
year = _time[:4]
month = _time[4:6]
day = _time[6:8]
return year + '' + month + '' + day + ''
def business_license_recognition():
"""调用营业执照识别接口"""
url = 'https://dm-58.data.aliyun.com/rest/160601/ocr/ocr_business_license.json'
app_code = "6d6c3146677c40bf9e3e1e2aa73404cd"
headers = {
'Authorization': 'APPCODE %s' % app_code,
'Content-Type': 'application/json; charset=UTF-8'
}
body = {
"image": self.read_img_by_base64(self.image)
}
response = requests.post(url=url, data=json.dumps(body), headers=headers)
try:
res = json.loads(response.text)
data = dict()
data['统一社会信用代码'] = res['reg_num']
data['名称'] = res['name']
data['注册资本'] = res['capital']
data['类型'] = res['type']
data['成立日期'] = trans_date_format(res['establish_date'])
data['法定代表人'] = res['person']
data['经营范围'] = res['business']
data['住所'] = res['address']
return data
except Exception:
raise ReturnConditionCheckFailed("营业执照识别异常", 200)
def business_license_recognition_result():
"""营业执照识别结果"""
# 识别营业执照
result = business_license_recognition()
# 图片识别正确 更新企业认证中营业执照的信息
self.db.update_single_data(
"应用端",
"企业用户",
{"企业ID": self.cid},
{"认证信息.企业名称": result['名称'], "认证信息.统一社会信用代码": result['统一社会信用代码']}
)
return {"公司名称": result['名称'], "统一社会信用代码": result['统一社会信用代码']}
def _main__():
""""""
self.check_is_verified()
return business_license_recognition_result()
return _main__()
class CompanyVerifyThreeFactorsImpl(CompanyVerifyThreeFactors):
"""企业认证三要素实现类"""
db = MongoHelper("tfse_v0.21")
def company_verify(self):
"""企业认证"""
def get_verify_info():
"""获取企业认证信息"""
record = self.db.find_single_data(
"应用端",
"企业用户",
{"企业ID": self.cid},
["认证信息.企业名称", "认证信息.法人姓名", "认证信息.统一社会信用代码"]
)
if not record:
raise ReturnConditionCheckFailed("企业认证信息异常", 200)
verify_info = record['认证信息']
self.code = verify_info['统一社会信用代码']
self.name = verify_info['企业名称']
self.legal_person = verify_info['法人姓名']
def check_verify_info_complete():
"""检查认证信息是否完整"""
if None in [self.code, self.name, self.legal_person]:
raise ReturnConditionCheckFailed("企业认证信息异常", 200)
def check_three_elements_by_tyc():
"""调用天眼查企业三要素验证接口"""
token = "5407573b-e41e-4ab4-9f46-daa0032ad0a4"
base = "http://open.api.tianyancha.com/services/open/ic/verify/2.0?code={}&name={}&legalPersonName={}"
url = base.format(self.code, self.name, self.legal_person)
headers = {'Authorization': token}
res = json.loads(requests.get(url, headers=headers).text)
try:
if res['result']['result'] != 1:
raise ReturnConditionCheckFailed("企业认证未通过", 200)
except Exception:
raise ReturnConditionCheckFailed("认证接口异常", 200)
def init_company_data():
"""初始化企业主页数据"""
init_url = "http://api.fecribd.com/etl_tfse/company/init_company_data"
init_token = "dmfd7FshT!5Wng9^gcCcQV7T6FBnVgl4"
init_headers = {'token': init_token, "Content-Type": "application/json;charset=UTF-8"}
data = json.dumps({"cid": self.cid, "company_name": self.name})
response = requests.post(init_url, headers=init_headers, data=data)
res = json.loads(response.text)
if res['info'] == "企业数据初始化成功":
raise ReturnConditionCheckFailed("企业数据初始化异常", 200)
else:
company_user = CompanyUser()
company_user.cid = self.cid
company_user.name = self.name
company_user.verify_status = ""
self.db.upsert_single_data(
"应用端",
"企业用户",
{"企业ID": self.cid},
company_user.dict_to_save(columns=["企业名称", "已认证"])
)
def __main__():
get_verify_info()
check_verify_info_complete()
check_three_elements_by_tyc()
init_company_data()
return __main__()
class EmailVerifyCodeRecordImpl(EmailVerifyCodeRecord):
"""用户通知邮件实现类"""
db = MongoHelper("tfse_v0.21")
def make_verify_code(self):
"""生成邮箱验证码"""
def make_verify_code():
"""随机生成6位数字验证码"""
num_code = ""
for i in range(6):
ch = chr(random.randrange(ord('0'), ord('9') + 1))
num_code += ch
return num_code
self.v_code = make_verify_code()
self.v_period = time.time() + 300
self.db.upsert_single_data(
"应用端",
"邮箱验证码记录",
{"邮箱": self.email},
self.dict_to_save()
)
def transfer_email_api(self):
"""发送邮件"""
# 邮箱服务接口
email_api = 'http://116.63.130.34:30001'
v_types = {
"register": "注册账号",
"resetpwd": "修改密码",
"resetemail": "修改邮箱"
}
headers = {"Content-Type": "application/json;charset=UTF-8"}
data = {
"title": "【远东资信】{}".format(v_types[self.v_type]),
"sender": 'fecribd@fecr.com.cn',
"recipients": [self.email],
"msg_body": "{}的验证码为 【{}5分钟内有效。".format(v_types[self.v_type], self.v_code)
}
response = requests.post(url=email_api + '/send_mail', headers=headers, data=json.dumps(data))
return response.text
def check_email_registered(self):
"""检查邮箱已被注册"""
email = self.db.find_single_column(
"应用端",
"企业用户",
{"邮箱": self.email},
"邮箱"
)
if email is not None:
raise ReturnConditionCheckFailed("邮箱已被注册", 200)
def check_email_not_registered(self):
"""检查邮箱未被注册"""
email = self.db.find_single_column(
"应用端",
"企业用户",
{"邮箱": self.email},
"邮箱"
)
if email is None:
raise ReturnConditionCheckFailed("邮箱未被注册", 200)
def send_email(self):
""""""
if self.v_type == 'register' or self.v_type == 'resetemail':
self.check_email_registered()
self.make_verify_code()
self.transfer_email_api()
elif self.v_type == 'resetpwd':
self.check_email_not_registered()
self.make_verify_code()
self.transfer_email_api()