Merge branch 'v0.21' into 'master'

V0.21

See merge request root/tfse_app_api!41
This commit is contained in:
FECR-IBD 2022-04-27 07:24:47 +00:00
commit 474af52aa8
24 changed files with 906 additions and 829 deletions

View File

@ -1,565 +0,0 @@
import json
import time
import random
import base64
import requests
from urllib.request import Request, urlopen
from werkzeug.security import check_password_hash
from itsdangerous import TimedJSONWebSignatureSerializer as Serializer
from DBHelper.MongoHelper import MongoHelper
from Utils.ErrorUtil import ReturnConditionCheckFailed, LogConditionCheckFailed
from CompanyUser.CompanyUserObj import CompanyUser, VerifyInfo, EmailVerifyCodeRecord, VerifyMaterial, \
CompanyVerifyThreeFactors
from Utils.ValidateUtil import ValidateAttr
class CompanyUserImpl(CompanyUser):
"""企业用户实现类"""
db = MongoHelper("tfse_v0.21")
def check_email_registered(self):
"""检查邮箱已被注册"""
email = self.db.find_single_column(
"应用端",
"企业用户",
{"邮箱": self.email},
"邮箱"
)
if email is not None:
raise ReturnConditionCheckFailed("邮箱已被注册", 200)
def check_email_not_registered(self):
"""检查邮箱未被注册"""
email = self.db.find_single_column(
"应用端",
"企业用户",
{"邮箱": self.email},
"邮箱"
)
if email is None:
raise ReturnConditionCheckFailed("邮箱未被注册", 200)
def check_vcode_correct(self, code):
"""检查邮箱验证码是否正确"""
verify_data = self.db.find_single_data(
"应用端",
"邮箱验证码记录",
{"邮箱": self.email},
["验证码", "验证有效期"]
)
if not verify_data:
raise ReturnConditionCheckFailed("验证码错误", 200)
if verify_data['验证码'] != code:
raise ReturnConditionCheckFailed("验证码错误", 200)
if time.time() - verify_data['验证有效期'] > 300:
raise ReturnConditionCheckFailed('验证码过期', 200)
def login(self):
"""登录"""
company_user = self.db.find_single_data(
"应用端",
"企业用户",
{"邮箱": self.email},
["企业ID", "企业名称", "邮箱", "密码", "已认证"]
)
def check_user_existed():
"""检查用户是否存在"""
if company_user is None:
raise ReturnConditionCheckFailed("不存在该用户", 200)
def check_password_correct():
"""检查密码是否正确"""
if not check_password_hash(company_user['密码'], self.pwd):
raise ReturnConditionCheckFailed("登录密码错误", 200)
def create_token(param):
"""
创建token
param: 传入参数用于创建token
return: 用户token
"""
secret_key = '0FTuOi^#Afx1@2@F'
token_expire = 14400
s = Serializer(secret_key, expires_in=token_expire)
token = '' + s.dumps(param).decode('ascii')
return token
def __main__():
""""""
check_user_existed()
check_password_correct()
self.cid = company_user['企业ID']
self.name = company_user['企业名称']
self.verify_status = company_user['已认证']
return_body = self.dict_to_show(columns=["企业名称", "认证状态", "token"])
return_body['token'] = create_token({"cid": self.cid})
return return_body
return __main__()
def register(self, code):
"""注册"""
def make_new_cid():
"""生成新的企业ID如果该ID存在则重新生成"""
def random_cid(num):
"""随机企业ID"""
choices = 'ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789'
salt = ''
for i in range(num):
salt += random.choice(choices)
return salt
new_cid = random_cid(8)
case = self.db.find_single_column(
"应用端",
"企业用户",
{"企业ID": new_cid},
"企业ID"
) is not None
while case:
new_cid = random_cid(8)
return new_cid
def __main__():
""""""
self.check_email_registered()
self.check_vcode_correct(code)
self.cid = make_new_cid()
self.avatar_id = "623152edf36000004f00124e"
self.verify_status = ""
self.verify_info = VerifyInfo()
self.db.insert_single_data(
"应用端",
"企业用户",
self.dict_to_save()
)
self.db.delete_single_data(
"应用端",
"邮箱验证码记录",
{"邮箱": self.email}
)
__main__()
def get_company_user_info(self):
"""获取企业用户信息"""
company_user_data = self.db.find_single_data(
"应用端",
"企业用户",
{"企业ID": self.cid},
["企业ID", "企业名称", "邮箱", "已认证"]
)
self.name = company_user_data['企业名称']
self.email = company_user_data['邮箱']
self.verify_status = company_user_data['已认证']
return self.dict_to_show(columns=["企业ID", "企业名称", "邮箱", "认证状态"])
def get_avatar(self):
"""获取用户头像"""
self.avatar_id = self.db.find_single_column(
"应用端",
"企业用户",
{"企业ID": self.cid},
"头像fid"
)
avatar = self.db.find_file(
"文件",
"企业用户头像",
self.avatar_id
)
return avatar
def change_avatar(self, avatar_option):
"""修改企业用户头像"""
avatars = {
"avatar_01": "623152edf36000004f00124e",
"avatar_02": "623152edf36000004f001250",
"avatar_03": "623152edf36000004f00124f",
"avatar_04": "623152edf36000004f001251",
"avatar_05": "623152edf36000004f001252",
"avatar_06": "623152edf36000004f001253"
}
self.avatar_id = avatars[avatar_option]
self.db.update_single_data(
"应用端",
"企业用户",
{"企业ID": self.cid},
{"头像fid": self.avatar_id}
)
def change_password(self, code):
"""修改登录密码"""
self.check_email_not_registered()
self.check_vcode_correct(code)
self.db.update_single_data(
"应用端",
"企业用户",
{"邮箱": self.email},
self.dict_to_save(columns=["密码"])
)
def change_email(self, code):
"""更换邮箱"""
self.check_vcode_correct(code)
self.check_email_registered()
self.db.update_single_data(
"应用端",
"企业用户",
{"企业ID": self.cid},
self.dict_to_save(columns=["邮箱"])
)
class VerifyMaterialImpl(VerifyMaterial):
"""认证材料实现类"""
db = MongoHelper("tfse_v0.21")
@staticmethod
def read_img_by_base64(img_file):
"""读取base64编码格式的图片"""
data = img_file.read()
try:
encode_str = str(base64.b64encode(data), 'utf-8')
except TypeError:
encode_str = base64.b64encode(data)
return encode_str
def check_is_verified(self):
"""检查企业是否已认证"""
self.verify_status = self.db.find_single_column(
"应用端",
"企业用户",
{"企业ID": self.cid},
"已认证"
)
if self.verify_status == "":
raise ReturnConditionCheckFailed("企业已认证", 200)
def upload_id_card(self):
"""上传身份证"""
def trans_date_format(_time):
"""将解析出来的时间数据加上年月日"""
year = _time[:4]
month = _time[4:6]
day = _time[6:8]
return year + '.' + month + '.' + day
def id_card_recognition(side):
"""调用身份证图片识别接口"""
url = 'http://dm-51.data.aliyun.com/rest/160601/ocr/ocr_idcard.json'
APPCODE = "6d6c3146677c40bf9e3e1e2aa73404cd"
# 请求头
headers = {
'Authorization': 'APPCODE %s' % APPCODE,
'Content-Type': 'application/json; charset=UTF-8'
}
# 请求体
body = {
"configure": {'side': side},
"image": self.read_img_by_base64(self.image)
}
try:
params = json.dumps(body).encode(encoding='UTF8')
req = Request(url, params, headers)
res = urlopen(req)
html = res.read()
result = json.loads(html.decode("utf8"))
if result['is_fake']:
raise ReturnConditionCheckFailed("伪造身份证", 200)
id_card = dict()
if side == 'face':
id_card['姓名'] = result['name']
id_card['性别'] = result['sex']
id_card['民族'] = result['nationality']
id_card['出生'] = "{}{}{}".format(result['birth'][:4], result['birth'][4:6], result['birth'][6:])
id_card['住址'] = result['address']
id_card['身份证号码'] = result['num']
else:
id_card['签发机关'] = result['issue']
id_card['有效期限'] = '{}-{}'.format(trans_date_format(result['start_date']), trans_date_format(result['end_date']))
return id_card
except Exception:
raise ReturnConditionCheckFailed("身份证识别失败", 200)
def id_card_recognition_result():
"""
身份证文本识别
cid: 企业ID
image: 图片
return: 识别结果
"""
# 身份证正面文本识别
result = id_card_recognition(side='face')
# 重置image读取位置不然下一步读不出图片
self.image.seek(0)
# 更新企业认证中身份证的信息
self.db.update_single_data(
"应用端",
"企业用户",
{"企业ID": self.cid},
{"认证信息.法人姓名": result['姓名'], "认证信息.法人身份证": result['身份证号码']}
)
return {"姓名": result['姓名'], "身份证号码": result['身份证号码']}
def __main__():
""""""
self.check_is_verified()
return id_card_recognition_result()
return __main__()
def upload_business_license(self):
"""上传营业执照"""
def trans_date_format(_time):
"""将解析出来的时间数据加上年月日"""
year = _time[:4]
month = _time[4:6]
day = _time[6:8]
return year + '' + month + '' + day + ''
def business_license_recognition():
"""调用营业执照识别接口"""
url = 'https://dm-58.data.aliyun.com/rest/160601/ocr/ocr_business_license.json'
app_code = "6d6c3146677c40bf9e3e1e2aa73404cd"
headers = {
'Authorization': 'APPCODE %s' % app_code,
'Content-Type': 'application/json; charset=UTF-8'
}
body = {
"image": self.read_img_by_base64(self.image)
}
response = requests.post(url=url, data=json.dumps(body), headers=headers)
try:
res = json.loads(response.text)
data = dict()
data['统一社会信用代码'] = res['reg_num']
data['名称'] = res['name']
data['注册资本'] = res['capital']
data['类型'] = res['type']
data['成立日期'] = trans_date_format(res['establish_date'])
data['法定代表人'] = res['person']
data['经营范围'] = res['business']
data['住所'] = res['address']
return data
except Exception:
raise ReturnConditionCheckFailed("营业执照识别异常", 200)
def business_license_recognition_result():
"""营业执照识别结果"""
# 识别营业执照
result = business_license_recognition()
# 图片识别正确 更新企业认证中营业执照的信息
self.db.update_single_data(
"应用端",
"企业用户",
{"企业ID": self.cid},
{"认证信息.企业名称": result['名称'], "认证信息.统一社会信用代码": result['统一社会信用代码']}
)
return {"公司名称": result['名称'], "统一社会信用代码": result['统一社会信用代码']}
def _main__():
""""""
self.check_is_verified()
return business_license_recognition_result()
return _main__()
class CompanyVerifyThreeFactorsImpl(CompanyVerifyThreeFactors):
"""企业认证三要素实现类"""
db = MongoHelper("tfse_v0.21")
def company_verify(self):
"""企业认证"""
def get_verify_info():
"""获取企业认证信息"""
record = self.db.find_single_data(
"应用端",
"企业用户",
{"企业ID": self.cid},
["认证信息.企业名称", "认证信息.法人姓名", "认证信息.统一社会信用代码"]
)
if not record:
raise ReturnConditionCheckFailed("企业认证信息异常", 200)
verify_info = record['认证信息']
self.code = verify_info['统一社会信用代码']
self.name = verify_info['企业名称']
self.legal_person = verify_info['法人姓名']
def check_verify_info_complete():
"""检查认证信息是否完整"""
if None in [self.code, self.name, self.legal_person]:
raise ReturnConditionCheckFailed("企业认证信息异常", 200)
def check_three_elements_by_tyc():
"""调用天眼查企业三要素验证接口"""
token = "5407573b-e41e-4ab4-9f46-daa0032ad0a4"
base = "http://open.api.tianyancha.com/services/open/ic/verify/2.0?code={}&name={}&legalPersonName={}"
url = base.format(self.code, self.name, self.legal_person)
headers = {'Authorization': token}
res = json.loads(requests.get(url, headers=headers).text)
try:
if res['result']['result'] != 1:
raise ReturnConditionCheckFailed("企业认证未通过", 200)
except Exception:
raise ReturnConditionCheckFailed("认证接口异常", 200)
def init_company_data():
"""初始化企业主页数据"""
init_url = "http://api.fecribd.com/etl_tfse/company/init_company_data"
init_token = "dmfd7FshT!5Wng9^gcCcQV7T6FBnVgl4"
init_headers = {'token': init_token, "Content-Type": "application/json;charset=UTF-8"}
data = json.dumps({"cid": self.cid, "company_name": self.name})
response = requests.post(init_url, headers=init_headers, data=data)
res = json.loads(response.text)
if res['info'] == "企业数据初始化成功":
company_user = CompanyUser()
company_user.cid = self.cid
company_user.name = self.name
company_user.verify_status = ""
self.db.upsert_single_data(
"应用端",
"企业用户",
{"企业ID": self.cid},
company_user.dict_to_save(columns=["企业名称", "已认证"])
)
else:
raise ReturnConditionCheckFailed("企业数据初始化异常", 200)
def __main__():
get_verify_info()
check_verify_info_complete()
check_three_elements_by_tyc()
init_company_data()
return __main__()
class EmailVerifyCodeRecordImpl(EmailVerifyCodeRecord):
"""用户通知邮件实现类"""
db = MongoHelper("tfse_v0.21")
def make_verify_code(self):
"""生成邮箱验证码"""
def make_verify_code():
"""随机生成6位数字验证码"""
num_code = ""
for i in range(6):
ch = chr(random.randrange(ord('0'), ord('9') + 1))
num_code += ch
return num_code
self.v_code = make_verify_code()
self.v_period = time.time() + 300
self.db.upsert_single_data(
"应用端",
"邮箱验证码记录",
{"邮箱": self.email},
self.dict_to_save()
)
def transfer_email_api(self):
"""发送邮件"""
# 邮箱服务接口
email_api = 'http://116.63.130.34:30001'
v_types = {
"register": "注册账号",
"resetpwd": "修改密码",
"resetemail": "修改邮箱"
}
headers = {"Content-Type": "application/json;charset=UTF-8"}
data = {
"title": "【远东资信】{}".format(v_types[self.v_type]),
"sender": 'fecribd@fecr.com.cn',
"recipients": [self.email],
"msg_body": "{}的验证码为 【{}5分钟内有效。".format(v_types[self.v_type], self.v_code)
}
response = requests.post(url=email_api + '/send_mail', headers=headers, data=json.dumps(data))
return response.text
def check_email_registered(self):
"""检查邮箱已被注册"""
email = self.db.find_single_column(
"应用端",
"企业用户",
{"邮箱": self.email},
"邮箱"
)
if email is not None:
raise ReturnConditionCheckFailed("邮箱已被注册", 200)
def check_email_not_registered(self):
"""检查邮箱未被注册"""
email = self.db.find_single_column(
"应用端",
"企业用户",
{"邮箱": self.email},
"邮箱"
)
if email is None:
raise ReturnConditionCheckFailed("邮箱未被注册", 200)
def send_email(self):
""""""
if self.v_type == 'register' or self.v_type == 'resetemail':
self.check_email_registered()
self.make_verify_code()
self.transfer_email_api()
elif self.v_type == 'resetpwd':
self.check_email_not_registered()
self.make_verify_code()
self.transfer_email_api()

