2022-05-25 14:41:58 +08:00
|
|
|
|
import json
|
|
|
|
|
import os
|
|
|
|
|
import random
|
|
|
|
|
import re
|
|
|
|
|
import base64
|
2022-05-25 15:45:31 +08:00
|
|
|
|
import time
|
2022-05-25 14:41:58 +08:00
|
|
|
|
|
|
|
|
|
from Crypto.PublicKey import RSA
|
|
|
|
|
from Crypto.Cipher import PKCS1_v1_5 as PKCS1_cipher
|
2022-05-25 15:45:31 +08:00
|
|
|
|
from werkzeug.security import check_password_hash
|
2022-05-25 14:41:58 +08:00
|
|
|
|
|
|
|
|
|
from DBHelper.MongoHelperInstance import DB_GUA
|
2022-05-25 15:45:31 +08:00
|
|
|
|
from Modules.AdminUser.UserAuthUtils import create_token
|
|
|
|
|
from Modules.AdminUser.UserUtils import decrypt_data
|
2022-05-25 14:41:58 +08:00
|
|
|
|
|
|
|
|
|
|
|
|
|
|
class UserUtils(object):
|
|
|
|
|
|
|
|
|
|
@staticmethod
|
|
|
|
|
def encrypt_data(**kwargs):
|
|
|
|
|
"""
|
|
|
|
|
用公钥加密
|
|
|
|
|
Parameters: msg str 待加密信息
|
|
|
|
|
Returns: 加密后结果
|
|
|
|
|
"""
|
|
|
|
|
msg = kwargs['msg'] # 待加密信息
|
|
|
|
|
|
|
|
|
|
with open(os.path.abspath(os.path.dirname(__file__)+'/static/rsa_public_key.pem')) as f:
|
|
|
|
|
data = f.read()
|
|
|
|
|
public_key = RSA.importKey(data)
|
|
|
|
|
|
|
|
|
|
cipher = PKCS1_cipher.new(public_key)
|
|
|
|
|
encrypt_text = base64.b64encode(cipher.encrypt(bytes(msg.encode("utf8"))))
|
|
|
|
|
return encrypt_text.decode('utf-8')
|
|
|
|
|
|
|
|
|
|
@staticmethod
|
|
|
|
|
def decrypt_data(**kwargs):
|
|
|
|
|
"""
|
|
|
|
|
用私钥解密
|
|
|
|
|
Parameters: encrypt_msg str 加密信息
|
|
|
|
|
Returns: 执行正确 解密后结果 执行错误 False
|
|
|
|
|
"""
|
|
|
|
|
try:
|
|
|
|
|
encrypt_msg = kwargs['encrypt_msg'] # 加密信息
|
|
|
|
|
|
|
|
|
|
with open(os.path.abspath(os.path.dirname(__file__)+'/static/rsa_private_key.pem')) as f:
|
|
|
|
|
data = f.read()
|
|
|
|
|
private_key = RSA.importKey(data)
|
|
|
|
|
|
|
|
|
|
cipher = PKCS1_cipher.new(private_key)
|
|
|
|
|
back_text = cipher.decrypt(base64.b64decode(encrypt_msg), 0)
|
|
|
|
|
return back_text.decode('utf-8')
|
|
|
|
|
except Exception:
|
|
|
|
|
return False
|
|
|
|
|
|
|
|
|
|
@staticmethod
|
|
|
|
|
def user_enum(obj, field):
|
|
|
|
|
"""获取枚举属性范围"""
|
|
|
|
|
|
|
|
|
|
with open(os.path.abspath(os.path.dirname(__file__)+'/static/EnumDataSetting.json')) as f:
|
|
|
|
|
return json.loads(f.read())[obj][field]
|
|
|
|
|
|
|
|
|
|
@staticmethod
|
|
|
|
|
def check_mail_fmt(email):
|
|
|
|
|
"""
|
|
|
|
|
邮箱地址格式校验,仅允许@fecr.com.cn
|
|
|
|
|
Parameters: email: 邮箱
|
|
|
|
|
Returns: result: 邮箱校验结果,正确返回True,不正确返回False
|
|
|
|
|
"""
|
|
|
|
|
regex = "^.+\\@fecr.com.cn"
|
|
|
|
|
case = (len(email) > 7) and (re.match(regex, email) is not None)
|
|
|
|
|
result = True if case else False
|
|
|
|
|
assert result, "未满足@fecr.com.cn邮箱格式"
|
|
|
|
|
|
|
|
|
|
@staticmethod
|
|
|
|
|
def check_pwd_fmt(pwd):
|
|
|
|
|
"""
|
|
|
|
|
密码强度校验
|
|
|
|
|
Parameters: pwd: 密码(已加密)
|
|
|
|
|
Returns: result: 密码强度校验结果,正确返回True,不正确返回False
|
|
|
|
|
"""
|
|
|
|
|
password = UserUtils.decrypt_data(encrypt_msg=pwd)
|
|
|
|
|
assert not password, "密码强度校验失败"
|
|
|
|
|
|
|
|
|
|
regex = "^(?![A-Za-z0-9]+$)(?![a-z0-9\\W]+$)(?![A-Za-z\\W]+$)(?![A-Z0-9\\W]+$)^.{8,}$"
|
|
|
|
|
case = (len(password) >= 8) and (re.match(regex, password) is not None)
|
|
|
|
|
assert not case, "密码强度不够"
|
|
|
|
|
|
|
|
|
|
@staticmethod
|
|
|
|
|
def check_email_registered(email):
|
|
|
|
|
"""检查邮箱是否已注册"""
|
|
|
|
|
|
|
|
|
|
email_is_existed = DB_GUA.find_single_column(
|
|
|
|
|
"管理端",
|
|
|
|
|
"用户",
|
|
|
|
|
{"邮箱": email},
|
|
|
|
|
"邮箱"
|
|
|
|
|
)
|
|
|
|
|
|
|
|
|
|
assert not email_is_existed, "邮箱已被注册"
|
|
|
|
|
|
|
|
|
|
@staticmethod
|
|
|
|
|
def generate_new_uid():
|
|
|
|
|
"""生成新的用户ID"""
|
|
|
|
|
|
|
|
|
|
def uid_maker(num):
|
|
|
|
|
"""
|
|
|
|
|
用户ID生成器
|
|
|
|
|
num: ID长度 int
|
|
|
|
|
"""
|
|
|
|
|
|
|
|
|
|
choices = 'ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789'
|
|
|
|
|
salt = ''
|
|
|
|
|
for i in range(num):
|
|
|
|
|
salt += random.choice(choices)
|
|
|
|
|
return salt
|
|
|
|
|
|
|
|
|
|
def uid_checker(__uid):
|
|
|
|
|
"""
|
|
|
|
|
用户ID重复检查
|
|
|
|
|
__uid: 用户ID str
|
|
|
|
|
return: True 用户ID可用 False 用户ID不可用
|
|
|
|
|
"""
|
|
|
|
|
is_uid_existed = DB_GUA.find_single_column(
|
|
|
|
|
"管理端",
|
|
|
|
|
"用户",
|
|
|
|
|
{"UID": __uid},
|
|
|
|
|
"UID"
|
|
|
|
|
)
|
|
|
|
|
return True if is_uid_existed is None else False
|
|
|
|
|
|
|
|
|
|
uid = uid_maker(8)
|
|
|
|
|
while not uid_checker(uid):
|
|
|
|
|
uid = uid_maker(8)
|
|
|
|
|
return uid
|
2022-05-25 15:45:31 +08:00
|
|
|
|
|
|
|
|
|
@staticmethod
|
|
|
|
|
def check_vcode_is_correct(email, vcode):
|
|
|
|
|
record = DB_GUA.find_single_data(
|
|
|
|
|
"管理端",
|
|
|
|
|
"邮箱验证码记录",
|
|
|
|
|
{"email": email},
|
|
|
|
|
["vcode", "timestamp"]
|
|
|
|
|
)
|
|
|
|
|
|
|
|
|
|
assert record, "验证码不存在"
|
|
|
|
|
|
|
|
|
|
assert not record['vcode'] != vcode, "验证码错误"
|
|
|
|
|
|
|
|
|
|
assert not time.time() - record['timestamp'] > 300, "验证码过期"
|
|
|
|
|
|
|
|
|
|
DB_GUA.delete_single_data(
|
|
|
|
|
"管理端",
|
|
|
|
|
"邮箱验证码记录",
|
|
|
|
|
{"email": email}
|
|
|
|
|
)
|
|
|
|
|
|
|
|
|
|
@staticmethod
|
|
|
|
|
def check_pwd_is_correct(send_pwd, base_pwd):
|
|
|
|
|
try:
|
|
|
|
|
assert not check_password_hash(base_pwd, decrypt_data(encrypt_msg=send_pwd)), "密码错误"
|
|
|
|
|
except Exception:
|
|
|
|
|
assert False, "密码错误"
|
|
|
|
|
|
|
|
|
|
@staticmethod
|
|
|
|
|
def make_menus(role):
|
|
|
|
|
with open(os.path.abspath(os.path.dirname(__file__) + '/static/menus.json'), "r", encoding='utf-8') as f:
|
|
|
|
|
duties = json.load(f)
|
|
|
|
|
return duties[role]
|
|
|
|
|
|
|
|
|
|
@staticmethod
|
|
|
|
|
def make_token(uid):
|
|
|
|
|
choices = 'ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789'
|
|
|
|
|
session_id = ''
|
|
|
|
|
for i in range(4):
|
|
|
|
|
session_id += random.choice(choices)
|
|
|
|
|
|
|
|
|
|
DB_GUA.update_single_data(
|
|
|
|
|
"管理端",
|
|
|
|
|
"token记录",
|
|
|
|
|
{"用户ID": uid},
|
|
|
|
|
{"会话ID": session_id}
|
|
|
|
|
)
|
|
|
|
|
|
|
|
|
|
return create_token(session_id)
|