tfse-admin-api-v0.2/user/user_impl.py

404 lines
10 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import json
import os
import random
import secrets
import time

import requests
from werkzeug.security import generate_password_hash, check_password_hash

from DBHelper.MongoHelper import MongoHelper
from user.user_auth import create_token
from user.user_utils import check_mail_fmt, check_pwd_fmt, decrypt_data
from user.user_db import DELETE_DATA, UPDATE_INFO
def list_user_impl(param1, param2, param3, param4):
    """
    Query one page of the admin user list.

    Args:
        param1 (dict): Search conditions keyed by display label. Recognised
            keys are "uid" (exact match), "姓名" and "邮箱" (passed to the
            database as ``$regex`` patterns), "状态" ("正常" or "停用"; any
            other value is ignored) and "角色" (an iterable of role labels).
            Example: {"姓名": "xxx", "邮箱": "xxx"}
        param2 (dict): Sort condition. Only the first ``label: direction``
            pair is used; "asc" sorts ascending, anything else descending.
            An empty dict sorts by creation time, newest first.
            Example: {"姓名": "asc"}
        param3 (int): Requested page size; values above 10 are capped at 10.
        param4 (int): Page number.

    Returns:
        dict: ``{"records": [...], "total": ...}`` where each record is keyed
        by display labels ("uid", "邮箱", "姓名", "状态", "角色", "创建时间")
        and ``total`` is whatever ``find_all_data_with_count`` returns for the
        same filter.

    Raises:
        KeyError: If "角色" contains an unknown role label, or the sort
            column label is not one of the supported columns.
    """

    def make_search_body(param):
        """Translate display-label search conditions into a query filter."""
        body = {}
        if "uid" in param:
            # Match the stored UID value directly. create_user_impl stores UID
            # as a plain string, so wrapping the value in {"UID": ...} would
            # compare against a sub-document and never match.
            body['UID'] = param['uid']
        if "姓名" in param:
            body['name'] = {"$regex": param['姓名']}
        if "邮箱" in param:
            body['email'] = {"$regex": param['邮箱']}
        if "状态" in param:
            # Unknown status labels are deliberately ignored (no filter).
            if param['状态'] == "正常":
                body['status'] = "normal"
            elif param['状态'] == "停用":
                body['status'] = "disable"
        if "角色" in param:
            role_map = {
                "管理员": "admin",
                "分析师": "analysts",
                "开发者": "developer",
                "运营人员": "operator",
                "访客": "guest",
            }
            # De-duplicate the requested labels before mapping them.
            body['role'] = {"$in": [role_map[role] for role in set(param['角色'])]}
        return body

    def make_sort_body(param):
        """Translate the display-label sort condition into a sort spec."""
        if param == {}:
            # Default ordering: newest users first.
            return {"create_time": -1}
        columns_map = {
            "姓名": "name",
            "邮箱": "email",
            "状态": "status",
            "角色": "role",
            "创建时间": "create_time",
        }
        # Only the first key/value pair of the sort condition is honoured.
        label, direction = list(param.items())[0]
        asc_or_desc = 1 if direction == "asc" else -1
        return {columns_map[label]: asc_or_desc}

    db = MongoHelper("tfse_v0.21")
    search_body = make_search_body(param1)
    sort = make_sort_body(param2)
    page_size = min(int(param3), 10)
    page_no = int(param4)

    total = db.find_all_data_with_count(
        "管理端",
        "用户",
        search_body
    )
    records = db.find_data_by_page_with_sort(
        "管理端",
        "用户",
        search_body,
        ["UID", "email", "name", "status", "role", "create_time"],
        sort,
        page_size,
        page_no
    )

    if records:
        # Rename stored column names to the display labels used by callers.
        label_for_column = (
            ("uid", "UID"),
            ("邮箱", "email"),
            ("姓名", "name"),
            ("状态", "status"),
            ("角色", "role"),
            ("创建时间", "create_time"),
        )
        for record in records:
            for label, column in label_for_column:
                record[label] = record.pop(column)
    else:
        records = []

    return {
        "records": records,
        "total": total
    }