View File

@ -1,188 +0,0 @@
from werkzeug.security import generate_password_hash
from Utils.ValidateUtil import ValidateAttr, Validate
class VerifyInfo(object):
"""认证信息"""
name = ValidateAttr(field='name', default=None, type=str, error_info='企业名称异常', error_code=200)
code = ValidateAttr(field='code', default=None, type=str, error_info='统一社会信用代码异常', error_code=200)
legal_person = ValidateAttr(field='legal_person', default=None, type=str, error_info='法人姓名异常', error_code=200)
legal_person_id = ValidateAttr(field='legal_person_id', default=None, type=str, error_info='法人身份证号异常', error_code=200)
def dict_to_save(self, **kwargs):
""""""
_dict_ = {
"企业名称": self.name,
"统一社会信用代码": self.code,
"法人姓名": self.legal_person,
"法人身份证": self.legal_person_id,
}
if 'columns' in kwargs:
_dict_ = {key: _dict_[key] for key in kwargs['columns']}
return _dict_
class CompanyUser(object):
"""企业用户"""
cid = ValidateAttr(field='cid', type=str, length=8, error_info='企业ID异常', error_code=200)
name = ValidateAttr(field='name', type=str, default=None, error_info='企业名称异常', error_code=200)
email = ValidateAttr(field='email', func=Validate.email, error_info='邮箱格式异常', error_code=200)
pwd = ValidateAttr(field='pwd', func=Validate.password, error_info='密码强度异常', error_code=200)
avatar_id = ValidateAttr(field='avatar_id', type=str, length=24, error_info='头像ID异常', error_code=200)
verify_status = ValidateAttr(field='verify_status', type=str, in_list=["", ""], error_info='认证状态异常', error_code=200)
verify_info = ValidateAttr(field='verify_info', type=VerifyInfo, error_info='认证信息异常', error_code=200)
register_time = ValidateAttr(field='register_time', default=None, type=str, error_info='注册时间异常', error_code=200)
def dict_to_show(self, **kwargs):
""""""
_dict_ = {
"企业ID": self.cid,
"企业名称": '企业用户' if self.name is None else self.name,
"邮箱": self.email,
"认证状态": "已认证" if self.verify_status == "" else "未认证",
"token": None
}
if 'columns' in kwargs:
_dict_ = {key: _dict_[key] for key in kwargs['columns']}
return _dict_
def dict_to_save(self, **kwargs):
"""存储对象"""
# 对象字段与实例字段的映射
keys_map = {
"cid": "企业ID",
"name": "企业名称",
"email": "邮箱",
"avatar_id": "头像fid",
"pwd": "密码",
"register_time": "注册时间",
"verify_status": "已认证",
"verify_info": "认证信息"
}
# 实例一个存储对象
_dict_ = dict()
# 实例字段列表
obj_keys = [key for key in self.__dict__.keys()]
# 实例的值循环赋值给存储对象
for key in obj_keys:
_dict_[keys_map[key]] = self.__dict__[key]
# 存储对象子集
if 'columns' in kwargs:
_dict_ = {key: _dict_[key] for key in kwargs['columns']}
if '密码' in _dict_:
_dict_['密码'] = generate_password_hash(_dict_['密码'])
return _dict_
def login(self):
"""登录"""
def register(self, **kwargs):
"""注册"""
def get_company_user_info(self):
"""获取企业用户信息"""
def get_avatar(self):
"""获取用户头像"""
def change_avatar(self, **kwargs):
"""修改用户头像"""
def change_password(self, **kwargs):
"""修改登录密码"""
def change_email(self, **kwargs):
"""修改登录邮箱"""
class IDCardFront(object):
"""身份证正面"""
name = ValidateAttr()
gender = ValidateAttr()
nationality = ValidateAttr()
birth = ValidateAttr()
address = ValidateAttr()
id_num = ValidateAttr()
class IDCardBack(object):
"""身份证反面"""
issuing_authority = ValidateAttr()
validity_period = ValidateAttr()
class BusinessLicense(object):
"""营业执照"""
reg_num = ValidateAttr()
name = ValidateAttr()
capital = ValidateAttr()
type = ValidateAttr()
establish_date = ValidateAttr()
person = ValidateAttr()
business = ValidateAttr()
address = ValidateAttr()
class VerifyMaterial(object):
"""认证材料"""
cid = ValidateAttr(field='cid', type=str, length=8, error_info='企业ID异常', error_code=200)
image = ValidateAttr(field='image', func=Validate.image, error_info='图片格式异常', error_code=200)
verify_status = ValidateAttr(field='verify_status', type=str, in_list=["", ""], error_info='认证状态异常', error_code=200)
def upload_id_card(self):
"""上传身份证"""
def upload_business_license(self):
"""上传营业执照"""
class CompanyVerifyThreeFactors(object):
"""企业认证三要素"""
cid = ValidateAttr(field='cid', type=str, length=8)
code = ValidateAttr(field='code', type=str)
name = ValidateAttr(field='name', type=str)
legal_person = ValidateAttr(field='legal_person', type=str)
fields_map = {
"cid": "企业ID",
"code": "统一社会信用代码",
"name": "企业名称",
"legal_person": "法人姓名",
}
def company_verify(self):
"""企业认证"""
class EmailVerifyCodeRecord(object):
"""用户邮件通知"""
email = ValidateAttr(field='email', func=Validate.email, error_info='邮箱格式异常', error_code=200)
v_period = ValidateAttr(field='v_period', type=float, error_info='验证有效期格式异常', error_code=200)
v_code = ValidateAttr(field='v_code', type=str, length=6, error_info='验证码格式异常', error_code=200)
v_type = ValidateAttr(field="v_type", type=str, in_list=["register", "resetemail", "resetpwd"], error_info="邮件验证类型错误", error_code=200)
def dict_to_save(self):
""""""
_dict_ = {
"邮箱": self.email,
"验证有效期": self.v_period,
"验证码": self.v_code
}
return _dict_
def send_email(self):
"""发送邮件"""

