重写用户登录方法

This commit is contained in:
王思川 2022-04-01 11:15:09 +08:00
parent 9c623c06e6
commit 16b090d335
4 changed files with 177 additions and 248 deletions

View File

@ -1,51 +0,0 @@
import time
from flask import request
import functools
import traceback
from DBHelper.MongoHelper import MongoHelper
from Utils.ErrorUtil import ConditionCheckFailed
def error_log(func):
"""
异常日志
"""
def save_error_log(e):
db = MongoHelper("tfse_v0.21")
info = {
"ip": request.remote_addr,
"request_info": {
"path": request.path,
"method": request.method,
"headers": request.headers.__str__(),
"args": request.args.__str__(),
"json": request.json.__str__()
},
"traceback": traceback.format_exc(),
"exception": type(e).__name__,
"is_solved": "no",
"time": time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())
}
db.insert_single_data(
"日志",
"异常日志",
info
)
@functools.wraps(func)
def internal(*args, **kwargs):
try:
func()
except ConditionCheckFailed as e:
save_error_log(e)
return {"info": e.__str__()}, 400
except Exception as e:
save_error_log(e)
return {"info": "发生什么事了?"}, 400
return func(*args, **kwargs)
return internal

View File

@ -4,13 +4,13 @@ import time
import random
import requests
from werkzeug.security import generate_password_hash, check_password_hash
from werkzeug.security import check_password_hash
from DBHelper.MongoHelper import MongoHelper
from Utils.ErrorUtil import ReturnConditionCheckFailed
from user.user_auth import create_token
from user.user_obj import UserManage, ListUser, User
from user.user_utils import check_mail_fmt, decrypt_data
from user.user_obj import UserManage, ListUser, User, UserLogin
from user.user_utils import decrypt_data
class UserManageImpl(UserManage):
@ -123,117 +123,6 @@ class UserManageImpl(UserManage):
return __main__()
class UserOperateImpl:
@staticmethod
def login_impl(email, pwd, vcode):
db = MongoHelper("tfse_v0.21")
user_info = db.find_single_data(
"管理端",
"用户",
{"email": email},
["UID", "name", "pwd", "status", "role"]
)
def check_email():
if not user_info:
return "不存在该邮箱"
if not check_mail_fmt(email):
return "邮箱格式错误"
return True
def check_disable():
if user_info['status'] != 'normal':
return "账户已禁用"
return True
def check_vcode():
record = db.find_single_data(
"管理端",
"邮箱验证码记录",
{"email": email},
["vcode", "timestamp"]
)
if not record:
return "无验证信息"
if record['vcode'] != vcode:
return "验证码错误"
if time.time() - record['timestamp'] > 300:
return "验证码过期"
db.delete_single_data(
"管理端",
"邮箱验证码记录",
{"email": email}
)
return True
def check_pwd():
hash_pwd = user_info['pwd']
try:
if not check_password_hash(hash_pwd, decrypt_data(encrypt_msg=pwd)):
return "密码错误"
except Exception:
return "密码错误"
return True
def make_menus():
role = user_info['role']
with open(os.path.abspath(os.path.dirname(__file__)+'/static/menus.json'), "r", encoding='utf-8') as f:
duties = json.load(f)
return duties[role]
def make_session_id():
choices = 'ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789'
session_id = ''
for i in range(4):
session_id += random.choice(choices)
db.update_single_data(
"管理端",
"token记录",
{"UID": user_info['UID']},
{"session_id": session_id}
)
return session_id
def make_login_return_data():
return_data = dict()
return_data['name'] = user_info["name"]
return_data['token'] = create_token(make_session_id())
return_data['menus'] = make_menus()
return return_data
def __main__():
result = check_email()
if result is not True:
return result
result = check_disable()
if result is not True:
return result
result = check_pwd()
if result is not True:
return result
result = check_vcode()
if result is not True:
return result
return make_login_return_data()
return __main__()
class ListUserImpl(ListUser):
db = MongoHelper("tfse_v0.21")
@ -322,3 +211,92 @@ class ListUserImpl(ListUser):
}
return result
class UserLoginImpl(UserLogin):
"""用户登录实现"""
db = MongoHelper("tfse_v0.21")
def login(self):
""""""
user_info = self.db.find_single_data(
"管理端",
"用户",
{"email": self.email},
["UID", "name", "pwd", "status", "role"]
)
def check_email_existed():
if not user_info:
raise ReturnConditionCheckFailed("邮箱不存在", 200)
def check_user_is_disable():
if user_info['status'] != 'normal':
raise ReturnConditionCheckFailed("账户已禁用", 200)
def check_vcode_is_correct():
record = self.db.find_single_data(
"管理端",
"邮箱验证码记录",
{"email": self.email},
["vcode", "timestamp"]
)
if not record:
raise ReturnConditionCheckFailed("验证码不存在", 200)
if record['vcode'] != self.vcode:
raise ReturnConditionCheckFailed("验证码错误", 200)
if time.time() - record['timestamp'] > 300:
raise ReturnConditionCheckFailed("验证码过期", 200)
self.db.delete_single_data(
"管理端",
"邮箱验证码记录",
{"email": self.email}
)
def check_pwd_is_correct():
try:
if not check_password_hash(user_info['pwd'], decrypt_data(encrypt_msg=self.pwd)):
raise ReturnConditionCheckFailed("密码错误", 200)
except Exception:
raise ReturnConditionCheckFailed("密码错误", 200)
def make_menus():
role = user_info['role']
with open(os.path.abspath(os.path.dirname(__file__)+'/static/menus.json'), "r", encoding='utf-8') as f:
duties = json.load(f)
self.menus = duties[role]
def make_token():
choices = 'ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789'
session_id = ''
for i in range(4):
session_id += random.choice(choices)
self.db.update_single_data(
"管理端",
"token记录",
{"UID": user_info['UID']},
{"session_id": session_id}
)
self.token = create_token(session_id)
def __main__():
check_email_existed()
check_user_is_disable()
check_pwd_is_correct()
check_vcode_is_correct()
make_menus()
make_token()
self.name = user_info['name']
return self.dict_to_show()
return __main__()

