Merge branch 'v0.21' into 'master'

V0.21

See merge request root/tfse_admin!23
This commit is contained in:
FECR-IBD 2022-03-31 02:59:45 +00:00
commit 9aed362a4d
7 changed files with 201 additions and 135 deletions

View File

@ -1,45 +0,0 @@
import time
from flask import request
import functools
import traceback
from DBHelper.MongoHelper import MongoHelper
def error_log(func):
"""
"""
@functools.wraps(func)
def internal(*args, **kwargs):
try:
func()
except Exception as e:
db = MongoHelper("tfse_v0.21")
info = {
"ip": request.remote_addr,
"request_info": {
"path": request.path,
"method": request.method,
"headers": request.headers.__str__(),
"args": request.args.__str__(),
"json": request.json.__str__()
},
"traceback": traceback.format_exc(),
"exception": type(e).__name__,
"is_solved": "no",
"time": time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())
}
db.insert_single_data(
"日志",
"异常日志",
info
)
return {"info": "发生什么事了?"}, 500
return func(*args, **kwargs)
return internal

10
Utils/ErrorUtil.py Normal file
View File

@ -0,0 +1,10 @@
class ObjColumnCheckError(RuntimeError):
"""对象字段检查异常"""
def __init__(self, error_info):
self.error_info = error_info
def __str__(self):
return self.error_info

51
Utils/LogUtil.py Normal file
View File

@ -0,0 +1,51 @@
import time
from flask import request
import functools
import traceback
from DBHelper.MongoHelper import MongoHelper
from Utils.ErrorUtil import ObjColumnCheckError
def error_log(func):
"""
异常日志
"""
def save_error_log(e):
db = MongoHelper("tfse_v0.21")
info = {
"ip": request.remote_addr,
"request_info": {
"path": request.path,
"method": request.method,
"headers": request.headers.__str__(),
"args": request.args.__str__(),
"json": request.json.__str__()
},
"traceback": traceback.format_exc(),
"exception": type(e).__name__,
"is_solved": "no",
"time": time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())
}
db.insert_single_data(
"日志",
"异常日志",
info
)
@functools.wraps(func)
def internal(*args, **kwargs):
try:
func()
except ObjColumnCheckError as e:
save_error_log(e)
return {"info": e.__str__()}, 400
except Exception as e:
save_error_log(e)
return {"info": "发生什么事了?"}, 400
return func(*args, **kwargs)
return internal

View File