View File

@ -0,0 +1,91 @@
import time
import json
import random
import requests
from DBHelper.MongoHelper import MongoHelper
from ObjectsCommon.Others.EmailNotice import EmailNotice
from Utils.ErrorUtil import ReturnConditionCheckFailed
class EmailNoticeImpl(EmailNotice):
"""邮件通知实现类"""
db = MongoHelper("tfse_v0.21")
def make_verify_code(self):
"""生成邮箱验证码"""
def make_verify_code():
"""随机生成6位数字验证码"""
num_code = ""
for i in range(6):
ch = chr(random.randrange(ord('0'), ord('9') + 1))
num_code += ch
return num_code
self.v_code = make_verify_code()
self.v_period = time.time() + 300
self.db.upsert_single_data(
"应用端",
"邮箱验证码记录",
{"邮箱": self.email},
self.dict_to_save()
)
def transfer_email_api(self):
"""发送邮件"""
# 邮箱服务接口
email_api = 'http://116.63.130.34:30001'
v_types = {
"register": "注册账号",
"resetpwd": "修改密码",
"resetemail": "修改邮箱"
}
headers = {"Content-Type": "application/json;charset=UTF-8"}
data = {
"title": "【远东资信】{}".format(v_types[self.v_type]),
"sender": 'fecribd@fecr.com.cn',
"recipients": [self.email],
"msg_body": "{}的验证码为 【{}5分钟内有效。".format(v_types[self.v_type], self.v_code)
}
response = requests.post(url=email_api + '/send_mail', headers=headers, data=json.dumps(data))
return response.text
def check_email_registered(self):
"""检查邮箱已被注册"""
email = self.db.find_single_column(
"应用端",
"企业用户",
{"邮箱": self.email},
"邮箱"
)
if email is not None:
raise ReturnConditionCheckFailed("邮箱已被注册", 200)
def check_email_not_registered(self):
"""检查邮箱未被注册"""
email = self.db.find_single_column(
"应用端",
"企业用户",
{"邮箱": self.email},
"邮箱"
)
if email is None:
raise ReturnConditionCheckFailed("邮箱未被注册", 200)
def send_email(self):
"""发送通知邮件"""
if self.v_type == 'register' or self.v_type == 'resetemail':
self.check_email_registered()
self.make_verify_code()
self.transfer_email_api()
elif self.v_type == 'resetpwd':
self.check_email_not_registered()
self.make_verify_code()
self.transfer_email_api()

