异常日志模块

This commit is contained in:
王思川 2022-03-31 03:57:36 +08:00
parent 3f64eb9e26
commit b2cd0e1a49
5 changed files with 416 additions and 374 deletions

View File

@ -146,6 +146,17 @@ class MongoHelper:
record = list(collection.find(param3, columns).sort(list(param5.keys())[0], list(param5.values())[0]).limit(page_size).skip(skip_num))
return False if record == [] else record
def insert_single_data(self, param1, param2, param3):
"""
插入一条数据
param1: str 数据库
param2: str 数据集
param3: obj 插入数据
return: None
"""
collection = self.client[param1][param2]
collection.insert_one(param3)
def upsert_single_data(self, param1, param2, param3, param4):
"""
插入单条数据

View File

@ -1,18 +1,45 @@
import time
from flask import request
import functools
import traceback
from DBHelper.MongoHelper import MongoHelper
def error_info(func):
def error_log(func):
"""
校验token
return:
type:str
desc: token被解析后的值
"""
@functools.wraps(func)
def internal(*args, **kwargs):
try:
func()
except Exception:
return {"info": "参数异常了?"}, 500
except Exception as e:
db = MongoHelper("tfse_v0.21")
info = {
"ip": request.remote_addr,
"request_info": {
"path": request.path,
"method": request.method,
"headers": request.headers.__str__(),
"args": request.args.__str__(),
"json": request.json.__str__()
},
"traceback": traceback.format_exc(),
"exception": type(e).__name__,
"is_solved": "no",
"time": time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())
}
db.insert_single_data(
"日志",
"异常日志",
info
)
return {"info": "发生什么事了?"}, 500
return func(*args, **kwargs)
return internal

View File

@ -12,392 +12,395 @@ from user.user_utils import check_mail_fmt, check_pwd_fmt, decrypt_data
from user.user_db import DELETE_DATA, UPDATE_INFO
def list_user_impl(param1, param2, param3, param4):
"""
查询用户列表
param1:
type: dict
desc: 用户搜索条件 支持模糊查询条件
demo: {"姓名": "xxx", "邮箱": "xxx"}
param2:
type: dict
desc: 排序条件 asc正序 desc倒序
demo: {"姓名": asc}
param3:
type: int
desc: 每页数据数量
param4:
type: int
desc: 当前页数
"""
# 查询体构造方法
def make_search_body(param):
body = dict()
search_keys = list(param.keys())
class UserImpl:
if "uid" in search_keys:
body['UID'] = {"UID": param['uid']}
@staticmethod
def list_user_impl(param1, param2, param3, param4):
"""
查询用户列表
param1:
type: dict
desc: 用户搜索条件 支持模糊查询条件
demo: {"姓名": "xxx", "邮箱": "xxx"}
param2:
type: dict
desc: 排序条件 asc正序 desc倒序
demo: {"姓名": asc}
param3:
type: int
desc: 每页数据数量
param4:
type: int
desc: 当前页数
"""
# 查询体构造方法
def make_search_body(param):
body = dict()
search_keys = list(param.keys())
if "姓名" in search_keys:
body['name'] = {"$regex": param['姓名']}
if "uid" in search_keys:
body['UID'] = {"UID": param['uid']}
if "邮箱" in search_keys:
body['email'] = {"$regex": param['邮箱']}
if "姓名" in search_keys:
body['name'] = {"$regex": param['姓名']}
if "状态" in search_keys:
if param['状态'] == "正常":
body['status'] = "normal"
elif param['状态'] == "停用":
body['status'] = "disable"
if "邮箱" in search_keys:
body['email'] = {"$regex": param['邮箱']}
if "状态" in search_keys:
if param['状态'] == "正常":
body['status'] = "normal"
elif param['状态'] == "停用":
body['status'] = "disable"
else:
pass
if "角色" in search_keys:
roles = list()
role_map = {"管理员": "admin", "分析师": "analysts", "开发者": "developer", "运营人员": "operator", "访客": "guest"}
for role in list(set(param['角色'])):
roles.append(role_map[role])
body['role'] = {"$in": roles}
return body
# 顺序条件构造方法
def make_sort_body(param):
if param != {}:
columns_map = {"姓名": "name", "邮箱": "email", "状态": "status", "角色": "role", "创建时间": "create_time"}
asc_or_desc = 1 if list(param.values())[0] == "asc" else -1
sort_column = columns_map[list(param.keys())[0]]
body = {sort_column: asc_or_desc}
else:
pass
body = {"create_time": -1}
if "角色" in search_keys:
roles = list()
role_map = {"管理员": "admin", "分析师": "analysts", "开发者": "developer", "运营人员": "operator", "访客": "guest"}
for role in list(set(param['角色'])):
roles.append(role_map[role])
body['role'] = {"$in": roles}
return body
return body
# 连接数据库
db = MongoHelper("tfse_v0.21")
# 顺序条件构造方法
def make_sort_body(param):
if param != {}:
columns_map = {"姓名": "name", "邮箱": "email", "状态": "status", "角色": "role", "创建时间": "create_time"}
asc_or_desc = 1 if list(param.values())[0] == "asc" else -1
sort_column = columns_map[list(param.keys())[0]]
body = {sort_column: asc_or_desc}
search_body = make_search_body(param1)
sort = make_sort_body(param2)
page_size = 10 if int(param3) > 10 else int(param3)
page_no = int(param4)
total = db.find_all_data_with_count(
"管理端",
"用户",
search_body
)
records = db.find_data_by_page_with_sort(
"管理端",
"用户",
search_body,
["UID", "email", "name", "status", "role", "create_time"],
sort,
page_size,
page_no
)
if records:
for record in records:
record['uid'] = record.pop('UID')
record['邮箱'] = record.pop('email')
record['姓名'] = record.pop('name')
record['状态'] = record.pop('status')
record['角色'] = record.pop('role')
record['创建时间'] = record.pop('create_time')
else:
body = {"create_time": -1}
records = []
return body
result = {
"records": records,
"total": total
}
# 连接数据库
db = MongoHelper("tfse_v0.21")
return result
search_body = make_search_body(param1)
sort = make_sort_body(param2)
page_size = 10 if int(param3) > 10 else int(param3)
page_no = int(param4)
total = db.find_all_data_with_count(
"管理端",
"用户",
search_body
)
records = db.find_data_by_page_with_sort(
"管理端",
"用户",
search_body,
["UID", "email", "name", "status", "role", "create_time"],
sort,
page_size,
page_no
)
if records:
for record in records:
record['uid'] = record.pop('UID')
record['邮箱'] = record.pop('email')
record['姓名'] = record.pop('name')
record['状态'] = record.pop('status')
record['角色'] = record.pop('role')
record['创建时间'] = record.pop('create_time')
else:
records = []
result = {
"records": records,
"total": total
}
return result
def create_user_impl(email, name, pwd, role):
"""
创建新用户流程
Parameters:
email str 邮箱
name str 姓名
pwd str 密码
role str 角色
Returns:
执行成功 bool True
执行失败 str 异常信息
"""
db = MongoHelper("tfse_v0.21")
def check_params():
@staticmethod
def create_user_impl(email, name, pwd, role):
"""
参数检查
创建新用户流程
Parameters:
email str 邮箱
name str 姓名
pwd str 密码
role str 角色
Returns:
执行成功 bool True
执行失败 str 异常信息
"""
roles = ['admin', 'developer', 'analysts', 'manager', 'guest']
if role not in roles:
return "用户角色异常"
db = MongoHelper("tfse_v0.21")
if not check_mail_fmt(email):
return "邮箱格式错误"
def check_params():
"""
参数检查
"""
roles = ['admin', 'developer', 'analysts', 'manager', 'guest']
if role not in roles:
return "用户角色异常"
if not check_pwd_fmt(pwd):
return "密码格式错误"
if not check_mail_fmt(email):
return "邮箱格式错误"
email_is_existed = db.find_single_column(
if not check_pwd_fmt(pwd):
return "密码格式错误"
email_is_existed = db.find_single_column(
"管理端",
"用户",
{"email": email},
"email"
)
if email_is_existed:
return "邮箱已被注册"
return True
def gen_new_uid():
"""
生成新的用户ID如果该ID存在则重新生成
Returns:
生成的用户ID
"""
def make_id(num):
"""
随机生成字符串
"""
choices = 'ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789'
salt = ''
for i in range(num):
salt += random.choice(choices)
return salt
def gen_id():
"""
生成新ID
检查新ID是否存在如果存在则继续生成新ID
若新ID可使用返回新ID
"""
new_id = make_id(8)
case = db.find_single_column(
"管理端",
"用户",
{"UID": new_id},
"UID"
) is not None
while case:
new_id = make_id(8)
return new_id
return gen_id()
def __main__():
"""
执行流程
"""
check_param_result = check_params()
if check_param_result is not True:
return check_param_result
user = dict()
user['UID'] = gen_new_uid()
user['email'] = email
user['name'] = name
user['pwd'] = generate_password_hash(decrypt_data(encrypt_msg=pwd))
user['status'] = 'normal'
user['role'] = role
user['create_time'] = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())
db.upsert_single_data(
"管理端",
"用户",
{"UID": user['UID']},
user
)
return True
return __main__()
@staticmethod
def login_impl(email, pwd, vcode):
db = MongoHelper("tfse_v0.21")
user_info = db.find_single_data(
"管理端",
"用户",
{"email": email},
"email"
["UID", "name", "pwd", "status", "role"]
)
if email_is_existed:
return "邮箱已被注册"
def check_email():
if not user_info:
return "不存在该邮箱"
if not check_mail_fmt(email):
return "邮箱格式错误"
return True
return True
def check_disable():
if user_info['status'] != 'normal':
return "账户已禁用"
return True
def gen_new_uid():
"""
生成新的用户ID如果该ID存在则重新生成
Returns:
生成的用户ID
"""
def check_vcode():
record = db.find_single_data(
"管理端",
"邮箱验证码记录",
{"email": email},
["vcode", "timestamp"]
)
def make_id(num):
"""
随机生成字符串
"""
if not record:
return "无验证信息"
if record['vcode'] != vcode:
return "验证码错误"
if time.time() - record['timestamp'] > 300:
return "验证码过期"
db.delete_single_data(
"管理端",
"邮箱验证码记录",
{"email": email}
)
return True
def check_pwd():
hash_pwd = user_info['pwd']
try:
if not check_password_hash(hash_pwd, decrypt_data(encrypt_msg=pwd)):
return "密码错误"
except Exception:
return "密码错误"
return True
def make_menus():
role = user_info['role']
with open(os.path.abspath(os.path.dirname(__file__)+'/static/menus.json'), "r", encoding='utf-8') as f:
duties = json.load(f)
return duties[role]
def make_session_id():
choices = 'ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789'
session_id = ''
for i in range(4):
session_id += random.choice(choices)
db.update_single_data(
"管理端",
"token记录",
{"UID": user_info['UID']},
{"session_id": session_id}
)
return session_id
def make_login_return_data():
return_data = dict()
return_data['name'] = user_info["name"]
return_data['token'] = create_token(make_session_id())
return_data['menus'] = make_menus()
return return_data
def __main__():
result = check_email()
if result is not True:
return result
result = check_disable()
if result is not True:
return result
result = check_pwd()
if result is not True:
return result
result = check_vcode()
if result is not True:
return result
return make_login_return_data()
return __main__()
@staticmethod
def send_vcode_to_user_impl(email):
def check_param():
if not check_mail_fmt(email):
return "邮箱格式错误"
return True
def gen_vcode():
choices = '0123456789'
salt = ''
for i in range(num):
for i in range(6):
salt += random.choice(choices)
return salt
def gen_id():
"""
生成新ID
检查新ID是否存在如果存在则继续生成新ID
若新ID可使用返回新ID
"""
new_id = make_id(8)
def send_email():
email_api = 'http://116.63.130.34:30001'
vcode = gen_vcode()
timestamp = round(time.time())
headers = {"Content-Type": "application/json;charset=UTF-8"}
data = {"title": "【远东资信】{}".format("登录验证码"),
"sender": 'fecribd@fecr.com.cn',
"recipients": [email],
"msg_body": "{}的验证码为 【{}5分钟内有效。".format("登录", vcode)}
requests.post(url=email_api + '/send_mail', headers=headers, data=json.dumps(data))
case = db.find_single_column(
db = MongoHelper("tfse_v0.21")
db.upsert_single_data(
"管理端",
"用户",
{"UID": new_id},
"UID"
) is not None
"邮箱验证码记录",
{"email": email},
{"vcode": vcode, "timestamp": timestamp}
)
return True
while case:
new_id = make_id(8)
return new_id
def __main__():
res = check_param()
if res is not True:
return res
return gen_id()
res = send_email()
if res is not True:
return res
def __main__():
return True
return __main__()
@staticmethod
def disable_user_impl(uid):
"""
执行流程
禁用用户
"""
check_param_result = check_params()
if check_param_result is not True:
return check_param_result
result = UPDATE_INFO("用户", "用户信息", {"UID": uid}, {"status": "disable"})
return result
user = dict()
user['UID'] = gen_new_uid()
user['email'] = email
user['name'] = name
user['pwd'] = generate_password_hash(decrypt_data(encrypt_msg=pwd))
user['status'] = 'normal'
user['role'] = role
user['create_time'] = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())
@staticmethod
def delete_user_impl(uid):
"""
删除用户
"""
result = DELETE_DATA("用户", "用户信息", {"UID": uid})
return result
db.upsert_single_data(
"管理端",
"用户",
{"UID": user['UID']},
user
)
return True
return __main__()
def login_impl(email, pwd, vcode):
db = MongoHelper("tfse_v0.21")
user_info = db.find_single_data(
"管理端",
"用户",
{"email": email},
["UID", "name", "pwd", "status", "role"]
)
def check_email():
if not user_info:
return "不存在该邮箱"
if not check_mail_fmt(email):
return "邮箱格式错误"
return True
def check_disable():
if user_info['status'] != 'normal':
return "账户已禁用"
return True
def check_vcode():
record = db.find_single_data(
"管理端",
"邮箱验证码记录",
{"email": email},
["vcode", "timestamp"]
)
if not record:
return "无验证信息"
if record['vcode'] != vcode:
return "验证码错误"
if time.time() - record['timestamp'] > 300:
return "验证码过期"
db.delete_single_data(
"管理端",
"邮箱验证码记录",
{"email": email}
)
return True
def check_pwd():
hash_pwd = user_info['pwd']
try:
if not check_password_hash(hash_pwd, decrypt_data(encrypt_msg=pwd)):
return "密码错误"
except Exception:
return "密码错误"
return True
def make_menus():
role = user_info['role']
with open(os.path.abspath(os.path.dirname(__file__)+'/static/menus.json'), "r", encoding='utf-8') as f:
duties = json.load(f)
return duties[role]
def make_session_id():
choices = 'ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789'
session_id = ''
for i in range(4):
session_id += random.choice(choices)
db.update_single_data(
"管理端",
"token记录",
{"UID": user_info['UID']},
{"session_id": session_id}
)
return session_id
def make_login_return_data():
return_data = dict()
return_data['name'] = user_info["name"]
return_data['token'] = create_token(make_session_id())
return_data['menus'] = make_menus()
return return_data
def __main__():
result = check_email()
if result is not True:
return result
result = check_disable()
if result is not True:
return result
result = check_pwd()
if result is not True:
return result
result = check_vcode()
if result is not True:
return result
return make_login_return_data()
return __main__()
def send_vcode_to_user_impl(email):
def check_param():
if not check_mail_fmt(email):
return "邮箱格式错误"
return True
def gen_vcode():
choices = '0123456789'
salt = ''
for i in range(6):
salt += random.choice(choices)
return salt
def send_email():
email_api = 'http://116.63.130.34:30001'
vcode = gen_vcode()
timestamp = round(time.time())
headers = {"Content-Type": "application/json;charset=UTF-8"}
data = {"title": "【远东资信】{}".format("登录验证码"),
"sender": 'fecribd@fecr.com.cn',
"recipients": [email],
"msg_body": "{}的验证码为 【{}5分钟内有效。".format("登录", vcode)}
requests.post(url=email_api + '/send_mail', headers=headers, data=json.dumps(data))
db = MongoHelper("tfse_v0.21")
db.upsert_single_data(
"管理端",
"邮箱验证码记录",
{"email": email},
{"vcode": vcode, "timestamp": timestamp}
)
return True
def __main__():
res = check_param()
if res is not True:
return res
res = send_email()
if res is not True:
return res
return True
return __main__()
def disable_user_impl(uid):
"""
禁用用户
"""
result = UPDATE_INFO("用户", "用户信息", {"UID": uid}, {"status": "disable"})
return result
def delete_user_impl(uid):
"""
删除用户
"""
result = DELETE_DATA("用户", "用户信息", {"UID": uid})
return result
def manage_role_of_user_impl(uid, role):
"""
管理用户角色
"""
result = UPDATE_INFO("用户", "用户信息", {"UID": uid}, {"role": role})
return result
@staticmethod
def manage_role_of_user_impl(uid, role):
"""
管理用户角色
"""
result = UPDATE_INFO("用户", "用户信息", {"UID": uid}, {"role": role})
return result

View File

@ -1,5 +1,4 @@
from user.user_impl import create_user_impl, login_impl, send_vcode_to_user_impl, list_user_impl, \
manage_role_of_user_impl, disable_user_impl, delete_user_impl
from user.user_impl import UserImpl
class User:
@ -32,7 +31,7 @@ class UserManage(User):
"""
用户信息列表
"""
return list_user_impl(search, sort, page_size, page_no)
return UserImpl.list_user_impl(search, sort, page_size, page_no)
def send_vcode_to_user(self):
"""
@ -41,7 +40,7 @@ class UserManage(User):
发送成功 bool 返回True
发送失败 str 返回异常信息
"""
return send_vcode_to_user_impl(self.email)
return UserImpl.send_vcode_to_user_impl(self.email)
def create_user(self):
"""
@ -50,7 +49,7 @@ class UserManage(User):
创建成功 bool 返回True
创建失败 str 返回异常信息
"""
return create_user_impl(self.email, self.name, self.pwd, self.role)
return UserImpl.create_user_impl(self.email, self.name, self.pwd, self.role)
def disable_user(self):
"""
@ -87,16 +86,16 @@ class UserOperation(User):
"""
用户登录
"""
return login_impl(self.email, self.pwd, self.vcode)
return UserImpl.login_impl(self.email, self.pwd, self.vcode)
# 停用
def deactivate(self):
return disable_user_impl(self.uid)
return UserImpl.disable_user_impl(self.uid)
# 删除
def delete(self):
return delete_user_impl(self.uid)
return UserImpl.delete_user_impl(self.uid)
# 管理角色
def manage(self):
return manage_role_of_user_impl(self.uid, self.role)
return UserImpl.manage_role_of_user_impl(self.uid, self.role)

View File

@ -1,6 +1,6 @@
from flask import Blueprint, request
from Log.log_utils import error_info
from Log.log_utils import error_log
from user.user_obj import UserManage, UserOperation
from user.user_auth import verify_token, authority_scope
@ -19,17 +19,19 @@ def online_check_route(**kwargs):
@user_route.route('/list_user', methods=['POST'])
@verify_token
@authority_scope(['admin'])
@error_info
@error_log
def list_user_route(**kwargs):
"""
用户信息列表接口
"""
user_manage = UserManage()
search = request.json['search']
sort = request.json['sort']
page_size = request.json['page_size']
page_no = request.json['page_no']
result = user_manage.list_user(search, sort, page_size, page_no)
req = request.json
user = UserManage()
result = user.list_user(
req['search'],
req['sort'],
req['page_size'],
req['page_no']
)
return {"info": '查询结果', "result": result}, 200