import json import time import random import base64 import requests from urllib.request import Request, urlopen from werkzeug.security import check_password_hash from itsdangerous import TimedJSONWebSignatureSerializer as Serializer from DBHelper.MongoHelper import MongoHelper from Utils.ErrorUtil import ReturnConditionCheckFailed, LogConditionCheckFailed from CompanyUser.CompanyUserObj import CompanyUser, VerifyInfo, EmailVerifyCodeRecord, VerifyMaterial, \ CompanyVerifyThreeFactors from Utils.ValidateUtil import ValidateAttr class CompanyUserImpl(CompanyUser): """企业用户实现类""" db = MongoHelper("tfse_v0.21") def check_email_registered(self): """检查邮箱已被注册""" email = self.db.find_single_column( "应用端", "企业用户", {"邮箱": self.email}, "邮箱" ) if email is not None: raise ReturnConditionCheckFailed("邮箱已被注册", 200) def check_email_not_registered(self): """检查邮箱未被注册""" email = self.db.find_single_column( "应用端", "企业用户", {"邮箱": self.email}, "邮箱" ) if email is None: raise ReturnConditionCheckFailed("邮箱未被注册", 200) def check_vcode_correct(self, code): """检查邮箱验证码是否正确""" verify_data = self.db.find_single_data( "应用端", "邮箱验证码记录", {"邮箱": self.email}, ["验证码", "验证有效期"] ) if not verify_data: raise ReturnConditionCheckFailed("验证码错误", 200) if verify_data['验证码'] != code: raise ReturnConditionCheckFailed("验证码错误", 200) if time.time() - verify_data['验证有效期'] > 300: raise ReturnConditionCheckFailed('验证码过期', 200) def login(self): """登录""" company_user = self.db.find_single_data( "应用端", "企业用户", {"邮箱": self.email}, ["企业ID", "企业名称", "邮箱", "密码", "已认证"] ) def check_user_existed(): """检查用户是否存在""" if company_user is None: raise ReturnConditionCheckFailed("不存在该用户", 200) def check_password_correct(): """检查密码是否正确""" if not check_password_hash(company_user['密码'], self.pwd): raise ReturnConditionCheckFailed("登录密码错误", 200) def create_token(param): """ 创建token param: 传入参数,用于创建token return: 用户token """ secret_key = '0FTuOi^#Afx1@2@F' token_expire = 14400 s = Serializer(secret_key, expires_in=token_expire) token = '' + s.dumps(param).decode('ascii') return token def __main__(): """""" check_user_existed() check_password_correct() self.cid = company_user['企业ID'] self.name = company_user['企业名称'] self.verify_status = company_user['已认证'] return_body = self.dict_to_show(columns=["企业名称", "认证状态", "token"]) return_body['token'] = create_token({"cid": self.cid}) return return_body return __main__() def register(self, code): """注册""" def make_new_cid(): """生成新的企业ID,如果该ID存在,则重新生成""" def random_cid(num): """随机企业ID""" choices = 'ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789' salt = '' for i in range(num): salt += random.choice(choices) return salt new_cid = random_cid(8) case = self.db.find_single_column( "应用端", "企业用户", {"企业ID": new_cid}, "企业ID" ) is not None while case: new_cid = random_cid(8) return new_cid def __main__(): """""" self.check_email_registered() self.check_vcode_correct(code) self.cid = make_new_cid() self.avatar_id = "623152edf36000004f00124e" self.verify_status = "否" self.verify_info = VerifyInfo() self.db.insert_single_data( "应用端", "企业用户", self.dict_to_save() ) self.db.delete_single_data( "应用端", "邮箱验证码记录", {"邮箱": self.email} ) __main__() def get_company_user_info(self): """获取企业用户信息""" company_user_data = self.db.find_single_data( "应用端", "企业用户", {"企业ID": self.cid}, ["企业ID", "企业名称", "邮箱", "已认证"] ) self.name = company_user_data['企业名称'] self.email = company_user_data['邮箱'] self.verify_status = company_user_data['已认证'] return self.dict_to_show(columns=["企业ID", "企业名称", "邮箱", "认证状态"]) def get_avatar(self): """获取用户头像""" self.avatar_id = self.db.find_single_column( "应用端", "企业用户", {"企业ID": self.cid}, "头像fid" ) avatar = self.db.find_file( "文件", "企业用户头像", self.avatar_id ) return avatar def change_avatar(self, avatar_option): """修改企业用户头像""" avatars = { "avatar_01": "623152edf36000004f00124e", "avatar_02": "623152edf36000004f001250", "avatar_03": "623152edf36000004f00124f", "avatar_04": "623152edf36000004f001251", "avatar_05": "623152edf36000004f001252", "avatar_06": "623152edf36000004f001253" } self.avatar_id = avatars[avatar_option] self.db.update_single_data( "应用端", "企业用户", {"企业ID": self.cid}, {"头像fid": self.avatar_id} ) def change_password(self, code): """修改登录密码""" self.check_email_not_registered() self.check_vcode_correct(code) self.db.update_single_data( "应用端", "企业用户", {"邮箱": self.email}, self.dict_to_save(columns=["密码"]) ) def change_email(self, code): """更换邮箱""" self.check_vcode_correct(code) self.check_email_registered() self.db.update_single_data( "应用端", "企业用户", {"企业ID": self.cid}, self.dict_to_save(columns=["邮箱"]) ) class VerifyMaterialImpl(VerifyMaterial): """认证材料实现类""" db = MongoHelper("tfse_v0.21") @staticmethod def read_img_by_base64(img_file): """读取base64编码格式的图片""" data = img_file.read() try: encode_str = str(base64.b64encode(data), 'utf-8') except TypeError: encode_str = base64.b64encode(data) return encode_str def check_is_verified(self): """检查企业是否已认证""" self.verify_status = self.db.find_single_column( "应用端", "企业用户", {"企业ID": self.cid}, "已认证" ) if self.verify_status == "是": raise ReturnConditionCheckFailed("企业已认证", 200) def upload_id_card(self): """上传身份证""" def trans_date_format(_time): """将解析出来的时间数据加上年月日""" year = _time[:4] month = _time[4:6] day = _time[6:8] return year + '.' + month + '.' + day def id_card_recognition(side): """调用身份证图片识别接口""" url = 'http://dm-51.data.aliyun.com/rest/160601/ocr/ocr_idcard.json' APPCODE = "6d6c3146677c40bf9e3e1e2aa73404cd" # 请求头 headers = { 'Authorization': 'APPCODE %s' % APPCODE, 'Content-Type': 'application/json; charset=UTF-8' } # 请求体 body = { "configure": {'side': side}, "image": self.read_img_by_base64(self.image) } try: params = json.dumps(body).encode(encoding='UTF8') req = Request(url, params, headers) res = urlopen(req) html = res.read() result = json.loads(html.decode("utf8")) if result['is_fake']: raise ReturnConditionCheckFailed("伪造身份证", 200) id_card = dict() if side == 'face': id_card['姓名'] = result['name'] id_card['性别'] = result['sex'] id_card['民族'] = result['nationality'] id_card['出生'] = "{}年{}月{}日".format(result['birth'][:4], result['birth'][4:6], result['birth'][6:]) id_card['住址'] = result['address'] id_card['身份证号码'] = result['num'] else: id_card['签发机关'] = result['issue'] id_card['有效期限'] = '{}-{}'.format(trans_date_format(result['start_date']), trans_date_format(result['end_date'])) return id_card except Exception: raise ReturnConditionCheckFailed("身份证识别失败", 200) def id_card_recognition_result(): """ 身份证文本识别 cid: 企业ID image: 图片 return: 识别结果 """ # 身份证正面文本识别 result = id_card_recognition(side='face') # 重置image读取位置,不然下一步读不出图片 self.image.seek(0) # 更新企业认证中身份证的信息 self.db.update_single_data( "应用端", "企业用户", {"企业ID": self.cid}, {"认证信息.法人姓名": result['姓名'], "认证信息.法人身份证": result['身份证号码']} ) return {"姓名": result['姓名'], "身份证号码": result['身份证号码']} def __main__(): """""" self.check_is_verified() return id_card_recognition_result() return __main__() def upload_business_license(self): """上传营业执照""" def trans_date_format(_time): """将解析出来的时间数据加上年月日""" year = _time[:4] month = _time[4:6] day = _time[6:8] return year + '年' + month + '月' + day + '日' def business_license_recognition(): """调用营业执照识别接口""" url = 'https://dm-58.data.aliyun.com/rest/160601/ocr/ocr_business_license.json' app_code = "6d6c3146677c40bf9e3e1e2aa73404cd" headers = { 'Authorization': 'APPCODE %s' % app_code, 'Content-Type': 'application/json; charset=UTF-8' } body = { "image": self.read_img_by_base64(self.image) } response = requests.post(url=url, data=json.dumps(body), headers=headers) try: res = json.loads(response.text) data = dict() data['统一社会信用代码'] = res['reg_num'] data['名称'] = res['name'] data['注册资本'] = res['capital'] data['类型'] = res['type'] data['成立日期'] = trans_date_format(res['establish_date']) data['法定代表人'] = res['person'] data['经营范围'] = res['business'] data['住所'] = res['address'] return data except Exception: raise ReturnConditionCheckFailed("营业执照识别异常", 200) def business_license_recognition_result(): """营业执照识别结果""" # 识别营业执照 result = business_license_recognition() # 图片识别正确 更新企业认证中营业执照的信息 self.db.update_single_data( "应用端", "企业用户", {"企业ID": self.cid}, {"认证信息.企业名称": result['名称'], "认证信息.统一社会信用代码": result['统一社会信用代码']} ) return {"公司名称": result['名称'], "统一社会信用代码": result['统一社会信用代码']} def _main__(): """""" self.check_is_verified() return business_license_recognition_result() return _main__() class CompanyVerifyThreeFactorsImpl(CompanyVerifyThreeFactors): """企业认证三要素实现类""" db = MongoHelper("tfse_v0.21") def company_verify(self): """企业认证""" def get_verify_info(): """获取企业认证信息""" record = self.db.find_single_data( "应用端", "企业用户", {"企业ID": self.cid}, ["认证信息.企业名称", "认证信息.法人姓名", "认证信息.统一社会信用代码"] ) if not record: raise ReturnConditionCheckFailed("企业认证信息异常", 200) verify_info = record['认证信息'] self.code = verify_info['统一社会信用代码'] self.name = verify_info['企业名称'] self.legal_person = verify_info['法人姓名'] def check_verify_info_complete(): """检查认证信息是否完整""" if None in [self.code, self.name, self.legal_person]: raise ReturnConditionCheckFailed("企业认证信息异常", 200) def check_three_elements_by_tyc(): """调用天眼查企业三要素验证接口""" token = "5407573b-e41e-4ab4-9f46-daa0032ad0a4" base = "http://open.api.tianyancha.com/services/open/ic/verify/2.0?code={}&name={}&legalPersonName={}" url = base.format(self.code, self.name, self.legal_person) headers = {'Authorization': token} res = json.loads(requests.get(url, headers=headers).text) try: if res['result']['result'] != 1: raise ReturnConditionCheckFailed("企业认证未通过", 200) except Exception: raise ReturnConditionCheckFailed("认证接口异常", 200) def init_company_data(): """初始化企业主页数据""" init_url = "http://api.fecribd.com/etl_tfse/company/init_company_data" init_token = "dmfd7FshT!5Wng9^gcCcQV7T6FBnVgl4" init_headers = {'token': init_token, "Content-Type": "application/json;charset=UTF-8"} data = json.dumps({"cid": self.cid, "company_name": self.name}) response = requests.post(init_url, headers=init_headers, data=data) res = json.loads(response.text) if res['info'] == "企业数据初始化成功": raise ReturnConditionCheckFailed("企业数据初始化异常", 200) else: company_user = CompanyUser() company_user.cid = self.cid company_user.name = self.name company_user.verify_status = "是" self.db.upsert_single_data( "应用端", "企业用户", {"企业ID": self.cid}, company_user.dict_to_save(columns=["企业名称", "已认证"]) ) def __main__(): get_verify_info() check_verify_info_complete() check_three_elements_by_tyc() init_company_data() return __main__() class EmailVerifyCodeRecordImpl(EmailVerifyCodeRecord): """用户通知邮件实现类""" db = MongoHelper("tfse_v0.21") def make_verify_code(self): """生成邮箱验证码""" def make_verify_code(): """随机生成6位数字验证码""" num_code = "" for i in range(6): ch = chr(random.randrange(ord('0'), ord('9') + 1)) num_code += ch return num_code self.v_code = make_verify_code() self.v_period = time.time() + 300 self.db.upsert_single_data( "应用端", "邮箱验证码记录", {"邮箱": self.email}, self.dict_to_save() ) def transfer_email_api(self): """发送邮件""" # 邮箱服务接口 email_api = 'http://116.63.130.34:30001' v_types = { "register": "注册账号", "resetpwd": "修改密码", "resetemail": "修改邮箱" } headers = {"Content-Type": "application/json;charset=UTF-8"} data = { "title": "【远东资信】{}".format(v_types[self.v_type]), "sender": 'fecribd@fecr.com.cn', "recipients": [self.email], "msg_body": "您{}的验证码为 【{}】,5分钟内有效。".format(v_types[self.v_type], self.v_code) } response = requests.post(url=email_api + '/send_mail', headers=headers, data=json.dumps(data)) return response.text def check_email_registered(self): """检查邮箱已被注册""" email = self.db.find_single_column( "应用端", "企业用户", {"邮箱": self.email}, "邮箱" ) if email is not None: raise ReturnConditionCheckFailed("邮箱已被注册", 200) def check_email_not_registered(self): """检查邮箱未被注册""" email = self.db.find_single_column( "应用端", "企业用户", {"邮箱": self.email}, "邮箱" ) if email is None: raise ReturnConditionCheckFailed("邮箱未被注册", 200) def send_email(self): """""" if self.v_type == 'register' or self.v_type == 'resetemail': self.check_email_registered() self.make_verify_code() self.transfer_email_api() elif self.v_type == 'resetpwd': self.check_email_not_registered() self.make_verify_code() self.transfer_email_api()