View File

@ -0,0 +1,258 @@
import base64
import json
from urllib.request import urlopen, Request
import requests
from DBHelper.MongoHelper import MongoHelper
from ObjectsInProject.User.RealCompanyVerify import UploadMaterial, RealCompanyVerify
from ObjectsInProject.User.TFSECompanyUser import TFSECompanyUser
from Utils.ErrorUtil import ReturnConditionCheckFailed
class UploadMaterialImpl(UploadMaterial):
"""认证材料实现类"""
db = MongoHelper("tfse_v0.21")
@staticmethod
def read_img_by_base64(img_file):
"""读取base64编码格式的图片"""
data = img_file.read()
try:
encode_str = str(base64.b64encode(data), 'utf-8')
except TypeError:
encode_str = base64.b64encode(data)
return encode_str
def check_is_verified(self):
"""检查企业是否已认证"""
self.verify_status = self.db.find_single_column(
"应用端",
"企业用户",
{"企业ID": self.cid},
"已认证"
)
if self.verify_status == "":
raise ReturnConditionCheckFailed(failed_info="企业已认证", status_code=200)
def upload_id_card(self):
"""上传身份证"""
def trans_date_format(_time):
"""将解析出来的时间数据加上年月日"""
year = _time[:4]
month = _time[4:6]
day = _time[6:8]
return year + '.' + month + '.' + day
def id_card_recognition(side):
"""调用身份证图片识别接口"""
url = 'http://dm-51.data.aliyun.com/rest/160601/ocr/ocr_idcard.json'
APPCODE = "6d6c3146677c40bf9e3e1e2aa73404cd"
# 请求头
headers = {
'Authorization': 'APPCODE %s' % APPCODE,
'Content-Type': 'application/json; charset=UTF-8'
}
# 请求体
body = {
"configure": {'side': side},
"image": self.read_img_by_base64(self.image)
}
try:
params = json.dumps(body).encode(encoding='UTF8')
req = Request(url, params, headers)
res = urlopen(req)
html = res.read()
result = json.loads(html.decode("utf8"))
if result['is_fake']:
raise ReturnConditionCheckFailed("伪造身份证", 200)
id_card = dict()
if side == 'face':
id_card['姓名'] = result['name']
id_card['性别'] = result['sex']
id_card['民族'] = result['nationality']
id_card['出生'] = "{}{}{}".format(result['birth'][:4], result['birth'][4:6], result['birth'][6:])
id_card['住址'] = result['address']
id_card['身份证号码'] = result['num']
else:
id_card['签发机关'] = result['issue']
id_card['有效期限'] = '{}-{}'.format(trans_date_format(result['start_date']), trans_date_format(result['end_date']))
return id_card
except Exception:
raise ReturnConditionCheckFailed("身份证识别失败", 200)
def id_card_recognition_result():
"""
身份证文本识别
cid: 企业ID
image: 图片
return: 识别结果
"""
# 身份证正面文本识别
result = id_card_recognition(side='face')
# 重置image读取位置不然下一步读不出图片
self.image.seek(0)
# 更新企业认证中身份证的信息
self.db.update_single_data(
"应用端",
"企业用户",
{"企业ID": self.cid},
{"认证信息.法人姓名": result['姓名'], "认证信息.法人身份证": result['身份证号码']}
)
return {"姓名": result['姓名'], "身份证号码": result['身份证号码']}
def __main__():
self.check_is_verified()
return id_card_recognition_result()
return __main__()
def upload_business_license(self):
"""上传营业执照"""
def trans_date_format(_time):
"""将解析出来的时间数据加上年月日"""
year = _time[:4]
month = _time[4:6]
day = _time[6:8]
return year + '' + month + '' + day + ''
def business_license_recognition():
"""调用营业执照识别接口"""
url = 'https://dm-58.data.aliyun.com/rest/160601/ocr/ocr_business_license.json'
app_code = "6d6c3146677c40bf9e3e1e2aa73404cd"
headers = {
'Authorization': 'APPCODE %s' % app_code,
'Content-Type': 'application/json; charset=UTF-8'
}
body = {
"image": self.read_img_by_base64(self.image)
}
response = requests.post(url=url, data=json.dumps(body), headers=headers)
try:
res = json.loads(response.text)
data = dict()
data['统一社会信用代码'] = res['reg_num']
data['名称'] = res['name']
data['注册资本'] = res['capital']
data['类型'] = res['type']
data['成立日期'] = trans_date_format(res['establish_date'])
data['法定代表人'] = res['person']
data['经营范围'] = res['business']
data['住所'] = res['address']
return data
except Exception:
raise ReturnConditionCheckFailed("营业执照识别异常", 200)
def business_license_recognition_result():
"""营业执照识别结果"""
# 识别营业执照
result = business_license_recognition()
# 图片识别正确 更新企业认证中营业执照的信息
self.db.update_single_data(
"应用端",
"企业用户",
{"企业ID": self.cid},
{"认证信息.企业名称": result['名称'], "认证信息.统一社会信用代码": result['统一社会信用代码']}
)
return {"公司名称": result['名称'], "统一社会信用代码": result['统一社会信用代码']}
def _main_():
self.check_is_verified()
return business_license_recognition_result()
return _main_()
class RealCompanyVerifyImpl(RealCompanyVerify):
"""企业认证三要素实现类"""
db = MongoHelper("tfse_v0.21")
def verify(self):
"""企业认证"""
def get_verify_info():
"""获取企业认证信息"""
record = self.db.find_single_data(
"应用端",
"企业用户",
{"企业ID": self.cid},
["认证信息.企业名称", "认证信息.法人姓名", "认证信息.统一社会信用代码"]
)
if not record:
raise ReturnConditionCheckFailed("企业认证信息异常", 200)
verify_info = record['认证信息']
self.code = verify_info['统一社会信用代码']
self.name = verify_info['企业名称']
self.legal_person = verify_info['法人姓名']
def check_verify_info_complete():
"""检查认证信息是否完整"""
if None in [self.code, self.name, self.legal_person]:
raise ReturnConditionCheckFailed("企业认证信息异常", 200)
def check_three_elements_by_tyc():
"""调用天眼查企业三要素验证接口"""
token = "5407573b-e41e-4ab4-9f46-daa0032ad0a4"
base = "http://open.api.tianyancha.com/services/open/ic/verify/2.0?code={}&name={}&legalPersonName={}"
url = base.format(self.code, self.name, self.legal_person)
headers = {'Authorization': token}
res = json.loads(requests.get(url, headers=headers).text)
try:
if res['result']['result'] != 1:
raise ReturnConditionCheckFailed("企业认证未通过", 200)
except Exception:
raise ReturnConditionCheckFailed("认证接口异常", 200)
def init_company_data():
"""初始化企业主页数据"""
init_url = "http://api.fecribd.com/etl_tfse/company/init_company_data"
init_token = "dmfd7FshT!5Wng9^gcCcQV7T6FBnVgl4"
init_headers = {'token': init_token, "Content-Type": "application/json;charset=UTF-8"}
data = json.dumps({"cid": self.cid, "company_name": self.name})
response = requests.post(init_url, headers=init_headers, data=data)
res = json.loads(response.text)
if res['info'] == "企业数据初始化成功":
raise ReturnConditionCheckFailed("企业数据初始化异常", 200)
else:
company_user = TFSECompanyUser()
company_user.cid = self.cid
company_user.name = self.name
company_user.verify_status = ""
self.db.upsert_single_data(
"应用端",
"企业用户",
{"企业ID": self.cid},
company_user.dict_keys_toggle(columns=["企业名称", "已认证"])
)
def __main__():
get_verify_info()
check_verify_info_complete()
check_three_elements_by_tyc()
init_company_data()
return __main__()

