tfse-admin-api-v0.2/user/user_impl.py

328 lines
9.6 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import json
import os
import time
import random
import requests
from werkzeug.security import check_password_hash
from DBHelper.MongoHelper import MongoHelper
from Utils.ErrorUtil import ReturnConditionCheckFailed
from user.user_auth import create_token
from user.user_obj import UserManage, ListUser, User, UserLogin, SendLoginVcodeEmail
from user.user_utils import decrypt_data
class ListUserImpl(ListUser):
    """User list implementation."""

    db = MongoHelper("tfse_v0.21")

    def list(self):
        """Return one page of users filtered by ``self.search``.

        Reads ``self.search``, ``self.sort``, ``self.page_size`` and
        ``self.page_no``.

        Returns:
            dict with ``records`` (one ``User.dict_to_show()`` result per
            fetched record) and ``total`` (the value returned by
            ``find_all_data_with_count`` for the same filter).

        Raises:
            KeyError: when a role label or the sort column label is not one
                of the known labels in the lookup tables below.
        """

        def make_search_body(param):
            """Translate the search parameters into a query filter dict."""
            given = tuple(param.keys())
            query = {}
            if "uid" in given:
                query['UID'] = param['uid']
            if "姓名" in given:
                query['name'] = {"$regex": param['姓名']}
            if "邮箱" in given:
                query['email'] = {"$regex": param['邮箱']}
            if "状态" in given:
                # Any other status label adds no status condition.
                if param['状态'] == "正常":
                    query['status'] = "normal"
                elif param['状态'] == "停用":
                    query['status'] = "disable"
            if "角色" in given:
                role_map = {"管理员": "admin", "分析师": "analysts", "开发者": "developer", "运营人员": "operator", "访客": "guest"}
                # Duplicates in the requested labels are collapsed via set().
                query['role'] = {"$in": [role_map[label] for label in set(param['角色'])]}
            return query

        def make_sort_body(param):
            """Translate the sort parameters into a ``{column: direction}`` dict."""
            if param == {}:
                # No explicit sort: order by creation time, descending.
                return {"create_time": -1}
            columns_map = {"姓名": "name", "邮箱": "email", "状态": "status", "角色": "role", "创建时间": "create_time"}
            # Only the first entry of the sort parameters is used.
            label, order = list(param.items())[0]
            if order == "asc":
                direction = 1
            else:
                direction = -1
            return {columns_map[label]: direction}

        def to_row(record):
            """Copy one database record into a User and return its display dict."""
            entry = User()
            entry.uid = record['UID']
            entry.email = record['email']
            entry.name = record['name']
            entry.status = record['status']
            entry.role = record['role']
            entry.create_time = record['create_time']
            return entry.dict_to_show()

        search_body = make_search_body(self.search)
        sort = make_sort_body(self.sort)
        # The page size is capped at 10 rows.
        page_size = min(self.page_size, 10)
        page_no = int(self.page_no)

        total = self.db.find_all_data_with_count("管理端", "用户", search_body)
        records = self.db.find_data_by_page_with_sort(
            "管理端",
            "用户",
            search_body,
            ["UID", "email", "name", "status", "role", "create_time"],
            sort,
            page_size,
            page_no,
        )

        table_data = [to_row(record) for record in records] if records else []
        return {"records": table_data, "total": total}
