tfse-admin-api-v0.2/user/user_impl.py

307 lines
9.1 KiB
Python
Raw Normal View History

2022-01-11 15:12:08 +08:00
import json
2022-01-25 16:48:42 +08:00
import os
2022-01-10 05:22:25 +08:00
import time
2022-01-11 07:13:28 +08:00
import random
2022-01-11 15:12:08 +08:00
import requests
2022-04-01 12:10:18 +08:00
2022-04-01 11:15:09 +08:00
from werkzeug.security import check_password_hash
2022-01-10 05:22:25 +08:00
from DBHelper.MongoHelper import MongoHelper
2022-04-01 02:24:02 +08:00
from Utils.ErrorUtil import ReturnConditionCheckFailed
2022-04-01 12:10:18 +08:00
from user.user_auth import create_token
from user.user_obj import UserManage, ListUser, User, UserLogin, SendLoginVcodeEmail
2022-03-31 07:00:37 +08:00
2022-04-01 12:10:18 +08:00
from user.user_utils import decrypt_data
2022-03-31 07:00:37 +08:00
2022-04-01 03:49:16 +08:00
class ListUserImpl(ListUser):
2022-04-01 12:10:18 +08:00
"""用户列表实现"""
2022-04-01 03:49:16 +08:00
db = MongoHelper("tfse_v0.21")
def list(self):
# 查询体构造方法
def make_search_body(param):
body = dict()
search_keys = list(param.keys())
if "uid" in search_keys:
body['UID'] = {"UID": param['uid']}
if "姓名" in search_keys:
body['name'] = {"$regex": param['姓名']}
if "邮箱" in search_keys:
body['email'] = {"$regex": param['邮箱']}
if "状态" in search_keys:
if param['状态'] == "正常":
body['status'] = "normal"
elif param['状态'] == "停用":
body['status'] = "disable"
else:
pass
if "角色" in search_keys:
roles = list()
role_map = {"管理员": "admin", "分析师": "analysts", "开发者": "developer", "运营人员": "operator", "访客": "guest"}
for role in list(set(param['角色'])):
roles.append(role_map[role])
body['role'] = {"$in": roles}
return body
# 顺序条件构造方法
def make_sort_body(param):
if param != {}:
columns_map = {"姓名": "name", "邮箱": "email", "状态": "status", "角色": "role", "创建时间": "create_time"}
asc_or_desc = 1 if list(param.values())[0] == "asc" else -1
sort_column = columns_map[list(param.keys())[0]]
body = {sort_column: asc_or_desc}
else:
body = {"create_time": -1}
return body
search_body = make_search_body(self.search)
sort = make_sort_body(self.sort)
page_size = 10 if self.page_size > 10 else self.page_size
page_no = int(self.page_no)
total = self.db.find_all_data_with_count(
"管理端",
"用户",
search_body
)
records = self.db.find_data_by_page_with_sort(
"管理端",
"用户",
search_body,
["UID", "email", "name", "status", "role", "create_time"],
sort,
page_size,
page_no
)
table_data = list()
if records:
for record in records:
user = User()
user.uid = record['UID']
user.email = record['email']
user.name = record['name']
user.status = record['status']
user.role = record['role']
user.create_time = record['create_time']
table_data.append(user.dict_to_show())
result = {
"records": table_data,
"total": total
}
2022-01-26 15:07:18 +08:00
2022-03-31 03:57:36 +08:00
return result
2022-04-01 11:15:09 +08:00
class UserLoginImpl(UserLogin):
"""用户登录实现"""
db = MongoHelper("tfse_v0.21")
def login(self):
""""""
user_info = self.db.find_single_data(
"管理端",
"用户",
{"email": self.email},
["UID", "name", "pwd", "status", "role"]
)
def check_email_existed():
if not user_info:
raise ReturnConditionCheckFailed("邮箱不存在", 200)
def check_user_is_disable():
if user_info['status'] != 'normal':
raise ReturnConditionCheckFailed("账户已禁用", 200)
def check_vcode_is_correct():
record = self.db.find_single_data(
"管理端",
"邮箱验证码记录",
{"email": self.email},
["vcode", "timestamp"]
)
if not record:
raise ReturnConditionCheckFailed("验证码不存在", 200)
if record['vcode'] != self.vcode:
raise ReturnConditionCheckFailed("验证码错误", 200)
if time.time() - record['timestamp'] > 300:
raise ReturnConditionCheckFailed("验证码过期", 200)
self.db.delete_single_data(
"管理端",
"邮箱验证码记录",
{"email": self.email}
)
def check_pwd_is_correct():
try:
if not check_password_hash(user_info['pwd'], decrypt_data(encrypt_msg=self.pwd)):
raise ReturnConditionCheckFailed("密码错误", 200)
except Exception:
raise ReturnConditionCheckFailed("密码错误", 200)
def make_menus():
role = user_info['role']
with open(os.path.abspath(os.path.dirname(__file__)+'/static/menus.json'), "r", encoding='utf-8') as f:
duties = json.load(f)
self.menus = duties[role]
def make_token():
choices = 'ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789'
session_id = ''
for i in range(4):
session_id += random.choice(choices)
self.db.update_single_data(
"管理端",
"token记录",
{"UID": user_info['UID']},
{"session_id": session_id}
)
self.token = create_token(session_id)
def __main__():
check_email_existed()
check_user_is_disable()
check_pwd_is_correct()
check_vcode_is_correct()
make_menus()
make_token()
self.name = user_info['name']
return self.dict_to_show()
return __main__()
2022-04-01 12:10:18 +08:00
class SendLoginVcodeEmailImpl(SendLoginVcodeEmail):
"""发送登录验证码邮件 实现"""
db = MongoHelper("tfse_v0.21")
def send_vcode_email(self):
def gen_vcode():
choices = '0123456789'
salt = ''
for i in range(6):
salt += random.choice(choices)
self.vcode = salt
def save_vcode():
self.db.upsert_single_data(
"管理端",
"邮箱验证码记录",
{"email": self.email},
2022-04-01 12:19:23 +08:00
{"vcode": self.vcode, "timestamp": round(time.time())}
2022-04-01 12:10:18 +08:00
)
2022-04-01 12:19:23 +08:00
def send_email():
email_api = 'http://116.63.130.34:30001'
headers = {"Content-Type": "application/json;charset=UTF-8"}
data = {
"title": "【远东资信】{}".format("登录验证码"),
"sender": 'fecribd@fecr.com.cn',
"recipients": [self.email],
"msg_body": "{}的验证码为 【{}5分钟内有效。".format("登录", self.vcode)
}
requests.post(url=email_api + '/send_mail', headers=headers, data=json.dumps(data))
2022-04-01 12:10:18 +08:00
def __main__():
gen_vcode()
save_vcode()
send_email()
return __main__()
class UserManageImpl(UserManage):
db = MongoHelper("tfse_v0.21")
def create(self):
def check_email_registered():
"""检查邮箱是否已注册"""
email_is_existed = self.db.find_single_column(
"管理端",
"用户",
{"email": self.email},
"email"
)
if email_is_existed:
raise ReturnConditionCheckFailed("邮箱已被注册", 200)
def generate_new_uid():
"""生成新的用户ID"""
def uid_maker(num):
"""
用户ID生成器
num: ID长度 int
"""
choices = 'ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789'
salt = ''
for i in range(num):
salt += random.choice(choices)
return salt
def uid_checker(__uid):
"""
用户ID重复检查
__uid: 用户ID str
return:
True 用户ID可用
False 用户ID不可用
"""
is_uid_existed = self.db.find_single_column(
"管理端",
"用户",
{"UID": __uid},
"UID"
)
return True if is_uid_existed is None else False
uid = uid_maker(8)
while not uid_checker(uid):
uid = uid_maker(8)
self.uid = uid
def __main__():
check_email_registered()
generate_new_uid()
self.status = "normal"
self.create_time = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())
self.db.upsert_single_data(
"管理端",
"用户",
{"UID": self.uid},
self.dict_to_save()
)
__main__()