View File

@ -0,0 +1,216 @@
import json
import time
import random
import base64
import requests
from urllib.request import Request, urlopen
from werkzeug.security import check_password_hash, generate_password_hash
from itsdangerous import TimedJSONWebSignatureSerializer as Serializer
from DBHelper.MongoHelper import MongoHelper
from ObjectsInProject.User.TFSECompanyUser import TFSECompanyUser
from Utils.ErrorUtil import ReturnConditionCheckFailed
class TFSECompanyUserImpl(TFSECompanyUser):
"""企业用户实现类"""
db = MongoHelper("tfse_v0.21")
def check_email_registered(self):
"""检查邮箱已被注册"""
email = self.db.find_single_column(
"应用端",
"企业用户",
{"邮箱": self.email},
"邮箱"
)
if email is not None:
raise ReturnConditionCheckFailed("邮箱已被注册", 200)
def check_email_not_registered(self):
"""检查邮箱未被注册"""
email = self.db.find_single_column(
"应用端",
"企业用户",
{"邮箱": self.email},
"邮箱"
)
if email is None:
raise ReturnConditionCheckFailed("邮箱未被注册", 200)
def check_vcode_correct(self, code):
"""检查邮箱验证码是否正确"""
verify_data = self.db.find_single_data(
"应用端",
"邮箱验证码记录",
{"邮箱": self.email},
["验证码", "验证有效期"]
)
if not verify_data:
raise ReturnConditionCheckFailed("验证码错误", 200)
if verify_data['验证码'] != code:
raise ReturnConditionCheckFailed("验证码错误", 200)
if time.time() - verify_data['验证有效期'] > 300:
raise ReturnConditionCheckFailed('验证码过期', 200)
def login(self):
"""登录"""
company_user = self.db.find_single_data(
"应用端",
"企业用户",
{"邮箱": self.email},
["企业ID", "企业名称", "邮箱", "密码", "已认证"]
)
def check_user_existed():
"""检查用户是否存在"""
if company_user is None:
raise ReturnConditionCheckFailed("不存在该用户", 200)
def check_password_correct():
"""检查密码是否正确"""
if not check_password_hash(company_user['密码'], self.pwd):
raise ReturnConditionCheckFailed("登录密码错误", 200)
def create_token(param):
"""
创建token
param: 传入参数用于创建token一般为 {"cid": cid}
return: 用户token
"""
secret_key = '0FTuOi^#Afx1@2@F'
token_expire = 14400
s = Serializer(secret_key, expires_in=token_expire)
token = '' + s.dumps(param).decode('ascii')
return token
check_user_existed()
check_password_correct()
self.cid = company_user['企业ID']
self.name = self.get_attr(company_user, '企业名称', default='未认证企业用户')
self.verify_status = company_user['已认证']
self.token = create_token({"cid": self.cid})
def register(self, code):
"""注册"""
def make_new_cid():
"""生成新的企业ID如果该ID存在则重新生成"""
def random_cid(num):
"""随机企业ID"""
choices = 'ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789'
salt = ''
for i in range(num):
salt += random.choice(choices)
return salt
new_cid = random_cid(8)
case = self.db.find_single_column(
"应用端",
"企业用户",
{"企业ID": new_cid},
"企业ID"
) is not None
while case:
new_cid = random_cid(8)
return new_cid
self.check_email_registered()
self.check_vcode_correct(code)
self.cid = make_new_cid()
self.avatar_id = "623152edf36000004f00124e"
self.verify_status = ""
self.db.insert_single_data(
"应用端",
"企业用户",
self.dict_to_return()
)
self.db.delete_single_data(
"应用端",
"邮箱验证码记录",
{"邮箱": self.email}
)
def get_company_user_info(self):
"""获取企业用户信息"""
company_user_data = self.db.find_single_data(
"应用端",
"企业用户",
{"企业ID": self.cid},
["企业ID", "企业名称", "邮箱", "已认证"]
)
self.name = self.get_attr(company_user_data, '企业名称', default='未认证企业用户')
self.email = company_user_data['邮箱']
self.verify_status = company_user_data['已认证']
def get_avatar(self):
"""获取用户头像"""
self.avatar_id = self.db.find_single_column(
"应用端",
"企业用户",
{"企业ID": self.cid},
"头像fid"
)
avatar = self.db.find_file(
"文件",
"企业用户头像",
self.avatar_id
)
return avatar
def change_avatar(self, avatar_option):
"""修改企业用户头像"""
avatars = {
"avatar_01": "623152edf36000004f00124e",
"avatar_02": "623152edf36000004f001250",
"avatar_03": "623152edf36000004f00124f",
"avatar_04": "623152edf36000004f001251",
"avatar_05": "623152edf36000004f001252",
"avatar_06": "623152edf36000004f001253"
}
self.avatar_id = avatars[avatar_option]
self.db.update_single_data(
"应用端",
"企业用户",
{"企业ID": self.cid},
{"头像fid": self.avatar_id}
)
def change_password(self, code):
"""修改登录密码"""
self.check_email_not_registered()
self.check_vcode_correct(code)
self.db.update_single_data(
"应用端",
"企业用户",
{"邮箱": self.email},
self.dict_to_save(columns=["密码"])
)
def change_email(self, code):
"""更换邮箱"""
self.check_vcode_correct(code)
self.check_email_registered()
self.db.update_single_data(
"应用端",
"企业用户",
{"企业ID": self.cid},
self.dict_to_save(columns=["邮箱"])
)

