tfse-app-api-v0.2/Implements/User/TFSECompanyUserImpl.py

212 lines
6.5 KiB
Python
Raw Normal View History

2022-04-26 17:04:09 +08:00
import time
import random
2022-05-05 14:45:06 +08:00
from werkzeug.security import check_password_hash
2022-04-26 17:04:09 +08:00
from itsdangerous import TimedJSONWebSignatureSerializer as Serializer
from DBHelper.MongoHelper import MongoHelper
2022-05-05 14:45:06 +08:00
from Objects.InProject.User.TFSECompanyUser import TFSECompanyUser
2022-04-26 17:04:09 +08:00
from Utils.ErrorUtil import ReturnConditionCheckFailed
class TFSECompanyUserImpl(TFSECompanyUser):
"""企业用户实现类"""
db = MongoHelper("tfse_v0.21")
def check_email_registered(self):
"""检查邮箱已被注册"""
email = self.db.find_single_column(
"应用端",
"企业用户",
{"邮箱": self.email},
"邮箱"
)
if email is not None:
raise ReturnConditionCheckFailed("邮箱已被注册", 200)
def check_email_not_registered(self):
"""检查邮箱未被注册"""
email = self.db.find_single_column(
"应用端",
"企业用户",
{"邮箱": self.email},
"邮箱"
)
if email is None:
raise ReturnConditionCheckFailed("邮箱未被注册", 200)
def check_vcode_correct(self, code):
"""检查邮箱验证码是否正确"""
verify_data = self.db.find_single_data(
"应用端",
"邮箱验证码记录",
{"邮箱": self.email},
["验证码", "验证有效期"]
)
if not verify_data:
raise ReturnConditionCheckFailed("验证码错误", 200)
if verify_data['验证码'] != code:
raise ReturnConditionCheckFailed("验证码错误", 200)
if time.time() - verify_data['验证有效期'] > 300:
raise ReturnConditionCheckFailed('验证码过期', 200)
def login(self):
"""登录"""
company_user = self.db.find_single_data(
"应用端",
"企业用户",
{"邮箱": self.email},
["企业ID", "企业名称", "邮箱", "密码", "已认证"]
)
def check_user_existed():
"""检查用户是否存在"""
if company_user is None:
raise ReturnConditionCheckFailed("不存在该用户", 200)
def check_password_correct():
"""检查密码是否正确"""
if not check_password_hash(company_user['密码'], self.pwd):
raise ReturnConditionCheckFailed("登录密码错误", 200)
def create_token(param):
"""
创建token
param: 传入参数用于创建token一般为 {"cid": cid}
return: 用户token
"""
secret_key = '0FTuOi^#Afx1@2@F'
token_expire = 14400
s = Serializer(secret_key, expires_in=token_expire)
token = '' + s.dumps(param).decode('ascii')
return token
check_user_existed()
check_password_correct()
self.cid = company_user['企业ID']
self.name = self.get_attr(company_user, '企业名称', default='未认证企业用户')
self.verify_status = company_user['已认证']
self.token = create_token({"cid": self.cid})
def register(self, code):
"""注册"""
def make_new_cid():
"""生成新的企业ID如果该ID存在则重新生成"""
def random_cid(num):
"""随机企业ID"""
choices = 'ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789'
salt = ''
for i in range(num):
salt += random.choice(choices)
return salt
new_cid = random_cid(8)
case = self.db.find_single_column(
"应用端",
"企业用户",
{"企业ID": new_cid},
"企业ID"
) is not None
while case:
new_cid = random_cid(8)
return new_cid
self.check_email_registered()
self.check_vcode_correct(code)
self.cid = make_new_cid()
self.avatar_id = "623152edf36000004f00124e"
self.verify_status = ""
self.db.insert_single_data(
"应用端",
"企业用户",
self.dict_to_return()
)
self.db.delete_single_data(
"应用端",
"邮箱验证码记录",
{"邮箱": self.email}
)
def get_company_user_info(self):
"""获取企业用户信息"""
company_user_data = self.db.find_single_data(
"应用端",
"企业用户",
{"企业ID": self.cid},
["企业ID", "企业名称", "邮箱", "已认证"]
)
self.name = self.get_attr(company_user_data, '企业名称', default='未认证企业用户')
self.email = company_user_data['邮箱']
self.verify_status = company_user_data['已认证']
def get_avatar(self):
"""获取用户头像"""
self.avatar_id = self.db.find_single_column(
"应用端",
"企业用户",
{"企业ID": self.cid},
"头像fid"
)
avatar = self.db.find_file(
"文件",
"企业用户头像",
self.avatar_id
)
return avatar
def change_avatar(self, avatar_option):
"""修改企业用户头像"""
avatars = {
"avatar_01": "623152edf36000004f00124e",
"avatar_02": "623152edf36000004f001250",
"avatar_03": "623152edf36000004f00124f",
"avatar_04": "623152edf36000004f001251",
"avatar_05": "623152edf36000004f001252",
"avatar_06": "623152edf36000004f001253"
}
self.avatar_id = avatars[avatar_option]
self.db.update_single_data(
"应用端",
"企业用户",
{"企业ID": self.cid},
{"头像fid": self.avatar_id}
)
def change_password(self, code):
"""修改登录密码"""
self.check_email_not_registered()
self.check_vcode_correct(code)
self.db.update_single_data(
"应用端",
"企业用户",
{"邮箱": self.email},
self.dict_to_save(columns=["密码"])
)
def change_email(self, code):
"""更换邮箱"""
self.check_vcode_correct(code)
self.check_email_registered()
self.db.update_single_data(
"应用端",
"企业用户",
{"企业ID": self.cid},
self.dict_to_save(columns=["邮箱"])
)