def create_user_impl(email, name, pwd, role):
    """
    Create a new admin user.

    Args:
        email (str): E-mail address; must pass ``check_mail_fmt`` and must not
            already exist in the user collection.
        name (str): Display name.
        pwd (str): Password as received from the caller; it is validated with
            ``check_pwd_fmt``, then decrypted with ``decrypt_data`` and stored
            as a hash.
        role (str): One of 'admin', 'developer', 'analysts', 'manager',
            'guest'.

    Returns:
        ``True`` on success, otherwise a ``str`` describing why the request
        was rejected.
    """
    db = MongoHelper("tfse_v0.21")

    def check_params():
        """Validate the arguments; return True or an error message."""
        # NOTE(review): list_user_impl maps the label "运营人员" to "operator",
        # which is not accepted here ("manager" is) - confirm which is intended.
        roles = ['admin', 'developer', 'analysts', 'manager', 'guest']
        if role not in roles:
            return "用户角色异常"
        if not check_mail_fmt(email):
            return "邮箱格式错误"
        if not check_pwd_fmt(pwd):
            return "密码格式错误"
        email_is_existed = db.find_single_column(
            "管理端",
            "用户",
            {"email": email},
            "email"
        )
        if email_is_existed:
            return "邮箱已被注册"
        return True

    def gen_new_uid():
        """Generate an 8-character UID that is not yet used by any user."""
        choices = 'ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789'

        def make_id(num):
            """Return a random string of ``num`` alphanumeric characters."""
            return ''.join(random.choice(choices) for _ in range(num))

        def uid_exists(candidate):
            """Return True when ``candidate`` is already stored as a UID."""
            return db.find_single_column(
                "管理端",
                "用户",
                {"UID": candidate},
                "UID"
            ) is not None

        new_id = make_id(8)
        # Re-check every candidate: testing only the first draw would spin
        # forever as soon as that first draw collided with an existing UID.
        while uid_exists(new_id):
            new_id = make_id(8)
        return new_id

    check_param_result = check_params()
    if check_param_result is not True:
        return check_param_result

    user = {
        'UID': gen_new_uid(),
        'email': email,
        'name': name,
        'pwd': generate_password_hash(decrypt_data(encrypt_msg=pwd)),
        'status': 'normal',
        'role': role,
        'create_time': time.strftime("%Y-%m-%d %H:%M:%S", time.localtime()),
    }
    db.upsert_single_data(
        "管理端",
        "用户",
        {"UID": user['UID']},
        user
    )
    return True
def login_impl(email, pwd, vcode):
    """
    Log an admin user in with e-mail, password and e-mailed verification code.

    The checks run in this order and stop at the first failure: account
    exists / e-mail format, account status, password, verification code.

    Args:
        email (str): Login e-mail address.
        pwd (str): Password as received from the caller; it is decrypted with
            ``decrypt_data`` before being compared with the stored hash.
        vcode (str): Verification code previously stored for this e-mail.

    Returns:
        On success a ``dict`` with the keys ``name``, ``token`` and ``menus``;
        otherwise a ``str`` describing why the login was rejected.
    """
    db = MongoHelper("tfse_v0.21")
    user_info = db.find_single_data(
        "管理端",
        "用户",
        {"email": email},
        ["UID", "name", "pwd", "status", "role"]
    )

    def check_email():
        """The account must exist and the address must be well formed."""
        if not user_info:
            return "不存在该邮箱"
        if not check_mail_fmt(email):
            return "邮箱格式错误"
        return True

    def check_disable():
        """Only accounts whose status is 'normal' may log in."""
        if user_info['status'] != 'normal':
            return "账户已禁用"
        return True

    def check_vcode():
        """Validate the stored verification code and consume it on success."""
        record = db.find_single_data(
            "管理端",
            "邮箱验证码记录",
            {"email": email},
            ["vcode", "timestamp"]
        )
        if not record:
            return "无验证信息"
        if record['vcode'] != vcode:
            return "验证码错误"
        # Codes are valid for 300 seconds after they were stored.
        if time.time() - record['timestamp'] > 300:
            return "验证码过期"
        # A code may be used only once.
        db.delete_single_data(
            "管理端",
            "邮箱验证码记录",
            {"email": email}
        )
        return True

    def check_pwd():
        """Compare the decrypted password with the stored hash."""
        hash_pwd = user_info['pwd']
        try:
            if not check_password_hash(hash_pwd, decrypt_data(encrypt_msg=pwd)):
                return "密码错误"
        except Exception:
            # Any failure while decrypting or checking is reported to the
            # caller as a wrong password.
            return "密码错误"
        return True

    def make_menus():
        """Load the menu definition for the user's role from menus.json."""
        role = user_info['role']
        menus_path = os.path.abspath(os.path.dirname(__file__) + '/static/menus.json')
        with open(menus_path, "r", encoding='utf-8') as f:
            duties = json.load(f)
        return duties[role]

    def make_session_id():
        """Create a fresh session id and record it for the user."""
        choices = 'ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789'
        # The session id feeds the login token, so draw it from the OS CSPRNG
        # rather than the predictable default generator of ``random``.
        # NOTE(review): four characters is a small space for a session id -
        # confirm whether the length can be raised without breaking consumers.
        session_id = ''.join(secrets.choice(choices) for _ in range(4))
        db.update_single_data(
            "管理端",
            "token记录",
            {"UID": user_info['UID']},
            {"session_id": session_id}
        )
        return session_id

    def make_login_return_data():
        """Assemble the payload returned after a successful login."""
        return_data = dict()
        return_data['name'] = user_info["name"]
        return_data['token'] = create_token(make_session_id())
        return_data['menus'] = make_menus()
        return return_data

    for check in (check_email, check_disable, check_pwd, check_vcode):
        result = check()
        if result is not True:
            return result
    return make_login_return_data()