View File

@ -0,0 +1,21 @@
from Utils.ObjUtil import SpecObject
from Utils.ValidateUtil import ValidateAttr, Validate
class EmailNotice(SpecObject):
"""用户邮件通知"""
email = ValidateAttr(field='email', func=Validate.email)
v_period = ValidateAttr(field='v_period', type=float)
v_code = ValidateAttr(field='v_code', type=str, length=6)
v_type = ValidateAttr(field="v_type", in_list=["register", "resetemail", "resetpwd"])
fields_map = {
"email": "邮箱",
"v_period": "验证有效期",
"v_code": "验证码",
"v_type": "验证类型"
}
def send_email(self):
"""发送邮件"""

View File

@ -0,0 +1,22 @@
from Utils.ObjUtil import SpecObject
from Utils.ValidateUtil import ValidateAttr, Validate
class TobUser(SpecObject):
"""企业用户类"""
cid = ValidateAttr(field='cid', type=str)
pwd = ValidateAttr(field='pwd', func=Validate.password, error_info='密码强度不够')
name = ValidateAttr(field='name', type=str)
email = ValidateAttr(field='email', func=Validate.email, error_info='邮箱格式错误')
telephone = ValidateAttr(field='telephone', func=Validate.telephone, error_info='手机号格式错误')
register_time = ValidateAttr(field='register_time', func=Validate.time_format)
fields_map = {
"cid": "企业ID",
"pwd": "密码",
"name": "企业名称",
"email": "邮箱",
"telephone": "手机号",
"register_time": "注册时间"
}

View File

@ -0,0 +1,20 @@
from Utils.ObjUtil import SpecObject
from Utils.ValidateUtil import ValidateAttr
class User(SpecObject):
"""普通用户"""
uid = ValidateAttr(field='uid', type=str)
name = ValidateAttr(field='name', type=str)
pwd = ValidateAttr(field='pwd', type=str)
email = ValidateAttr(field='email', type=str)
telephone = ValidateAttr(field='telephone', type=str)
fields_map = {
"uid": "用户ID",
"name": "姓名",
"pwd": "密码",
"email": "邮箱",
"telephone": "手机号"
}

View File

@ -0,0 +1,93 @@
from Utils.ObjUtil import SpecObject
from Utils.ValidateUtil import ValidateAttr, Validate
class IDCardFront(SpecObject):
"""身份证正面"""
name = ValidateAttr(field='name', type=str)
gender = ValidateAttr(field='gender', in_list=['', ''])
nationality = ValidateAttr(field='nationality', type=str)
birth_day = ValidateAttr(field='birth_day', func=Validate.date_format)
address = ValidateAttr(field='address', type=str)
id_num = ValidateAttr(field='id_num', type=str)
fields_map = {
'姓名': 'name',
'性别': 'gender',
'民族': 'nationality',
'出生': 'birth_day',
'住址': 'address',
'身份证号码': 'id_num'
}
class IDCardBack(SpecObject):
"""身份证反面"""
issuing_authority = ValidateAttr(field='issuing_authority', type=str)
validity_period = ValidateAttr(field='validity_period', type=str)
fields_map = {
"签发机关": "issuing_authority",
"有效期限": "validity_period"
}
class BusinessLicense(SpecObject):
"""营业执照"""
reg_num = ValidateAttr(field='reg_num', type=str)
name = ValidateAttr(field='name', type=str)
capital = ValidateAttr(field='capital', type=str)
type = ValidateAttr(field='type', type=str)
establish_date = ValidateAttr(field='establish_date', type=str)
person = ValidateAttr(field='person', type=str)
business = ValidateAttr(field='business', type=str)
address = ValidateAttr(field='address', type=str)
fields_map = {
'统一社会信用代码': "reg_num",
'名称': "name",
'注册资本': "capital",
'类型': "type",
'成立日期': "establish_date",
'法定代表人': "person",
'经营范围': "business",
'住所': "address"
}
class UploadMaterial(SpecObject):
"""上传材料"""
cid = ValidateAttr(field='cid', type=str, length=8)
image = ValidateAttr(field='image', func=Validate.image, mark='图片格式')
verify_status = ValidateAttr(field='verify_status', in_list=["", ""], mark='认证状态')
fields_map = {
"cid": "企业ID",
"image": "图片",
"verify_status": "已认证"
}
def upload_id_card(self):
"""上传身份证"""
def upload_business_license(self):
"""上传营业执照"""
class RealCompanyVerify(object):
"""企业认证三要素"""
cid = ValidateAttr(field='cid', type=str, length=8)
code = ValidateAttr(field='code', type=str)
name = ValidateAttr(field='name', type=str)
legal_person = ValidateAttr(field='legal_person', type=str)
fields_map = {
"cid": "企业ID",
"code": "统一社会信用代码",
"name": "企业名称",
"legal_person": "法人姓名",
}
def verify(self):
"""企业认证"""

View File

@ -0,0 +1,79 @@
from werkzeug.security import generate_password_hash
from Utils.ObjUtil import SpecObject
from Utils.ValidateUtil import ValidateAttr, Validate
class TFSECompanyUser(SpecObject):
"""天府股交项企业用户"""
class VerifyInfo(SpecObject):
"""认证信息"""
name = ValidateAttr(field='name', type=str)
code = ValidateAttr(field='code', type=str)
legal_person = ValidateAttr(field='legal_person', type=str)
legal_person_id = ValidateAttr(field='legal_person_id', type=str)
fields_map = {
"name": "企业名称",
"code": "统一社会信用代码",
"legal_person": "法人姓名",
"legal_person_id": "法人身份证号"
}
cid = ValidateAttr(field='cid', type=str)
pwd = ValidateAttr(field='pwd', func=Validate.password, error_info='密码强度不够')
name = ValidateAttr(field='name', type=str)
email = ValidateAttr(field='email', func=Validate.email, error_info='邮箱格式错误')
telephone = ValidateAttr(field='telephone', func=Validate.telephone, error_info='手机号格式错误')
register_time = ValidateAttr(field='register_time', func=Validate.time_format)
avatar_id = ValidateAttr(field='avatar_id', type=str, length=24)
verify_status = ValidateAttr(field='verify_status', in_list=["", ""])
verify_info = ValidateAttr(field='verify_info', type=VerifyInfo)
token = ValidateAttr(field='token', type=str)
fields_map = {
"cid": "企业ID",
"pwd": "密码",
"name": "企业名称",
"email": "邮箱",
"telephone": "手机号",
"register_time": "注册时间",
"avatar_id": "头像fid",
"verify_status": "已认证",
"verify_info": "认证信息",
"token": "token"
}
def dict_to_return(self, **kwargs):
_dict_ = self.dict_keys_toggle(columns=kwargs['columns']) if kwargs.__contains__('columns') else self.dict_keys_toggle()
if kwargs.__contains__('verify_status'):
if kwargs['verify_status'] is True:
_dict_['认证状态'] = '已认证' if _dict_.pop('已认证') == '' else '未认证'
if _dict_.__contains__('密码'):
_dict_['密码'] = generate_password_hash(_dict_['密码'])
return _dict_
def login(self, **kwargs):
"""登录"""
def register(self, **kwargs):
"""注册"""
def get_company_user_info(self):
"""获取企业用户信息"""
def get_avatar(self):
"""获取用户头像"""
def change_avatar(self, **kwargs):
"""修改用户头像"""
def change_password(self, **kwargs):
"""修改登录密码"""
def change_email(self, **kwargs):
"""修改登录邮箱"""