View File

@ -68,9 +68,11 @@ class User:
if self.role not in ['admin', 'analysts', 'developer', 'operator', 'guest']:
raise ReturnConditionCheckFailed("用户角色格式错误", 200)
def check_user_obj(self, columns):
def check_obj(self, **kwargs):
"""对象字段校验"""
columns = kwargs['columns']
for column in columns:
if column == "uid":
self.check_uid()
@ -132,69 +134,6 @@ class User:
return user_dict
class UserManage(User):
"""
用户管理
"""
def create(self):
"""创建新用户"""
def send_vcode_to_user(self):
"""发送验证码"""
def disable_user(self):
"""
禁用用户
"""
# TODO
def delete_user(self):
"""
删除用户
"""
# TODO
def manage_role_of_user(self):
"""
管理用户角色
"""
# TODO
class UserOperation(User):
"""
用户操作
"""
def __init__(self):
super().__init__()
self.token = None
self.block = None
self.vcode = None
# 登录
def login(self):
"""
用户登录
"""
# 停用
def deactivate(self):
"""停用"""
# return UserOperateImpl.disable_user_impl(self.uid)
# 删除
def delete(self):
"""删除"""
# return UserOperateImpl.delete_user_impl(self.uid)
# 管理角色
def manage(self):
"""管理角色"""
# return UserOperateImpl.manage_role_of_user_impl(self.uid, self.role)
class ListUser:
"""用户列表"""
@ -283,3 +222,67 @@ class ListUser:
self.check_sort()
self.check_page_size()
self.check_page_no()
class UserLogin(User):
"""用户登录"""
def __init__(self):
super().__init__()
self.vcode = None
self.token = None
self.menus = None
def login(self):
"""登录"""
def check_vcode(self):
"""检查验证码格式"""
if type(self.vcode) is not str:
raise ReturnConditionCheckFailed("验证码格式错误", 200)
def check_obj(self):
""""""
self.check_email()
self.check_vcode()
def dict_to_show(self):
"""显示对象"""
show_dict = {
"name": self.name,
"token": self.token,
"menus": self.menus
}
return show_dict
class UserManage(User):
"""用户管理"""
def create(self):
"""创建新用户"""
def send_vcode_to_user(self):
"""发送验证码"""
def disable_user(self):
"""禁用用户"""
def delete_user(self):
"""删除用户"""
def manage_role_of_user(self):
"""管理用户角色"""
class UserOperation(User):
"""用户操作"""
def __init__(self):
super().__init__()
self.token = None
self.block = None
self.vcode = None
self.is_normal = None
self.menus = None

View File

@ -1,7 +1,7 @@
from flask import Blueprint, request
from Utils.ErrorUtil import ReturnConditionCheckFailed
from user.user_impl import UserManageImpl, ListUserImpl
from user.user_impl import UserManageImpl, ListUserImpl, UserLoginImpl
from user.user_obj import UserManage, UserOperation
from user.user_auth import verify_token, authority_scope
@ -26,7 +26,7 @@ def create_user_route(**kwargs):
req = request.json
user_manage = UserManageImpl()
user_manage.email, user_manage.name, user_manage.pwd, user_manage.role = req['email'], req['name'], req['pwd'], req['role']
user_manage.check_user_obj(["email", "name", "pwd", "role"])
user_manage.check_obj(columns=["email", "name", "pwd", "role"])
user_manage.create()
return {"info": "用户创建成功"}
except ReturnConditionCheckFailed as e:
@ -57,6 +57,22 @@ def list_user_route(**kwargs):
return {"info": e.__str__()}, e.status_code
@user_route.route('/login', methods=['POST'])
def login_route():
"""登录"""
try:
req = request.json
user_login = UserLoginImpl()
user_login.email, user_login.pwd, user_login.vcode = req['email'], req['pwd'], req['vcode']
user_login.check_obj()
user_login.login()
result = user_login.dict_to_show()
return {"info": "登录成功", "result": result}, 200
except ReturnConditionCheckFailed as e:
e.log_error()
return {"info": e.__str__()}, e.status_code
@user_route.route('/send_vcode_to_user', methods=['POST'])
def send_vcode_to_user_route():
"""
@ -72,23 +88,6 @@ def send_vcode_to_user_route():
return {"info": result}, 200
@user_route.route('/login', methods=['POST'])
def login_route():
"""
登录
"""
req = request.json
uo = UserOperation()
uo.email, uo.pwd, uo.vcode = req['email'], req['pwd'], req['vcode']
result = uo.login()
if type(result) == dict:
return {"info": "登录成功", "result": result}, 200
else:
return {"info": result}, 200
@user_route.route('/disable_user', methods=['GET'])
@verify_token
@authority_scope(['admin'])