@ -8,11 +8,11 @@ from werkzeug.security import generate_password_hash, check_password_hash
from DBHelper.MongoHelper import MongoHelper
from user.user_auth import create_token
from user.user_utils import check_mail_fmt, check_pwd_fmt, decrypt_data
from user.user_utils import check_mail_fmt, decrypt_data
from user.user_db import DELETE_DATA, UPDATE_INFO
class UserImpl:
class UserManageImpl:
@staticmethod
def list_user_impl(param1, param2, param3, param4):
@ -105,7 +105,7 @@ class UserImpl:
record['uid'] = record.pop('UID')
record['邮箱'] = record.pop('email')
record['姓名'] = record.pop('name')
record['状态'] = record.pop('status')
record['状态'] = "正常" if record.pop('status') == "normal" else "停用"
record['角色'] = record.pop('role')
record['创建时间'] = record.pop('create_time')
else:
@ -118,6 +118,45 @@ class UserImpl:
return result
@staticmethod
def send_vcode_to_user_impl(email):
def gen_vcode():
choices = '0123456789'
salt = ''
for i in range(6):
salt += random.choice(choices)
return salt
def send_email():
email_api = 'http://116.63.130.34:30001'
vcode = gen_vcode()
timestamp = round(time.time())
headers = {"Content-Type": "application/json;charset=UTF-8"}
data = {"title": "【远东资信】{}".format("登录验证码"),
"sender": 'fecribd@fecr.com.cn',
"recipients": [email],
"msg_body": "{}的验证码为 【{}5分钟内有效。".format("登录", vcode)}
requests.post(url=email_api + '/send_mail', headers=headers, data=json.dumps(data))
db = MongoHelper("tfse_v0.21")
db.upsert_single_data(
"管理端",
"邮箱验证码记录",
{"email": email},
{"vcode": vcode, "timestamp": timestamp}
)
return True
def __main__():
res = send_email()
if res is not True:
return res
return True
return __main__()
@staticmethod
def create_user_impl(email, name, pwd, role):
"""
@ -137,16 +176,6 @@ class UserImpl:
"""
参数检查
"""
roles = ['admin', 'developer', 'analysts', 'manager', 'guest']
if role not in roles:
return "用户角色异常"
if not check_mail_fmt(email):
return "邮箱格式错误"
if not check_pwd_fmt(pwd):
return "密码格式错误"
email_is_existed = db.find_single_column(
"管理端",
"用户",
@ -205,14 +234,15 @@ class UserImpl:
if check_param_result is not True:
return check_param_result
user = dict()
user['UID'] = gen_new_uid()
user['email'] = email
user['name'] = name
user['pwd'] = generate_password_hash(decrypt_data(encrypt_msg=pwd))
user['status'] = 'normal'
user['role'] = role
user['create_time'] = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())
user = {
'UID': gen_new_uid(),
'email': email,
'name': name,
'pwd': generate_password_hash(decrypt_data(encrypt_msg=pwd)),
'status': 'normal',
'role': role,
'create_time': time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())
}
db.upsert_single_data(
"管理端",
@ -225,6 +255,9 @@ class UserImpl:
return __main__()
class UserOperateImpl:
@staticmethod
def login_impl(email, pwd, vcode):
db = MongoHelper("tfse_v0.21")
@ -333,54 +366,6 @@ class UserImpl:
return __main__()
@staticmethod
def send_vcode_to_user_impl(email):
def check_param():
if not check_mail_fmt(email):
return "邮箱格式错误"
return True
def gen_vcode():
choices = '0123456789'
salt = ''
for i in range(6):
salt += random.choice(choices)
return salt
def send_email():
email_api = 'http://116.63.130.34:30001'
vcode = gen_vcode()
timestamp = round(time.time())
headers = {"Content-Type": "application/json;charset=UTF-8"}
data = {"title": "【远东资信】{}".format("登录验证码"),
"sender": 'fecribd@fecr.com.cn',
"recipients": [email],
"msg_body": "{}的验证码为 【{}5分钟内有效。".format("登录", vcode)}
requests.post(url=email_api + '/send_mail', headers=headers, data=json.dumps(data))
db = MongoHelper("tfse_v0.21")
db.upsert_single_data(
"管理端",
"邮箱验证码记录",
{"email": email},
{"vcode": vcode, "timestamp": timestamp}
)
return True
def __main__():
res = check_param()
if res is not True:
return res
res = send_email()
if res is not True:
return res
return True
return __main__()
@staticmethod
def disable_user_impl(uid):
"""

View File