View File

@ -1,5 +1,5 @@
from flask import Blueprint, request
from CompanyUser.CompanyUserAuth import verify_token, verify_real_company
from Utils.AuthUtil import verify_token, verify_real_company
from Rating.Credit.CreditImpl import CreditRating
from Utils.ErrorUtil import ReturnConditionCheckFailed

View File

@ -1,5 +1,5 @@
from flask import Blueprint, request
from CompanyUser.CompanyUserAuth import verify_token, verify_real_company
from Utils.AuthUtil import verify_token
from Rating.Esg.EsgImpl import EsgRating
from Utils.ErrorUtil import ReturnConditionCheckFailed

View File

@ -1,7 +1,7 @@
from flask import Blueprint
from CompanyUser.CompanyUserAuth import verify_token
from Utils.AuthUtil import verify_token
from Utils.ErrorUtil import ReturnConditionCheckFailed
from Implements.Company.Index import CompanyIndexImpl

View File

@ -0,0 +1,25 @@
from flask import Blueprint, request, Response
from Implements.Others.EmailNoticeImpl import EmailNoticeImpl
from Utils.AuthUtil import check_block
from Utils.ErrorUtil import ReturnConditionCheckFailed
email_route = Blueprint('email', __name__)
@email_route.route('/send_verify_info', methods=['POST'])
@check_block
def verify_email():
"""发送邮箱验证码"""
try:
req = request.json
impl = EmailNoticeImpl()
impl.email = req['email']
impl.v_type = req['v_type']
impl.send_email()
return {"info": "邮件已发送"}, 200
except ReturnConditionCheckFailed as e:
e.log_error()
return {"info": e.failed_info}, e.status_code
except KeyError:
return {"info": "参数异常"}, 400

0
Routes/Other/__init__.py Normal file
View File

View File

@ -1,10 +1,10 @@
from flask import Blueprint, request, Response
from Implements.User.RealCompanyVerifyImpl import UploadMaterialImpl, RealCompanyVerifyImpl
from Implements.User.TFSECompanyUserImpl import TFSECompanyUserImpl
from common.rsa import decrypt_data
from CompanyUser.CompanyUserAuth import check_block, verify_token
from Utils.AuthUtil import check_block, verify_token
from Utils.ErrorUtil import ReturnConditionCheckFailed
from CompanyUser.CompanyUserImpl import CompanyUserImpl, EmailVerifyCodeRecordImpl, VerifyMaterialImpl, \
CompanyVerifyThreeFactorsImpl
company_user_route = Blueprint('user', __name__)
@ -14,15 +14,14 @@ company_user_route = Blueprint('user', __name__)
def login():
"""企业用户登录"""
try:
company_user = CompanyUserImpl()
company_user.email = request.json['email']
impl = TFSECompanyUserImpl()
impl.email = request.json['email']
try:
company_user.pwd = decrypt_data(encrypt_msg=request.json['pwd'])
impl.pwd = decrypt_data(encrypt_msg=request.json['pwd'])
except Exception:
return {"info": "登录密码错误"}, 200
result = company_user.login()
impl.login()
result = impl.dict_to_return(columns=["企业名称", "已认证", "token"], verify_status=True)
return {"info": "登陆成功", "result": result}, 200
except ReturnConditionCheckFailed as e:
e.log_error()
@ -37,9 +36,12 @@ def register():
"""企业用户注册"""
try:
req = request.json
company_user = CompanyUserImpl()
company_user = TFSECompanyUserImpl()
company_user.email = req['email']
company_user.pwd = req['pwd']
try:
company_user.pwd = decrypt_data(encrypt_msg=req['pwd'])
except Exception:
return {"info": "密码格式错误"}, 200
company_user.register(req['code'])
response = {"info": "注册成功"}, 200
return response
@ -55,10 +57,11 @@ def register():
def user_info(**kwargs):
"""获取企业用户信息"""
try:
company_user = CompanyUserImpl()
company_user.cid = kwargs['cid']
result = company_user.get_company_user_info()
return {"info": "查询结果", "result": result}
impl = TFSECompanyUserImpl()
impl.cid = kwargs['cid']
impl.get_company_user_info()
result= impl.dict_to_return(columns=["企业ID", "企业名称", "邮箱", "已认证"], verify_status=True)
return {"info": "查询结果", "result": result}, 200
except ReturnConditionCheckFailed as e:
e.log_error()
return {"info": e.failed_info}, e.status_code
@ -71,9 +74,9 @@ def user_info(**kwargs):
def get_avatar(**kwargs):
"""获取头像"""
try:
company_user = CompanyUserImpl()
company_user.cid = kwargs['cid']
response = Response(company_user.get_avatar(), content_type='image/jpeg')
impl = TFSECompanyUserImpl()
impl.cid = kwargs['cid']
response = Response(impl.get_avatar(), content_type='image/jpeg')
return response
except ReturnConditionCheckFailed as e:
e.log_error()
@ -87,9 +90,9 @@ def get_avatar(**kwargs):
def change_avatar(**kwargs):
"""修改头像"""
try:
company_user = CompanyUserImpl()
company_user.cid = kwargs['cid']
company_user.change_avatar(request.json['avatar_id'])
impl = TFSECompanyUserImpl()
impl.cid = kwargs['cid']
impl.change_avatar(request.json['avatar_id'])
return {"info": "修改成功"}, 200
except ReturnConditionCheckFailed as e:
e.log_error()
@ -104,10 +107,10 @@ def change_pwd():
"""修改密码"""
try:
req = request.json
company_user = CompanyUserImpl()
company_user.email = req['email']
company_user.pwd = decrypt_data(encrypt_msg=req['pwd'])
company_user.change_password(req['code'])
impl = TFSECompanyUserImpl()
impl.email = req['email']
impl.pwd = decrypt_data(encrypt_msg=req['pwd'])
impl.change_password(req['code'])
return {"info": "修改成功"}, 200
except ReturnConditionCheckFailed as e:
e.log_error()
@ -123,10 +126,10 @@ def change_email(**kwargs):
"""更换账号邮箱"""
try:
req = request.json
company_user = CompanyUserImpl()
company_user.cid = kwargs['cid']
company_user.email = req['email']
company_user.change_email(req['code'])
impl = TFSECompanyUserImpl()
impl.cid = kwargs['cid']
impl.email = req['email']
impl.change_email(req['code'])
return {"info": "修改成功"}, 200
except ReturnConditionCheckFailed as e:
e.log_error()
@ -135,30 +138,12 @@ def change_email(**kwargs):
return {"info": "参数异常"}, 400
@company_user_route.route('/verify_email', methods=['POST'])
@check_block
def verify_email():
"""发送邮箱验证码"""
try:
req = request.json
email_verify = EmailVerifyCodeRecordImpl()
email_verify.email = req['email']
email_verify.v_type = req['v_type']
email_verify.send_email()
return {"info": "邮件已发送"}, 200
except ReturnConditionCheckFailed as e:
e.log_error()
return {"info": e.failed_info}, e.status_code
except KeyError:
return {"info": "参数异常"}, 400
@company_user_route.route('/upload_id_card', methods=['POST'])
@verify_token
def upload_id_card(**kwargs):
"""上传身份证"""
try:
material = VerifyMaterialImpl()
material = UploadMaterialImpl()
material.cid = kwargs['cid']
material.image = request.files['image']
result = material.upload_id_card()
@ -175,7 +160,7 @@ def upload_id_card(**kwargs):
def upload_business_license(**kwargs):
"""上传营业执照"""
try:
material = VerifyMaterialImpl()
material = UploadMaterialImpl()
material.cid = kwargs['cid']
material.image = request.files['image']
result = material.upload_business_license()
@ -192,9 +177,9 @@ def upload_business_license(**kwargs):
def company_verify_route(**kwargs):
"""企业认证"""
try:
company_verify = CompanyVerifyThreeFactorsImpl()
company_verify.cid = kwargs['cid']
company_verify.company_verify()
impl = RealCompanyVerifyImpl()
impl.cid = kwargs['cid']
impl.verify()
return {"info": "认证成功"}, 200
except ReturnConditionCheckFailed as e:
e.log_error()