class UserLoginImpl(UserLogin):
    """User login implementation."""

    db = MongoHelper("tfse_v0.21")

    def login(self):
        """Verify the credentials held on this object and start a session.

        Steps, in order: look up the user by ``self.email``; reject unknown
        accounts and accounts whose status is not ``normal``; verify
        ``self.pwd``; verify and consume the e-mailed code ``self.vcode``;
        load the menus for the user's role into ``self.menus``; store a fresh
        session id and put the resulting token in ``self.token``.

        Returns:
            The result of ``self.dict_to_show()`` after ``self.menus``,
            ``self.token`` and ``self.name`` have been set.

        Raises:
            ReturnConditionCheckFailed: when the e-mail is unknown, the
                account status is not ``normal``, the password does not
                match, or the verification code is missing, wrong, or older
                than 300 seconds.
            KeyError: when the user's role has no entry in ``menus.json``.
        """
        # Function-scope import so this block does not depend on the
        # file-level import list.
        import secrets

        user_info = self.db.find_single_data(
            "管理端",
            "用户",
            {"email": self.email},
            ["UID", "name", "pwd", "status", "role"]
        )

        def check_email_existed():
            """Fail when no user record matches the e-mail."""
            if not user_info:
                raise ReturnConditionCheckFailed("邮箱不存在", 200)

        def check_user_is_disable():
            """Fail when the account status is anything other than ``normal``."""
            if user_info['status'] != 'normal':
                raise ReturnConditionCheckFailed("账户已禁用", 200)

        def check_vcode_is_correct():
            """Validate the e-mailed code and delete it once it has been used."""
            record = self.db.find_single_data(
                "管理端",
                "邮箱验证码记录",
                {"email": self.email},
                ["vcode", "timestamp"]
            )
            if not record:
                raise ReturnConditionCheckFailed("验证码不存在", 200)
            if record['vcode'] != self.vcode:
                raise ReturnConditionCheckFailed("验证码错误", 200)
            # Codes are valid for 300 seconds after they were stored.
            if time.time() - record['timestamp'] > 300:
                raise ReturnConditionCheckFailed("验证码过期", 200)
            # A code is single-use: remove it after a successful check.
            self.db.delete_single_data(
                "管理端",
                "邮箱验证码记录",
                {"email": self.email}
            )

        def check_pwd_is_correct():
            """Fail when the submitted password does not match the stored hash."""
            try:
                matched = check_password_hash(user_info['pwd'], decrypt_data(encrypt_msg=self.pwd))
            except Exception:
                # Any failure while decrypting or hashing the submitted value
                # is reported to the caller as a wrong password.
                matched = False
            if not matched:
                raise ReturnConditionCheckFailed("密码错误", 200)

        def make_menus():
            """Load the menu definition for the user's role into ``self.menus``."""
            menus_path = os.path.join(
                os.path.dirname(os.path.abspath(__file__)), 'static', 'menus.json'
            )
            with open(menus_path, "r", encoding='utf-8') as f:
                duties = json.load(f)
            self.menus = duties[user_info['role']]

        def make_token():
            """Store a new session id for the user and derive ``self.token`` from it."""
            choices = 'ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789'
            # The session id feeds the login token, so it is drawn from the
            # OS CSPRNG rather than the predictable ``random`` generator.
            # The length stays at 4 characters because other code may depend
            # on it.
            session_id = ''.join(secrets.choice(choices) for _ in range(4))
            self.db.update_single_data(
                "管理端",
                "token记录",
                {"UID": user_info['UID']},
                {"session_id": session_id}
            )
            self.token = create_token(session_id)

        # The password is checked before the verification code, so a code is
        # only consumed by a request that already passed the password check.
        check_email_existed()
        check_user_is_disable()
        check_pwd_is_correct()
        check_vcode_is_correct()
        make_menus()
        make_token()
        self.name = user_info['name']
        return self.dict_to_show()
