重写创建新用户方法

This commit is contained in:
王思川 2022-04-01 02:24:02 +08:00
parent 04c22ca47b
commit 89d8cd7de8
7 changed files with 256 additions and 79 deletions

View File

@ -32,7 +32,7 @@ def primary_industry_update_data_impl(param):
"update_time": "更新时间"
}
result = dict_fields_map(param.__dict__, fields_map)
result = dict_fields_map(param.dict_to_save, fields_map)
UPDATE_DATA('行业宏观分析', {"行业": result["行业"]}, result)

View File

@ -1,10 +1,51 @@
import time
import traceback
from flask import request
from DBHelper.MongoHelper import MongoHelper
class ObjColumnCheckError(RuntimeError):
"""对象字段检查异常"""
class ConditionCheckFailed(RuntimeError):
"""条件检查失败 抛出异常"""
def __init__(self, error_info):
self.error_info = error_info
def __str__(self):
return self.error_info
class ReturnConditionCheckFailed(RuntimeError):
"""条件检查失败 抛出异常 接口返回失败原因和状态码"""
def __init__(self, failed_info, status_code):
self.failed_info = failed_info # 失败信息
self.status_code = status_code # 状态码
def __str__(self):
return self.failed_info
def log_error(self):
db = MongoHelper("tfse_v0.21")
info = {
"ip": request.remote_addr,
"request_info": {
"path": request.path,
"method": request.method,
"headers": request.headers.__str__(),
"args": request.args.__str__(),
"json": request.json.__str__()
},
"traceback": traceback.format_exc(),
"exception": type(self).__name__,
"is_solved": "no",
"time": time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())
}
db.insert_single_data(
"日志",
"异常日志",
info
)

View File

@ -5,7 +5,7 @@ import functools
import traceback
from DBHelper.MongoHelper import MongoHelper
from Utils.ErrorUtil import ObjColumnCheckError
from Utils.ErrorUtil import ConditionCheckFailed
def error_log(func):
@ -40,7 +40,7 @@ def error_log(func):
def internal(*args, **kwargs):
try:
func()
except ObjColumnCheckError as e:
except ConditionCheckFailed as e:
save_error_log(e)
return {"info": e.__str__()}, 400
except Exception as e:

13
Utils/ObjUtil.py Normal file
View File

@ -0,0 +1,13 @@
class ObjUtil:
"""对象实用程序"""
@staticmethod
def sub_dict(base_dict, sub_keys):
"""
字典子集
base_dict: 原字典
sub_keys: 子集键值
"""
return {key: base_dict[key] for key in sub_keys}

View File

