tfse-admin-api-v0.2/user/user_impl.py

462 lines
13 KiB
Python
Raw Normal View History

2022-01-11 15:12:08 +08:00
import json
2022-01-25 16:48:42 +08:00
import os
2022-01-10 05:22:25 +08:00
import time
2022-01-11 07:13:28 +08:00
import random
2022-01-10 05:22:25 +08:00
2022-01-11 15:12:08 +08:00
import requests
2022-01-11 17:36:31 +08:00
from werkzeug.security import generate_password_hash, check_password_hash
2022-01-10 05:22:25 +08:00
from DBHelper.MongoHelper import MongoHelper
2022-04-01 02:24:02 +08:00
from Utils.ErrorUtil import ReturnConditionCheckFailed
2022-02-11 11:24:28 +08:00
from user.user_auth import create_token
2022-04-01 02:34:49 +08:00
from user.user_obj import UserManage
2022-03-31 07:00:37 +08:00
from user.user_utils import check_mail_fmt, decrypt_data
2022-03-30 16:47:27 +08:00
from user.user_db import DELETE_DATA, UPDATE_INFO
2022-04-01 02:34:49 +08:00
class UserManageImpl(UserManage):
    """Admin-side user management: create users, list them, send login codes.

    User records are read and written through ``MongoHelper`` calls that are
    addressed with the arguments ("管理端", "用户"); e-mail verification codes
    use ("管理端", "邮箱验证码记录").
    """

    # Shared helper instance, created once when the class body is executed.
    db = MongoHelper("tfse_v0.21")

    @staticmethod
    def _random_id(length):
        """Return a random alphanumeric string of ``length`` characters."""
        alphabet = 'ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789'
        return ''.join(random.choice(alphabet) for _ in range(length))

    @staticmethod
    def _new_unused_uid(db, length=8):
        """Return a random UID for which ``db`` finds no existing user record.

        The existence check is repeated for every candidate, so a collision
        simply leads to another attempt.
        """
        candidate = UserManageImpl._random_id(length)
        while db.find_single_column(
            "管理端",
            "用户",
            {"UID": candidate},
            "UID"
        ) is not None:
            candidate = UserManageImpl._random_id(length)
        return candidate

    def create(self):
        """Register the user held by this instance.

        Reads ``self.email``; assigns ``self.uid``, ``self.status`` and
        ``self.create_time``; then upserts ``self.dict_to_save()`` keyed by
        the freshly generated UID.

        Raises:
            ReturnConditionCheckFailed: if a record with ``self.email``
                already exists.
        """
        email_is_existed = self.db.find_single_column(
            "管理端",
            "用户",
            {"email": self.email},
            "email"
        )
        if email_is_existed:
            raise ReturnConditionCheckFailed("邮箱已被注册", 200)

        self.uid = self._new_unused_uid(self.db)
        self.status = "normal"
        self.create_time = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())
        self.db.upsert_single_data(
            "管理端",
            "用户",
            {"UID": self.uid},
            self.dict_to_save()
        )

    @staticmethod
    def list_user(param1, param2, param3, param4):
        """Query one page of the user list.

        Args:
            param1: dict of search conditions. Recognised keys are "uid"
                (exact match), "姓名" and "邮箱" (used as ``$regex``
                patterns), "状态" ("正常" or "停用"; any other value adds no
                filter) and "角色" (iterable of role labels).
                Example: ``{"姓名": "xxx", "邮箱": "xxx"}``.
            param2: dict with a single sort condition mapping a column label
                to "asc" (ascending) or anything else (descending). Only the
                first entry is used; an empty value sorts by ``create_time``
                descending. Example: ``{"姓名": "asc"}``.
            param3: page size (int or int-like); values above 10 are capped
                at 10.
            param4: current page number (int or int-like).

        Returns:
            dict with "records" (list of user dicts whose keys are renamed to
            the labels used by the caller) and "total" (the value returned by
            ``find_all_data_with_count`` for the same search).

        Raises:
            KeyError: if "角色" contains an unknown role label or ``param2``
                names an unknown column label.
        """
        role_map = {"管理员": "admin", "分析师": "analysts", "开发者": "developer", "运营人员": "operator", "访客": "guest"}
        columns_map = {"姓名": "name", "邮箱": "email", "状态": "status", "角色": "role", "创建时间": "create_time"}

        def make_search_body(param):
            """Translate the caller's search conditions into a query filter."""
            body = dict()
            if "uid" in param:
                # UIDs are stored as plain strings, so match the value itself
                # rather than an embedded {"UID": ...} document.
                body['UID'] = param['uid']
            if "姓名" in param:
                body['name'] = {"$regex": param['姓名']}
            if "邮箱" in param:
                body['email'] = {"$regex": param['邮箱']}
            if "状态" in param:
                if param['状态'] == "正常":
                    body['status'] = "normal"
                elif param['状态'] == "停用":
                    body['status'] = "disable"
            if "角色" in param:
                body['role'] = {"$in": [role_map[role] for role in set(param['角色'])]}
            return body

        def make_sort_body(param):
            """Translate the caller's sort condition into a sort document."""
            if not param:
                # No explicit ordering requested: newest users first.
                return {"create_time": -1}
            column_label, direction = next(iter(param.items()))
            asc_or_desc = 1 if direction == "asc" else -1
            return {columns_map[column_label]: asc_or_desc}

        db = MongoHelper("tfse_v0.21")

        search_body = make_search_body(param1 or {})
        sort = make_sort_body(param2)
        page_size = min(int(param3), 10)
        page_no = int(param4)

        total = db.find_all_data_with_count(
            "管理端",
            "用户",
            search_body
        )
        records = db.find_data_by_page_with_sort(
            "管理端",
            "用户",
            search_body,
            ["UID", "email", "name", "status", "role", "create_time"],
            sort,
            page_size,
            page_no
        )

        if records:
            # Rename the stored field names to the labels the caller expects.
            for record in records:
                record['uid'] = record.pop('UID')
                record['邮箱'] = record.pop('email')
                record['姓名'] = record.pop('name')
                record['状态'] = "正常" if record.pop('status') == "normal" else "停用"
                record['角色'] = record.pop('role')
                record['创建时间'] = record.pop('create_time')
        else:
            records = []

        return {
            "records": records,
            "total": total
        }

    @staticmethod
    def send_vcode_to_user_impl(email):
        """E-mail a 6-digit login verification code and record it.

        The code and the current Unix timestamp (rounded to seconds) are
        upserted under ("管理端", "邮箱验证码记录") keyed by ``email`` after
        the mail request has been posted.

        Args:
            email: recipient address.

        Returns:
            True once the request has been sent and the code stored. Errors
            raised by ``requests`` or the database helper propagate.
        """
        email_api = 'http://116.63.130.34:30001'

        # The code gates login, so draw it from the OS entropy source instead
        # of the default pseudo-random generator.
        secure_rng = random.SystemRandom()
        vcode = ''.join(secure_rng.choice('0123456789') for _ in range(6))
        timestamp = round(time.time())

        headers = {"Content-Type": "application/json;charset=UTF-8"}
        data = {"title": "【远东资信】{}".format("登录验证码"),
                "sender": 'fecribd@fecr.com.cn',
                "recipients": [email],
                # Close the bracket around the code so it is not glued to the
                # following "5".
                "msg_body": "{}的验证码为 【{}】，5分钟内有效。".format("登录", vcode)}

        # A timeout keeps the caller from hanging if the mail service stalls.
        requests.post(
            url=email_api + '/send_mail',
            headers=headers,
            data=json.dumps(data),
            timeout=10
        )

        db = MongoHelper("tfse_v0.21")
        db.upsert_single_data(
            "管理端",
            "邮箱验证码记录",
            {"email": email},
            {"vcode": vcode, "timestamp": timestamp}
        )
        return True

    @staticmethod
    def create_user_impl(email, name, pwd, role):
        """Create a new user record.

        Args:
            email: e-mail address (str); must not already be registered.
            name: display name (str).
            pwd: password as accepted by ``decrypt_data(encrypt_msg=...)``;
                the decrypted value is stored only as a hash.
            role: role (str), stored as given.

        Returns:
            True on success, or the error message "邮箱已被注册" (str) when a
            record with the same e-mail already exists.
        """
        db = MongoHelper("tfse_v0.21")

        email_is_existed = db.find_single_column(
            "管理端",
            "用户",
            {"email": email},
            "email"
        )
        if email_is_existed:
            return "邮箱已被注册"

        user = {
            # Re-checks the database for every candidate, so a collision
            # cannot leave the generator looping on a stale result.
            'UID': UserManageImpl._new_unused_uid(db),
            'email': email,
            'name': name,
            'pwd': generate_password_hash(decrypt_data(encrypt_msg=pwd)),
            'status': 'normal',
            'role': role,
            'create_time': time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())
        }
        db.upsert_single_data(
            "管理端",
            "用户",
            {"UID": user['UID']},
            user
        )
        return True
