From e45cd4a168d79e6305f0b8743e7e98cfe22b5c28 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E7=8E=8B=E6=80=9D=E5=B7=9D?= Date: Wed, 27 Apr 2022 15:12:49 +0800 Subject: [PATCH] =?UTF-8?q?=E4=BF=AE=E5=A4=8D=E5=86=B2=E7=AA=81?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- CompanyUser/CompanyUserImpl.py | 565 --------------------------------- TestForAdmin/route.py | 26 +- 2 files changed, 14 insertions(+), 577 deletions(-) delete mode 100644 CompanyUser/CompanyUserImpl.py diff --git a/CompanyUser/CompanyUserImpl.py b/CompanyUser/CompanyUserImpl.py deleted file mode 100644 index 5a2033a..0000000 --- a/CompanyUser/CompanyUserImpl.py +++ /dev/null @@ -1,565 +0,0 @@ -import json -import time -import random -import base64 -import requests -from urllib.request import Request, urlopen - -from werkzeug.security import check_password_hash -from itsdangerous import TimedJSONWebSignatureSerializer as Serializer - -from DBHelper.MongoHelper import MongoHelper -from Utils.ErrorUtil import ReturnConditionCheckFailed, LogConditionCheckFailed -from CompanyUser.CompanyUserObj import CompanyUser, VerifyInfo, EmailVerifyCodeRecord, VerifyMaterial, \ - CompanyVerifyThreeFactors -from Utils.ValidateUtil import ValidateAttr - - -class CompanyUserImpl(CompanyUser): - """企业用户实现类""" - - db = MongoHelper("tfse_v0.21") - - def check_email_registered(self): - """检查邮箱已被注册""" - email = self.db.find_single_column( - "应用端", - "企业用户", - {"邮箱": self.email}, - "邮箱" - ) - if email is not None: - raise ReturnConditionCheckFailed("邮箱已被注册", 200) - - def check_email_not_registered(self): - """检查邮箱未被注册""" - email = self.db.find_single_column( - "应用端", - "企业用户", - {"邮箱": self.email}, - "邮箱" - ) - if email is None: - raise ReturnConditionCheckFailed("邮箱未被注册", 200) - - def check_vcode_correct(self, code): - """检查邮箱验证码是否正确""" - verify_data = self.db.find_single_data( - "应用端", - "邮箱验证码记录", - {"邮箱": self.email}, - ["验证码", "验证有效期"] - ) - - if not verify_data: - raise ReturnConditionCheckFailed("验证码错误", 200) - - if verify_data['验证码'] != code: - raise ReturnConditionCheckFailed("验证码错误", 200) - - if time.time() - verify_data['验证有效期'] > 300: - raise ReturnConditionCheckFailed('验证码过期', 200) - - def login(self): - """登录""" - - company_user = self.db.find_single_data( - "应用端", - "企业用户", - {"邮箱": self.email}, - ["企业ID", "企业名称", "邮箱", "密码", "已认证"] - ) - - def check_user_existed(): - """检查用户是否存在""" - if company_user is None: - raise ReturnConditionCheckFailed("不存在该用户", 200) - - def check_password_correct(): - """检查密码是否正确""" - if not check_password_hash(company_user['密码'], self.pwd): - raise ReturnConditionCheckFailed("登录密码错误", 200) - - def create_token(param): - """ - 创建token - param: 传入参数,用于创建token - return: 用户token - """ - secret_key = '0FTuOi^#Afx1@2@F' - token_expire = 14400 - s = Serializer(secret_key, expires_in=token_expire) - token = '' + s.dumps(param).decode('ascii') - return token - - def __main__(): - """""" - check_user_existed() - check_password_correct() - - self.cid = company_user['企业ID'] - self.name = company_user['企业名称'] - self.verify_status = company_user['已认证'] - - return_body = self.dict_to_show(columns=["企业名称", "认证状态", "token"]) - return_body['token'] = create_token({"cid": self.cid}) - - return return_body - - return __main__() - - def register(self, code): - """注册""" - - def make_new_cid(): - """生成新的企业ID,如果该ID存在,则重新生成""" - - def random_cid(num): - """随机企业ID""" - choices = 'ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789' - salt = '' - for i in range(num): - salt += random.choice(choices) - return salt - - new_cid = random_cid(8) - - case = self.db.find_single_column( - "应用端", - "企业用户", - {"企业ID": new_cid}, - "企业ID" - ) is not None - while case: - new_cid = random_cid(8) - return new_cid - - def __main__(): - """""" - self.check_email_registered() - self.check_vcode_correct(code) - - self.cid = make_new_cid() - self.avatar_id = "623152edf36000004f00124e" - self.verify_status = "否" - self.verify_info = VerifyInfo() - - self.db.insert_single_data( - "应用端", - "企业用户", - self.dict_to_save() - ) - - self.db.delete_single_data( - "应用端", - "邮箱验证码记录", - {"邮箱": self.email} - ) - - __main__() - - def get_company_user_info(self): - """获取企业用户信息""" - company_user_data = self.db.find_single_data( - "应用端", - "企业用户", - {"企业ID": self.cid}, - ["企业ID", "企业名称", "邮箱", "已认证"] - ) - - self.name = company_user_data['企业名称'] - self.email = company_user_data['邮箱'] - self.verify_status = company_user_data['已认证'] - - return self.dict_to_show(columns=["企业ID", "企业名称", "邮箱", "认证状态"]) - - def get_avatar(self): - """获取用户头像""" - - self.avatar_id = self.db.find_single_column( - "应用端", - "企业用户", - {"企业ID": self.cid}, - "头像fid" - ) - - avatar = self.db.find_file( - "文件", - "企业用户头像", - self.avatar_id - ) - - return avatar - - def change_avatar(self, avatar_option): - """修改企业用户头像""" - avatars = { - "avatar_01": "623152edf36000004f00124e", - "avatar_02": "623152edf36000004f001250", - "avatar_03": "623152edf36000004f00124f", - "avatar_04": "623152edf36000004f001251", - "avatar_05": "623152edf36000004f001252", - "avatar_06": "623152edf36000004f001253" - } - self.avatar_id = avatars[avatar_option] - self.db.update_single_data( - "应用端", - "企业用户", - {"企业ID": self.cid}, - {"头像fid": self.avatar_id} - ) - - def change_password(self, code): - """修改登录密码""" - self.check_email_not_registered() - self.check_vcode_correct(code) - self.db.update_single_data( - "应用端", - "企业用户", - {"邮箱": self.email}, - self.dict_to_save(columns=["密码"]) - ) - - def change_email(self, code): - """更换邮箱""" - self.check_vcode_correct(code) - self.check_email_registered() - self.db.update_single_data( - "应用端", - "企业用户", - {"企业ID": self.cid}, - self.dict_to_save(columns=["邮箱"]) - ) - - -class VerifyMaterialImpl(VerifyMaterial): - """认证材料实现类""" - - db = MongoHelper("tfse_v0.21") - - @staticmethod - def read_img_by_base64(img_file): - """读取base64编码格式的图片""" - data = img_file.read() - try: - encode_str = str(base64.b64encode(data), 'utf-8') - except TypeError: - encode_str = base64.b64encode(data) - return encode_str - - def check_is_verified(self): - """检查企业是否已认证""" - self.verify_status = self.db.find_single_column( - "应用端", - "企业用户", - {"企业ID": self.cid}, - "已认证" - ) - if self.verify_status == "是": - raise ReturnConditionCheckFailed("企业已认证", 200) - - def upload_id_card(self): - """上传身份证""" - - def trans_date_format(_time): - """将解析出来的时间数据加上年月日""" - year = _time[:4] - month = _time[4:6] - day = _time[6:8] - return year + '.' + month + '.' + day - - def id_card_recognition(side): - """调用身份证图片识别接口""" - - url = 'http://dm-51.data.aliyun.com/rest/160601/ocr/ocr_idcard.json' - APPCODE = "6d6c3146677c40bf9e3e1e2aa73404cd" - - # 请求头 - headers = { - 'Authorization': 'APPCODE %s' % APPCODE, - 'Content-Type': 'application/json; charset=UTF-8' - } - - # 请求体 - body = { - "configure": {'side': side}, - "image": self.read_img_by_base64(self.image) - } - - try: - params = json.dumps(body).encode(encoding='UTF8') - req = Request(url, params, headers) - res = urlopen(req) - html = res.read() - result = json.loads(html.decode("utf8")) - - if result['is_fake']: - raise ReturnConditionCheckFailed("伪造身份证", 200) - - id_card = dict() - if side == 'face': - id_card['姓名'] = result['name'] - id_card['性别'] = result['sex'] - id_card['民族'] = result['nationality'] - id_card['出生'] = "{}年{}月{}日".format(result['birth'][:4], result['birth'][4:6], result['birth'][6:]) - id_card['住址'] = result['address'] - id_card['身份证号码'] = result['num'] - else: - id_card['签发机关'] = result['issue'] - id_card['有效期限'] = '{}-{}'.format(trans_date_format(result['start_date']), trans_date_format(result['end_date'])) - return id_card - except Exception: - raise ReturnConditionCheckFailed("身份证识别失败", 200) - - def id_card_recognition_result(): - """ - 身份证文本识别 - cid: 企业ID - image: 图片 - return: 识别结果 - """ - - # 身份证正面文本识别 - result = id_card_recognition(side='face') - - # 重置image读取位置,不然下一步读不出图片 - self.image.seek(0) - - # 更新企业认证中身份证的信息 - self.db.update_single_data( - "应用端", - "企业用户", - {"企业ID": self.cid}, - {"认证信息.法人姓名": result['姓名'], "认证信息.法人身份证": result['身份证号码']} - ) - - return {"姓名": result['姓名'], "身份证号码": result['身份证号码']} - - def __main__(): - """""" - self.check_is_verified() - return id_card_recognition_result() - - return __main__() - - def upload_business_license(self): - """上传营业执照""" - - def trans_date_format(_time): - """将解析出来的时间数据加上年月日""" - year = _time[:4] - month = _time[4:6] - day = _time[6:8] - return year + '年' + month + '月' + day + '日' - - def business_license_recognition(): - """调用营业执照识别接口""" - url = 'https://dm-58.data.aliyun.com/rest/160601/ocr/ocr_business_license.json' - app_code = "6d6c3146677c40bf9e3e1e2aa73404cd" - - headers = { - 'Authorization': 'APPCODE %s' % app_code, - 'Content-Type': 'application/json; charset=UTF-8' - } - - body = { - "image": self.read_img_by_base64(self.image) - } - - response = requests.post(url=url, data=json.dumps(body), headers=headers) - - try: - res = json.loads(response.text) - data = dict() - data['统一社会信用代码'] = res['reg_num'] - data['名称'] = res['name'] - data['注册资本'] = res['capital'] - data['类型'] = res['type'] - data['成立日期'] = trans_date_format(res['establish_date']) - data['法定代表人'] = res['person'] - data['经营范围'] = res['business'] - data['住所'] = res['address'] - return data - except Exception: - raise ReturnConditionCheckFailed("营业执照识别异常", 200) - - def business_license_recognition_result(): - """营业执照识别结果""" - - # 识别营业执照 - result = business_license_recognition() - - # 图片识别正确 更新企业认证中营业执照的信息 - self.db.update_single_data( - "应用端", - "企业用户", - {"企业ID": self.cid}, - {"认证信息.企业名称": result['名称'], "认证信息.统一社会信用代码": result['统一社会信用代码']} - ) - return {"公司名称": result['名称'], "统一社会信用代码": result['统一社会信用代码']} - - def _main__(): - """""" - self.check_is_verified() - return business_license_recognition_result() - - return _main__() - - -class CompanyVerifyThreeFactorsImpl(CompanyVerifyThreeFactors): - """企业认证三要素实现类""" - - db = MongoHelper("tfse_v0.21") - - def company_verify(self): - """企业认证""" - - def get_verify_info(): - """获取企业认证信息""" - record = self.db.find_single_data( - "应用端", - "企业用户", - {"企业ID": self.cid}, - ["认证信息.企业名称", "认证信息.法人姓名", "认证信息.统一社会信用代码"] - ) - - if not record: - raise ReturnConditionCheckFailed("企业认证信息异常", 200) - - verify_info = record['认证信息'] - self.code = verify_info['统一社会信用代码'] - self.name = verify_info['企业名称'] - self.legal_person = verify_info['法人姓名'] - - def check_verify_info_complete(): - """检查认证信息是否完整""" - if None in [self.code, self.name, self.legal_person]: - raise ReturnConditionCheckFailed("企业认证信息异常", 200) - - def check_three_elements_by_tyc(): - """调用天眼查企业三要素验证接口""" - token = "5407573b-e41e-4ab4-9f46-daa0032ad0a4" - base = "http://open.api.tianyancha.com/services/open/ic/verify/2.0?code={}&name={}&legalPersonName={}" - url = base.format(self.code, self.name, self.legal_person) - headers = {'Authorization': token} - res = json.loads(requests.get(url, headers=headers).text) - try: - if res['result']['result'] != 1: - raise ReturnConditionCheckFailed("企业认证未通过", 200) - except Exception: - raise ReturnConditionCheckFailed("认证接口异常", 200) - - def init_company_data(): - """初始化企业主页数据""" - init_url = "http://api.fecribd.com/etl_tfse/company/init_company_data" - init_token = "dmfd7FshT!5Wng9^gcCcQV7T6FBnVgl4" - init_headers = {'token': init_token, "Content-Type": "application/json;charset=UTF-8"} - data = json.dumps({"cid": self.cid, "company_name": self.name}) - response = requests.post(init_url, headers=init_headers, data=data) - - res = json.loads(response.text) - if res['info'] == "企业数据初始化成功": - raise ReturnConditionCheckFailed("企业数据初始化异常", 200) - else: - company_user = CompanyUser() - company_user.cid = self.cid - company_user.name = self.name - company_user.verify_status = "是" - - self.db.upsert_single_data( - "应用端", - "企业用户", - {"企业ID": self.cid}, - company_user.dict_to_save(columns=["企业名称", "已认证"]) - ) - - def __main__(): - get_verify_info() - check_verify_info_complete() - check_three_elements_by_tyc() - init_company_data() - - return __main__() - - -class EmailVerifyCodeRecordImpl(EmailVerifyCodeRecord): - """用户通知邮件实现类""" - - db = MongoHelper("tfse_v0.21") - - def make_verify_code(self): - """生成邮箱验证码""" - - def make_verify_code(): - """随机生成6位数字验证码""" - num_code = "" - for i in range(6): - ch = chr(random.randrange(ord('0'), ord('9') + 1)) - num_code += ch - return num_code - - self.v_code = make_verify_code() - self.v_period = time.time() + 300 - self.db.upsert_single_data( - "应用端", - "邮箱验证码记录", - {"邮箱": self.email}, - self.dict_to_save() - ) - - def transfer_email_api(self): - """发送邮件""" - # 邮箱服务接口 - email_api = 'http://116.63.130.34:30001' - - v_types = { - "register": "注册账号", - "resetpwd": "修改密码", - "resetemail": "修改邮箱" - } - - headers = {"Content-Type": "application/json;charset=UTF-8"} - data = { - "title": "【远东资信】{}".format(v_types[self.v_type]), - "sender": 'fecribd@fecr.com.cn', - "recipients": [self.email], - "msg_body": "您{}的验证码为 【{}】,5分钟内有效。".format(v_types[self.v_type], self.v_code) - } - - response = requests.post(url=email_api + '/send_mail', headers=headers, data=json.dumps(data)) - return response.text - - def check_email_registered(self): - """检查邮箱已被注册""" - email = self.db.find_single_column( - "应用端", - "企业用户", - {"邮箱": self.email}, - "邮箱" - ) - if email is not None: - raise ReturnConditionCheckFailed("邮箱已被注册", 200) - - def check_email_not_registered(self): - """检查邮箱未被注册""" - email = self.db.find_single_column( - "应用端", - "企业用户", - {"邮箱": self.email}, - "邮箱" - ) - if email is None: - raise ReturnConditionCheckFailed("邮箱未被注册", 200) - - def send_email(self): - """""" - - if self.v_type == 'register' or self.v_type == 'resetemail': - self.check_email_registered() - self.make_verify_code() - self.transfer_email_api() - - elif self.v_type == 'resetpwd': - self.check_email_not_registered() - self.make_verify_code() - self.transfer_email_api() diff --git a/TestForAdmin/route.py b/TestForAdmin/route.py index b5575d3..b4224e9 100644 --- a/TestForAdmin/route.py +++ b/TestForAdmin/route.py @@ -1,7 +1,9 @@ from flask import Blueprint, request, Response from werkzeug.security import generate_password_hash -from CompanyUser.CompanyUserImpl import CompanyVerifyThreeFactorsImpl, CompanyUserImpl, EmailVerifyCodeRecordImpl +from Implements.Others.EmailNoticeImpl import EmailNoticeImpl +from Implements.User.RealCompanyVerifyImpl import RealCompanyVerifyImpl +from Implements.User.TFSECompanyUserImpl import TFSECompanyUserImpl from Rating.Credit.CreditImpl import CreditRating from Rating.Esg.EsgImpl import EsgRating from TestForAdmin.APIAuth import api_secret @@ -15,9 +17,9 @@ test_admin_route = Blueprint('test_admin', __name__) def company_verify_route(): """企业认证""" try: - company_verify = CompanyVerifyThreeFactorsImpl() - company_verify.cid = request.args['cid'] - company_verify.company_verify() + impl = RealCompanyVerifyImpl() + impl.cid = request.args['cid'] + impl.verify() return {"info": "认证成功"}, 200 except ReturnConditionCheckFailed as e: e.log_error() @@ -62,10 +64,10 @@ def verify_email(): try: req = request.json # 发送验证码 - email_verify = EmailVerifyCodeRecordImpl() - email_verify.email = req['email'] - email_verify.v_type = 'register' - email_verify.send_email() + impl = EmailNoticeImpl() + impl.email = req['email'] + impl.v_type = 'register' + impl.send_email() return {"info": "邮件已发送"}, 200 except ReturnConditionCheckFailed as e: e.log_error() @@ -86,10 +88,10 @@ def register(): try: req = request.json # 注册 - company_user = CompanyUserImpl() - company_user.email = req['email'] - company_user.pwd = generate_password_hash("Fecr1988.") - company_user.register(req['code']) + impl = TFSECompanyUserImpl() + impl.email = req['email'] + impl.pwd = "Fecr1988." + impl.register(req['code']) return {"info": "注册成功"}, 200 except ReturnConditionCheckFailed as e: e.log_error()