@ -7,15 +7,89 @@ import requests
from werkzeug.security import generate_password_hash, check_password_hash
from DBHelper.MongoHelper import MongoHelper
from Utils.ErrorUtil import ReturnConditionCheckFailed
from user.user_auth import create_token
from user.user_obj import User
from user.user_utils import check_mail_fmt, decrypt_data
from user.user_db import DELETE_DATA, UPDATE_INFO
class UserImpl(User):
"""管理端用户实现"""
db = MongoHelper("tfse_v0.21")
def create(self):
def check_email_registered():
"""检查邮箱是否已注册"""
email_is_existed = self.db.find_single_column(
"管理端",
"用户",
{"email": self.email},
"email"
)
if email_is_existed:
raise ReturnConditionCheckFailed("邮箱已被注册", 200)
def generate_new_uid():
"""生成新的用户ID"""
def uid_maker(num):
"""
用户ID生成器
num: ID长度 int
"""
choices = 'ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789'
salt = ''
for i in range(num):
salt += random.choice(choices)
return salt
def uid_checker(__uid):
"""
用户ID重复检查
__uid: 用户ID str
return:
True 用户ID可用
False 用户ID不可用
"""
is_uid_existed = self.db.find_single_column(
"管理端",
"用户",
{"UID": __uid},
"UID"
)
return True if is_uid_existed is None else False
uid = uid_maker(8)
while not uid_checker(uid):
uid = uid_maker(8)
self.uid = uid
def __main__():
check_email_registered()
generate_new_uid()
self.status = "normal"
self.create_time = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())
self.db.upsert_single_data(
"管理端",
"用户",
{"UID": self.uid},
self.dict_to_save()
)
__main__()
class UserManageImpl:
@staticmethod
def list_user_impl(param1, param2, param3, param4):
def list_user(param1, param2, param3, param4):
"""
查询用户列表
param1:

View File

@ -1,37 +1,32 @@
import re
from Utils.ErrorUtil import ObjColumnCheckError
from user.user_impl import UserManageImpl, UserOperateImpl
from Utils.ErrorUtil import ReturnConditionCheckFailed
from user.user_utils import decrypt_data
class User:
"""
用户对象
uid: str 用户ID
email: str 邮箱
name: str 用户名
pwd: str 密码
status: str 用户状态
role: str 角色
"""
"""管理端用户"""
def __init__(self):
self.uid = None
self.email = None
self.name = None
self.pwd = None
self.status = None
self.role = None
self.uid = None # 用户ID str
self.email = None # 邮箱 str
self.name = None # 用户名 str
self.pwd = None # 密码 str
self.status = None # 用户状态 str
self.role = None # 角色 str
self.create_time = None # 创建时间 str
def create(self):
"""创建新用户"""
def check_uid(self):
"""用户ID校验"""
if type(self.uid) is not str:
raise ObjColumnCheckError("用户ID格式错误")
raise ReturnConditionCheckFailed("用户ID格式错误", 200)
if len(self.uid) != 8:
raise ObjColumnCheckError("用户ID格式错误")
raise ReturnConditionCheckFailed("用户ID格式错误", 200)
def check_email(self):
"""邮箱格式仅允许@fecr.com.cn"""
@ -40,41 +35,115 @@ class User:
case = (len(self.email) > 7) and (re.match(regex, self.email) is not None)
result = True if case else False
if not result:
raise ObjColumnCheckError("邮箱格式错误")
raise ReturnConditionCheckFailed("邮箱格式错误", 200)
def check_name(self):
"""用户名格式校验"""
if type(self.name) is not str:
raise ObjColumnCheckError("用户名格式错误")
raise ReturnConditionCheckFailed("用户名格式错误", 200)
def check_pwd(self):
"""密码格式校验"""
if type(self.pwd) is not str:
raise ObjColumnCheckError("密码格式错误")
raise ReturnConditionCheckFailed("密码格式错误", 200)
password = decrypt_data(encrypt_msg=self.pwd)
if not password:
raise ObjColumnCheckError("密码格式错误")
raise ReturnConditionCheckFailed("密码格式错误", 200)
regex = "^(?![A-Za-z0-9]+$)(?![a-z0-9\\W]+$)(?![A-Za-z\\W]+$)(?![A-Z0-9\\W]+$)^.{8,}$"
case = (len(password) >= 8) and (re.match(regex, password) is not None)
result = True if case else False
if not result:
raise ObjColumnCheckError("密码格式错误")
raise ReturnConditionCheckFailed("密码格式错误", 200)
def check_status(self):
"""用户状态校验"""
if self.status not in ['normal', 'disable']:
raise ObjColumnCheckError("用户状态格式错误")
raise ReturnConditionCheckFailed("用户状态格式错误", 200)
def check_role(self):
"""用户角色校验"""
if self.role not in ['admin', 'analysts', 'developer', 'operator', 'guest']:
raise ObjColumnCheckError("用户角色格式错误")
raise ReturnConditionCheckFailed("用户角色格式错误", 200)
def check_user_obj(self, columns):
"""对象字段校验"""
for column in columns:
if column == "uid":
self.check_uid()
elif column == "email":
self.check_email()
elif column == "name":
self.check_name()
elif column == "pwd":
self.check_pwd()
elif column == "status":
self.check_status()
elif column == "role":
self.check_role()
else:
pass
def trans_status(self):
"""用户状态转换成前端显示格式"""
return "正常" if self.status == "normal" else "停用"
def trans_role(self):
"""用户角色转换成前端显示格式"""
role_map = {"admin": "管理员", "analysts": "分析师", "developer": "开发者", "operator": "运营人员", "guest": "访客"}
return role_map[self.role]
def dict_to_save(self, **kwargs):
"""存储对象"""
user_dict = {
"UID": self.uid,
"email": self.email,
"name": self.name,
"pwd": self.pwd,
"status": self.status,
"role": self.role,
"create_time": self.create_time
}
if 'columns' in list(kwargs.keys()):
user_dict = {key: user_dict for key in kwargs['columns']}
return user_dict
def dict_to_show(self, **kwargs):
"""显示对象"""
user_dict = {
"uid": self.uid,
"邮箱": self.email,
"姓名": self.name,
"状态": self.trans_status(),
"角色": self.trans_role(),
"创建时间": self.create_time
}
if 'columns' in list(kwargs.keys()):
user_dict = {key: user_dict for key in kwargs['columns']}
return user_dict
class ListUser(User):
"""查询用户列表"""
def __init__(self, search, sort, page_size, page_no):
super().__init__()
self.search = search
self.sort = sort
self.page_size = page_size
self.page_no = page_no
class UserManage(User):
@ -85,9 +154,8 @@ class UserManage(User):
@staticmethod
def list_user(search, sort, page_size, page_no):
"""
用户信息列表
查询用户列表
"""
return UserManageImpl.list_user_impl(search, sort, page_size, page_no)
def send_vcode_to_user(self):
"""
@ -96,17 +164,6 @@ class UserManage(User):
发送成功 bool 返回True
发送失败 str 返回异常信息
"""
self.check_email()
return UserManageImpl.send_vcode_to_user_impl(self.email)
def create_user(self):
"""
创建用户
Returns:
创建成功 bool 返回True
创建失败 str 返回异常信息
"""
return UserManageImpl.create_user_impl(self.email, self.name, self.pwd, self.role)
def disable_user(self):
"""
@ -143,22 +200,18 @@ class UserOperation(User):
"""
用户登录
"""
return UserOperateImpl.login_impl(self.email, self.pwd, self.vcode)
# 停用
def deactivate(self):
return UserOperateImpl.disable_user_impl(self.uid)
"""停用"""
# return UserOperateImpl.disable_user_impl(self.uid)
# 删除
def delete(self):
return UserOperateImpl.delete_user_impl(self.uid)
"""删除"""
# return UserOperateImpl.delete_user_impl(self.uid)
# 管理角色
def manage(self):
return UserOperateImpl.manage_role_of_user_impl(self.uid, self.role)
if __name__ == '__main__':
user = User()
user.email = "aaa"
user.check_email()
"""管理角色"""
# return UserOperateImpl.manage_role_of_user_impl(self.uid, self.role)

