216 lines
6.6 KiB
Python
216 lines
6.6 KiB
Python
import json
|
||
import time
|
||
import random
|
||
import base64
|
||
import requests
|
||
from urllib.request import Request, urlopen
|
||
|
||
from werkzeug.security import check_password_hash, generate_password_hash
|
||
from itsdangerous import TimedJSONWebSignatureSerializer as Serializer
|
||
|
||
from DBHelper.MongoHelper import MongoHelper
|
||
from ObjectsInProject.User.TFSECompanyUser import TFSECompanyUser
|
||
|
||
from Utils.ErrorUtil import ReturnConditionCheckFailed
|
||
|
||
|
||
class TFSECompanyUserImpl(TFSECompanyUser):
|
||
"""企业用户实现类"""
|
||
|
||
db = MongoHelper("tfse_v0.21")
|
||
|
||
def check_email_registered(self):
|
||
"""检查邮箱已被注册"""
|
||
email = self.db.find_single_column(
|
||
"应用端",
|
||
"企业用户",
|
||
{"邮箱": self.email},
|
||
"邮箱"
|
||
)
|
||
if email is not None:
|
||
raise ReturnConditionCheckFailed("邮箱已被注册", 200)
|
||
|
||
def check_email_not_registered(self):
|
||
"""检查邮箱未被注册"""
|
||
email = self.db.find_single_column(
|
||
"应用端",
|
||
"企业用户",
|
||
{"邮箱": self.email},
|
||
"邮箱"
|
||
)
|
||
if email is None:
|
||
raise ReturnConditionCheckFailed("邮箱未被注册", 200)
|
||
|
||
def check_vcode_correct(self, code):
|
||
"""检查邮箱验证码是否正确"""
|
||
verify_data = self.db.find_single_data(
|
||
"应用端",
|
||
"邮箱验证码记录",
|
||
{"邮箱": self.email},
|
||
["验证码", "验证有效期"]
|
||
)
|
||
|
||
if not verify_data:
|
||
raise ReturnConditionCheckFailed("验证码错误", 200)
|
||
|
||
if verify_data['验证码'] != code:
|
||
raise ReturnConditionCheckFailed("验证码错误", 200)
|
||
|
||
if time.time() - verify_data['验证有效期'] > 300:
|
||
raise ReturnConditionCheckFailed('验证码过期', 200)
|
||
|
||
def login(self):
|
||
"""登录"""
|
||
|
||
company_user = self.db.find_single_data(
|
||
"应用端",
|
||
"企业用户",
|
||
{"邮箱": self.email},
|
||
["企业ID", "企业名称", "邮箱", "密码", "已认证"]
|
||
)
|
||
|
||
def check_user_existed():
|
||
"""检查用户是否存在"""
|
||
if company_user is None:
|
||
raise ReturnConditionCheckFailed("不存在该用户", 200)
|
||
|
||
def check_password_correct():
|
||
"""检查密码是否正确"""
|
||
if not check_password_hash(company_user['密码'], self.pwd):
|
||
raise ReturnConditionCheckFailed("登录密码错误", 200)
|
||
|
||
def create_token(param):
|
||
"""
|
||
创建token
|
||
param: 传入参数,用于创建token,一般为 {"cid": cid}
|
||
return: 用户token
|
||
"""
|
||
secret_key = '0FTuOi^#Afx1@2@F'
|
||
token_expire = 14400
|
||
s = Serializer(secret_key, expires_in=token_expire)
|
||
token = '' + s.dumps(param).decode('ascii')
|
||
return token
|
||
|
||
check_user_existed()
|
||
check_password_correct()
|
||
|
||
self.cid = company_user['企业ID']
|
||
self.name = self.get_attr(company_user, '企业名称', default='未认证企业用户')
|
||
self.verify_status = company_user['已认证']
|
||
self.token = create_token({"cid": self.cid})
|
||
|
||
def register(self, code):
|
||
"""注册"""
|
||
|
||
def make_new_cid():
|
||
"""生成新的企业ID,如果该ID存在,则重新生成"""
|
||
|
||
def random_cid(num):
|
||
"""随机企业ID"""
|
||
choices = 'ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789'
|
||
salt = ''
|
||
for i in range(num):
|
||
salt += random.choice(choices)
|
||
return salt
|
||
|
||
new_cid = random_cid(8)
|
||
|
||
case = self.db.find_single_column(
|
||
"应用端",
|
||
"企业用户",
|
||
{"企业ID": new_cid},
|
||
"企业ID"
|
||
) is not None
|
||
while case:
|
||
new_cid = random_cid(8)
|
||
return new_cid
|
||
|
||
self.check_email_registered()
|
||
self.check_vcode_correct(code)
|
||
|
||
self.cid = make_new_cid()
|
||
self.avatar_id = "623152edf36000004f00124e"
|
||
self.verify_status = "否"
|
||
|
||
self.db.insert_single_data(
|
||
"应用端",
|
||
"企业用户",
|
||
self.dict_to_return()
|
||
)
|
||
|
||
self.db.delete_single_data(
|
||
"应用端",
|
||
"邮箱验证码记录",
|
||
{"邮箱": self.email}
|
||
)
|
||
|
||
def get_company_user_info(self):
|
||
"""获取企业用户信息"""
|
||
company_user_data = self.db.find_single_data(
|
||
"应用端",
|
||
"企业用户",
|
||
{"企业ID": self.cid},
|
||
["企业ID", "企业名称", "邮箱", "已认证"]
|
||
)
|
||
|
||
self.name = self.get_attr(company_user_data, '企业名称', default='未认证企业用户')
|
||
self.email = company_user_data['邮箱']
|
||
self.verify_status = company_user_data['已认证']
|
||
|
||
def get_avatar(self):
|
||
"""获取用户头像"""
|
||
|
||
self.avatar_id = self.db.find_single_column(
|
||
"应用端",
|
||
"企业用户",
|
||
{"企业ID": self.cid},
|
||
"头像fid"
|
||
)
|
||
|
||
avatar = self.db.find_file(
|
||
"文件",
|
||
"企业用户头像",
|
||
self.avatar_id
|
||
)
|
||
|
||
return avatar
|
||
|
||
def change_avatar(self, avatar_option):
|
||
"""修改企业用户头像"""
|
||
avatars = {
|
||
"avatar_01": "623152edf36000004f00124e",
|
||
"avatar_02": "623152edf36000004f001250",
|
||
"avatar_03": "623152edf36000004f00124f",
|
||
"avatar_04": "623152edf36000004f001251",
|
||
"avatar_05": "623152edf36000004f001252",
|
||
"avatar_06": "623152edf36000004f001253"
|
||
}
|
||
self.avatar_id = avatars[avatar_option]
|
||
self.db.update_single_data(
|
||
"应用端",
|
||
"企业用户",
|
||
{"企业ID": self.cid},
|
||
{"头像fid": self.avatar_id}
|
||
)
|
||
|
||
def change_password(self, code):
|
||
"""修改登录密码"""
|
||
self.check_email_not_registered()
|
||
self.check_vcode_correct(code)
|
||
self.db.update_single_data(
|
||
"应用端",
|
||
"企业用户",
|
||
{"邮箱": self.email},
|
||
self.dict_to_save(columns=["密码"])
|
||
)
|
||
|
||
def change_email(self, code):
|
||
"""更换邮箱"""
|
||
self.check_vcode_correct(code)
|
||
self.check_email_registered()
|
||
self.db.update_single_data(
|
||
"应用端",
|
||
"企业用户",
|
||
{"企业ID": self.cid},
|
||
self.dict_to_save(columns=["邮箱"])
|
||
) |