class SendLoginVcodeEmailImpl(SendLoginVcodeEmail):
    """Implementation of sending the login verification-code e-mail."""

    db = MongoHelper("tfse_v0.21")

    def send_vcode_email(self):
        """Generate a 6-digit code, store it for ``self.email`` and mail it.

        Sets ``self.vcode``, upserts the code together with the current
        timestamp into the verification-code collection, then posts the
        message to the mail service.  Returns ``None``; the mail service's
        response is not inspected.

        Raises:
            requests.exceptions.RequestException: when the mail service
                cannot be reached or does not answer within the timeout.
        """
        # Function-scope import so this block does not depend on the
        # file-level import list.
        import secrets

        def gen_vcode():
            """Create a 6-digit numeric code in ``self.vcode``."""
            # The code is a login credential, so it is drawn from the OS
            # CSPRNG rather than the predictable ``random`` generator.
            self.vcode = ''.join(secrets.choice('0123456789') for _ in range(6))

        def save_vcode():
            """Store the code and its creation time, replacing any earlier code."""
            self.db.upsert_single_data(
                "管理端",
                "邮箱验证码记录",
                {"email": self.email},
                {"vcode": self.vcode, "timestamp": round(time.time())}
            )

        def send_email():
            """Post the verification e-mail to the mail service."""
            email_api = 'http://116.63.130.34:30001'
            headers = {"Content-Type": "application/json;charset=UTF-8"}
            data = {
                "title": "【远东资信】{}".format("登录验证码"),
                "sender": 'fecribd@fecr.com.cn',
                "recipients": [self.email],
                # The closing bracket after the code was missing, which left
                # the code fused to the following text in the message.
                "msg_body": "{}的验证码为 【{}】,5分钟内有效。".format("登录", self.vcode)
            }
            # Without a timeout an unresponsive mail service would block this
            # request indefinitely.
            requests.post(
                url=email_api + '/send_mail',
                headers=headers,
                data=json.dumps(data),
                timeout=10
            )

        gen_vcode()
        save_vcode()
        send_email()
class UserManageImpl(UserManage):
    """User management implementation: create, disable and activate users."""

    db = MongoHelper("tfse_v0.21")

    def create(self):
        """Create a user record for ``self.email`` under a newly generated UID.

        Sets ``self.uid``, ``self.status`` (``"normal"``) and
        ``self.create_time`` (local time, ``YYYY-mm-dd HH:MM:SS``), then
        upserts ``self.dict_to_save()`` keyed by the new UID.

        Raises:
            ReturnConditionCheckFailed: when the e-mail already has a record.
        """

        def check_email_registered():
            """Fail when the e-mail is already registered."""
            existing = self.db.find_single_column(
                "管理端", "用户", {"email": self.email}, "email"
            )
            if existing:
                raise ReturnConditionCheckFailed("邮箱已被注册", 200)

        def uid_maker(num):
            """Return a random alphanumeric string of length ``num``."""
            choices = 'ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789'
            return ''.join(random.choice(choices) for _ in range(num))

        def uid_checker(__uid):
            """Return True when no user record has this UID, otherwise False."""
            found = self.db.find_single_column(
                "管理端", "用户", {"UID": __uid}, "UID"
            )
            return found is None

        def generate_new_uid():
            """Draw 8-character UIDs until an unused one is found; store it in ``self.uid``."""
            while True:
                candidate = uid_maker(8)
                if uid_checker(candidate):
                    break
            self.uid = candidate

        check_email_registered()
        generate_new_uid()
        self.status = "normal"
        now = time.localtime()
        self.create_time = time.strftime("%Y-%m-%d %H:%M:%S", now)
        self.db.upsert_single_data(
            "管理端",
            "用户",
            {"UID": self.uid},
            self.dict_to_save(),
        )

    def disable(self):
        """Disable user: write ``self.status`` to the record with ``self.uid``."""
        # The status value is taken from the object as-is; presumably the
        # caller sets it before calling this method.
        selector = {"UID": self.uid}
        changes = {"status": self.status}
        self.db.update_single_data("管理端", "用户", selector, changes)

    def active(self):
        """Activate user: write ``self.status`` to the record with ``self.uid``."""
        # The status value is taken from the object as-is; presumably the
        # caller sets it before calling this method.
        selector = {"UID": self.uid}
        changes = {"status": self.status}
        self.db.update_single_data("管理端", "用户", selector, changes)