View File

@ -1,6 +1,7 @@
from flask import Blueprint, request
from Utils.LogUtil import error_log
from Utils.ErrorUtil import ReturnConditionCheckFailed
from user.user_impl import UserManageImpl, UserImpl
from user.user_obj import UserManage, UserOperation
from user.user_auth import verify_token, authority_scope
@ -16,16 +17,32 @@ def online_check_route(**kwargs):
return {"info": "正常"}, 200
@user_route.route('/create_user', methods=['POST'])
@verify_token
@authority_scope(['admin'])
def create_user_route(**kwargs):
"""新建用户"""
try:
req = request.json
user = UserImpl()
user.email, user.name, user.pwd, user.role = req['email'], req['name'], req['pwd'], req['role']
user.check_user_obj(["email", "name", "pwd", "role"])
user.create()
return {"info": "用户创建成功"}
except ReturnConditionCheckFailed as e:
e.log_error()
return {"info": e.__str__()}, e.status_code
@user_route.route('/list_user', methods=['POST'])
@verify_token
@authority_scope(['admin'])
@error_log
def list_user_route(**kwargs):
"""
用户信息列表接口
"""
req = request.json
user = UserManage()
user = UserManageImpl()
result = user.list_user(
req['search'],
req['sort'],
@ -36,7 +53,6 @@ def list_user_route(**kwargs):
@user_route.route('/send_vcode_to_user', methods=['POST'])
@error_log
def send_vcode_to_user_route():
"""
发送验证码
@ -68,26 +84,6 @@ def login_route():
return {"info": result}, 200
@user_route.route('/create_user', methods=['POST'])
@verify_token
@authority_scope(['admin'])
def create_user_route(**kwargs):
"""
新建用户
"""
req = request.json
um = UserManage()
um.email, um.name, um.pwd, um.role = req['email'], req['name'], req['pwd'], req['role']
result = um.create_user()
if result is True:
return {"info": "新建用户成功"}, 200
else:
return {"info": result}, 200
@user_route.route('/disable_user', methods=['GET'])
@verify_token
@authority_scope(['admin'])