def send_vcode_to_user_impl(email):
    """
    E-mail a 6-digit login verification code and store it for later checking.

    Args:
        email (str): Recipient address; must pass ``check_mail_fmt``.

    Returns:
        ``True`` once the mail request has been posted and the code stored,
        otherwise a ``str`` describing why the request was rejected.

    Raises:
        requests.RequestException: If the mail service cannot be reached or
            does not answer within the timeout.
    """

    def check_param():
        """Validate the e-mail address format."""
        if not check_mail_fmt(email):
            return "邮箱格式错误"
        return True

    def gen_vcode():
        """Return a 6-digit numeric code drawn from the OS CSPRNG."""
        # The code is a login credential, so it must not be predictable.
        return ''.join(secrets.choice('0123456789') for _ in range(6))

    def send_email():
        """Post the mail request, then store the code with its timestamp."""
        email_api = 'http://116.63.130.34:30001'
        vcode = gen_vcode()
        timestamp = round(time.time())
        headers = {"Content-Type": "application/json;charset=UTF-8"}
        data = {"title": "【远东资信】{}".format("登录验证码"),
                "sender": 'fecribd@fecr.com.cn',
                "recipients": [email],
                # The closing bracket after the code keeps it visually apart
                # from the "5分钟" that follows.
                "msg_body": "{}的验证码为 【{}】,5分钟内有效。".format("登录", vcode)}
        # Without a timeout an unresponsive mail service would block the
        # caller indefinitely.
        requests.post(
            url=email_api + '/send_mail',
            headers=headers,
            data=json.dumps(data),
            timeout=10
        )
        db = MongoHelper("tfse_v0.21")
        db.upsert_single_data(
            "管理端",
            "邮箱验证码记录",
            {"email": email},
            {"vcode": vcode, "timestamp": timestamp}
        )
        return True

    res = check_param()
    if res is not True:
        return res
    res = send_email()
    if res is not True:
        return res
    return True
def disable_user_impl(uid):
    """
    Disable a user.

    Calls ``UPDATE_INFO`` to set ``status`` to "disable" on the entry whose
    ``UID`` equals ``uid`` and returns that call's result unchanged.
    """
    condition = {"UID": uid}
    changes = {"status": "disable"}
    return UPDATE_INFO("用户", "用户信息", condition, changes)
def delete_user_impl(uid):
    """
    Delete a user.

    Calls ``DELETE_DATA`` for the entry whose ``UID`` equals ``uid`` and
    returns that call's result unchanged.
    """
    condition = {"UID": uid}
    return DELETE_DATA("用户", "用户信息", condition)
def manage_role_of_user_impl(uid, role):
    """
    Change the role of a user.

    Calls ``UPDATE_INFO`` to set ``role`` on the entry whose ``UID`` equals
    ``uid`` and returns that call's result unchanged. The role value is
    passed through as given.
    """
    condition = {"UID": uid}
    changes = {"role": role}
    return UPDATE_INFO("用户", "用户信息", condition, changes)