@ -1,16 +1,19 @@
from user.user_impl import UserImpl
import re
from Utils.ErrorUtil import ObjColumnCheckError
from user.user_impl import UserManageImpl, UserOperateImpl
from user.user_utils import decrypt_data
class User:
"""
用户对象
Parameters:
uid str 用户ID
email str 邮箱
name str 用户名
pwd str 密码
role str 角色
duty str 职责
uid: str 用户ID
email: str 邮箱
name: str 用户名
pwd: str 密码
status: str 用户状态
role: str 角色
"""
def __init__(self):
@ -18,8 +21,61 @@ class User:
self.email = None
self.name = None
self.pwd = None
self.status = None
self.role = None
def check_uid(self):
"""用户ID校验"""
if type(self.uid) is not str:
raise ObjColumnCheckError("用户ID格式错误")
if len(self.uid) != 8:
raise ObjColumnCheckError("用户ID格式错误")
def check_email(self):
"""邮箱格式仅允许@fecr.com.cn"""
regex = "^.+\\@fecr.com.cn"
case = (len(self.email) > 7) and (re.match(regex, self.email) is not None)
result = True if case else False
if not result:
raise ObjColumnCheckError("邮箱格式错误")
def check_name(self):
"""用户名格式校验"""
if type(self.name) is not str:
raise ObjColumnCheckError("用户名格式错误")
def check_pwd(self):
"""密码格式校验"""
if type(self.pwd) is not str:
raise ObjColumnCheckError("密码格式错误")
password = decrypt_data(encrypt_msg=self.pwd)
if not password:
raise ObjColumnCheckError("密码格式错误")
regex = "^(?![A-Za-z0-9]+$)(?![a-z0-9\\W]+$)(?![A-Za-z\\W]+$)(?![A-Z0-9\\W]+$)^.{8,}$"
case = (len(password) >= 8) and (re.match(regex, password) is not None)
result = True if case else False
if not result:
raise ObjColumnCheckError("密码格式错误")
def check_status(self):
"""用户状态校验"""
if self.status not in ['normal', 'disable']:
raise ObjColumnCheckError("用户状态格式错误")
def check_role(self):
"""用户角色校验"""
if self.role not in ['admin', 'analysts', 'developer', 'operator', 'guest']:
raise ObjColumnCheckError("用户角色格式错误")
class UserManage(User):
"""
@ -31,7 +87,7 @@ class UserManage(User):
"""
用户信息列表
"""
return UserImpl.list_user_impl(search, sort, page_size, page_no)
return UserManageImpl.list_user_impl(search, sort, page_size, page_no)
def send_vcode_to_user(self):
"""
@ -40,7 +96,8 @@ class UserManage(User):
发送成功 bool 返回True
发送失败 str 返回异常信息
"""
return UserImpl.send_vcode_to_user_impl(self.email)
self.check_email()
return UserManageImpl.send_vcode_to_user_impl(self.email)
def create_user(self):
"""
@ -49,7 +106,7 @@ class UserManage(User):
创建成功 bool 返回True
创建失败 str 返回异常信息
"""
return UserImpl.create_user_impl(self.email, self.name, self.pwd, self.role)
return UserManageImpl.create_user_impl(self.email, self.name, self.pwd, self.role)
def disable_user(self):
"""
@ -86,16 +143,22 @@ class UserOperation(User):
"""
用户登录
"""
return UserImpl.login_impl(self.email, self.pwd, self.vcode)
return UserOperateImpl.login_impl(self.email, self.pwd, self.vcode)
# 停用
def deactivate(self):
return UserImpl.disable_user_impl(self.uid)
return UserOperateImpl.disable_user_impl(self.uid)
# 删除
def delete(self):
return UserImpl.delete_user_impl(self.uid)
return UserOperateImpl.delete_user_impl(self.uid)
# 管理角色
def manage(self):
return UserImpl.manage_role_of_user_impl(self.uid, self.role)
return UserOperateImpl.manage_role_of_user_impl(self.uid, self.role)
if __name__ == '__main__':
user = User()
user.email = "aaa"
user.check_email()

View File

@ -1,6 +1,6 @@
from flask import Blueprint, request
from Log.log_utils import error_log
from Utils.LogUtil import error_log
from user.user_obj import UserManage, UserOperation
from user.user_auth import verify_token, authority_scope
@ -36,17 +36,19 @@ def list_user_route(**kwargs):
@user_route.route('/send_vcode_to_user', methods=['POST'])
@error_log
def send_vcode_to_user_route():
"""
发送验证码
"""
um = UserManage()
um.email = request.json['email']
res = um.send_vcode_to_user()
if res is True:
req = request.json
user_manage = UserManage()
user_manage.email = req['email']
result = user_manage.send_vcode_to_user()
if result is True:
return {"info": "验证邮件已发送"}, 200
else:
return {"info": res}, 200
return {"info": result}, 200
@user_route.route('/login', methods=['POST'])