2022-01-25 16:48:42 +08:00
2022-03-31 07:00:37 +08:00
class UserOperateImpl:
    """Operations performed on or by an existing user: login, disable,
    delete and role changes."""

    @staticmethod
    def login_impl(email, pwd, vcode):
        """Authenticate a user by e-mail, password and verification code.

        The checks run in this order and stop at the first failure: account
        exists, e-mail format, account status, password, verification code.
        A matching, unexpired verification code is deleted once accepted.

        Args:
            email: login e-mail address.
            pwd: password as accepted by ``decrypt_data(encrypt_msg=...)``.
            vcode: verification code previously sent to ``email``.

        Returns:
            On success a dict with "name", "token" and "menus". On failure
            one of the error strings "不存在该邮箱", "邮箱格式错误",
            "账户已禁用", "密码错误", "无验证信息", "验证码错误" or
            "验证码过期".

        Raises:
            KeyError: if ``menus.json`` has no entry for the user's role.
        """
        db = MongoHelper("tfse_v0.21")

        user_info = db.find_single_data(
            "管理端",
            "用户",
            {"email": email},
            ["UID", "name", "pwd", "status", "role"]
        )

        def check_email():
            if not user_info:
                return "不存在该邮箱"
            if not check_mail_fmt(email):
                return "邮箱格式错误"
            return True

        def check_disable():
            if user_info['status'] != 'normal':
                return "账户已禁用"
            return True

        def check_pwd():
            hash_pwd = user_info['pwd']
            try:
                matched = check_password_hash(hash_pwd, decrypt_data(encrypt_msg=pwd))
            except Exception:
                # Any failure while decrypting or verifying is deliberately
                # reported as a wrong password.
                return "密码错误"
            return True if matched else "密码错误"

        def check_vcode():
            record = db.find_single_data(
                "管理端",
                "邮箱验证码记录",
                {"email": email},
                ["vcode", "timestamp"]
            )
            if not record:
                return "无验证信息"
            if record['vcode'] != vcode:
                return "验证码错误"
            # Codes are valid for 300 seconds after they were recorded.
            if time.time() - record['timestamp'] > 300:
                return "验证码过期"
            # An accepted code is removed so it cannot be used again.
            db.delete_single_data(
                "管理端",
                "邮箱验证码记录",
                {"email": email}
            )
            return True

        def make_menus():
            # Anchor on the absolute module directory so the path stays
            # correct even when __file__ has no directory component.
            menus_path = os.path.join(
                os.path.dirname(os.path.abspath(__file__)),
                'static',
                'menus.json'
            )
            with open(menus_path, "r", encoding='utf-8') as f:
                duties = json.load(f)
            return duties[user_info['role']]

        def make_session_id():
            choices = 'ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789'
            # The session id feeds the login token, so use the OS entropy
            # source rather than the default pseudo-random generator.
            secure_rng = random.SystemRandom()
            session_id = ''.join(secure_rng.choice(choices) for _ in range(4))

            db.update_single_data(
                "管理端",
                "token记录",
                {"UID": user_info['UID']},
                {"session_id": session_id}
            )
            return session_id

        def make_login_return_data():
            return_data = dict()
            return_data['name'] = user_info["name"]
            return_data['token'] = create_token(make_session_id())
            return_data['menus'] = make_menus()
            return return_data

        # Each check returns True or an error message; the first message wins.
        for check in (check_email, check_disable, check_pwd, check_vcode):
            result = check()
            if result is not True:
                return result

        return make_login_return_data()

    @staticmethod
    def disable_user_impl(uid):
        """Disable a user by setting the record's status to "disable".

        Returns whatever ``UPDATE_INFO`` returns.
        """
        # NOTE(review): login_impl addresses user records as ("管理端", "用户");
        # confirm that ("用户", "用户信息") is what UPDATE_INFO expects.
        return UPDATE_INFO("用户", "用户信息", {"UID": uid}, {"status": "disable"})

    @staticmethod
    def delete_user_impl(uid):
        """Delete the user record with the given UID.

        Returns whatever ``DELETE_DATA`` returns.
        """
        # NOTE(review): login_impl addresses user records as ("管理端", "用户");
        # confirm that ("用户", "用户信息") is what DELETE_DATA expects.
        return DELETE_DATA("用户", "用户信息", {"UID": uid})

    @staticmethod
    def manage_role_of_user_impl(uid, role):
        """Set the role of the user with the given UID.

        Returns whatever ``UPDATE_INFO`` returns.
        """
        # NOTE(review): login_impl addresses user records as ("管理端", "用户");
        # confirm that ("用户", "用户信息") is what UPDATE_INFO expects.
        return UPDATE_INFO("用户", "用户信息", {"UID": uid}, {"role": role})