0
Routes/User/__init__.py Normal file
View File

View File

@ -1,7 +1,8 @@
from flask import Blueprint, request, Response
from werkzeug.security import generate_password_hash
from flask import Blueprint, request
from CompanyUser.CompanyUserImpl import CompanyVerifyThreeFactorsImpl, CompanyUserImpl, EmailVerifyCodeRecordImpl
from Implements.Others.EmailNoticeImpl import EmailNoticeImpl
from Implements.User.RealCompanyVerifyImpl import RealCompanyVerifyImpl
from Implements.User.TFSECompanyUserImpl import TFSECompanyUserImpl
from Rating.Credit.CreditImpl import CreditRating
from Rating.Esg.EsgImpl import EsgRating
from TestForAdmin.APIAuth import api_secret
@ -15,9 +16,9 @@ test_admin_route = Blueprint('test_admin', __name__)
def company_verify_route():
"""企业认证"""
try:
company_verify = CompanyVerifyThreeFactorsImpl()
company_verify.cid = request.args['cid']
company_verify.company_verify()
impl = RealCompanyVerifyImpl()
impl.cid = request.args['cid']
impl.verify()
return {"info": "认证成功"}, 200
except ReturnConditionCheckFailed as e:
e.log_error()
@ -62,10 +63,10 @@ def verify_email():
try:
req = request.json
# 发送验证码
email_verify = EmailVerifyCodeRecordImpl()
email_verify.email = req['email']
email_verify.v_type = 'register'
email_verify.send_email()
impl = EmailNoticeImpl()
impl.email = req['email']
impl.v_type = 'register'
impl.send_email()
return {"info": "邮件已发送"}, 200
except ReturnConditionCheckFailed as e:
e.log_error()
@ -86,10 +87,10 @@ def register():
try:
req = request.json
# 注册
company_user = CompanyUserImpl()
company_user.email = req['email']
company_user.pwd = generate_password_hash("Fecr1988.")
company_user.register(req['code'])
impl = TFSECompanyUserImpl()
impl.email = req['email']
impl.pwd = "Fecr1988."
impl.register(req['code'])
return {"info": "注册成功"}, 200
except ReturnConditionCheckFailed as e:
e.log_error()

View File

@ -8,21 +8,21 @@ class SpecObject(object):
def dict_keys_toggle(self, **kwargs):
"""字典键值切换"""
_dict_ = dict()
default_types = ['str', 'int', 'float', 'dict', 'bool', 'tuple']
for key in self.__dict__.keys():
if type(self.__dict__[key]).__name__ in ['str', 'int', 'float', 'dict', 'bool', 'tuple']:
if type(self.__dict__[key]).__name__ in default_types:
_dict_[self.fields_map[key]] = self.__dict__[key]
elif type(self.__dict__[key]).__name__ == 'list':
if len(self.__dict__[key]) == 0:
_dict_[self.fields_map[key]] = self.__dict__[key]
elif type(self.__dict__[key][0]).__name__ in ['str', 'int', 'float', 'dict', 'bool', 'tuple']:
elif type(self.__dict__[key][0]).__name__ in default_types:
_dict_[self.fields_map[key]] = self.__dict__[key]
else:
_dict_[self.fields_map[key]] = [item.dict_to_save() for item in self.__dict__[key]]
_dict_[self.fields_map[key]] = [item.dict_keys_toggle() for item in self.__dict__[key]]
elif self.__dict__[key] is None:
_dict_[self.fields_map[key]] = self.__dict__[key]
else:
_dict_[self.fields_map[key]] = self.__dict__[key].dict_to_show()
_dict_[self.fields_map[key]] = self.__dict__[key].dict_keys_toggle()
if 'columns' in kwargs:
_dict_ = {key: _dict_[key] for key in kwargs['columns']}
@ -61,3 +61,13 @@ class SpecObject(object):
list_.append(instance)
return list_
@staticmethod
def get_attr(_dict_, _key_, **kwargs):
"""获取字典属性值"""
value = kwargs['default'] if kwargs.__contains__('default') else None
try:
value = _dict_[_key_]
except KeyError:
pass
return value

View File

@ -21,6 +21,13 @@ class Validate(object):
case = (len(param) >= 8) and (re.match(regex, param) is not None)
return True if case else False
@staticmethod
def telephone(param):
"""手机号格式"""
regex = "(^(13[0-9]|14[01456879]|15[0-3,5-9]|16[2567]|17[0-8]|18[0-9]|19[0-3,5-9])d{8}$)"
case = re.match(regex, param) is not None
return True if case else False
@staticmethod
def date_format(param):
"""yyyy-mm-dd"""

4
app.py
View File

@ -3,8 +3,9 @@ from flask_cors import *
from Routes.Company.Index import company_route
from Routes.Other.EmailNoticeRoute import email_route
from Routes.User.TFSECompanyUserRoute import company_user_route
from CompanyUser.CompanyUserRoute import company_user_route
from file.file_routes import file_route
from Rating.Credit.CreditRoutes import credit_route
from Rating.Esg.EsgRoutes import esg_route
@ -18,6 +19,7 @@ app.config['JSON_SORT_KEYS'] = False
app.register_blueprint(company_route, url_prefix='/app/company')
app.register_blueprint(file_route, url_prefix='/app/file')
app.register_blueprint(company_user_route, url_prefix='/app/user')
app.register_blueprint(email_route, url_prefix='/app/notice/email')
app.register_blueprint(credit_route, url_prefix='/app/input')
app.register_blueprint(esg_route, url_prefix='/app/esg')
app.register_blueprint(test_admin_route, url_prefix='/app/test')

View File

@ -1,6 +1,6 @@
from flask import Blueprint, request, Response
from CompanyUser.CompanyUserAuth import verify_token, verify_report_view_auth
from Utils.AuthUtil import verify_token, verify_report_view_auth
from file.file_obj import TfseFile
file_route = Blueprint('file', __name__)