132 lines
4.0 KiB
Python
132 lines
4.0 KiB
Python
import json
|
||
import os
|
||
import random
|
||
import re
|
||
import base64
|
||
|
||
from Crypto.PublicKey import RSA
|
||
from Crypto.Cipher import PKCS1_v1_5 as PKCS1_cipher
|
||
|
||
from DBHelper.MongoHelperInstance import DB_GUA
|
||
|
||
|
||
class UserUtils(object):
    """Static helpers for user accounts: RSA transport encryption, input
    validation and user-ID generation.

    The ``check_*`` validators return ``None`` on success and raise
    ``AssertionError`` (carrying a user-facing message) on failure.
    """

    @staticmethod
    def _static_path(filename):
        """Return the absolute path of ``filename`` in the sibling ``static`` directory."""
        # Resolve the module location *before* taking dirname: concatenating
        # an empty dirname with '/static/...' would point at the filesystem root.
        module_dir = os.path.dirname(os.path.abspath(__file__))
        return os.path.join(module_dir, 'static', filename)

    @staticmethod
    def _is_strong_password(password):
        """Return True when the plaintext ``password`` meets the strength policy.

        The policy is encoded in the regex: at least 8 characters and, via the
        four negative look-aheads, at least one upper-case letter, one
        lower-case letter, one digit and one non-alphanumeric character.
        """
        regex = r"^(?![A-Za-z0-9]+$)(?![a-z0-9\W]+$)(?![A-Za-z\W]+$)(?![A-Z0-9\W]+$)^.{8,}$"
        return len(password) >= 8 and re.match(regex, password) is not None

    @staticmethod
    def encrypt_data(**kwargs):
        """Encrypt a message with the RSA public key (PKCS#1 v1.5).

        Keyword Args:
            msg (str): plaintext to encrypt.

        Returns:
            str: Base64 text of the ciphertext.

        Raises:
            KeyError: if ``msg`` is not supplied.
        """
        msg = kwargs['msg']

        with open(UserUtils._static_path('rsa_public_key.pem')) as f:
            public_key = RSA.importKey(f.read())

        cipher = PKCS1_cipher.new(public_key)
        ciphertext = cipher.encrypt(msg.encode("utf8"))
        return base64.b64encode(ciphertext).decode('utf-8')

    @staticmethod
    def decrypt_data(**kwargs):
        """Decrypt a Base64 ciphertext with the RSA private key (PKCS#1 v1.5).

        Keyword Args:
            encrypt_msg (str): Base64 text produced by ``encrypt_data``.

        Returns:
            str | bool: the decrypted text, or ``False`` if anything goes
            wrong (missing argument, unreadable key, bad Base64, failed
            decryption, undecodable plaintext).
        """
        # Deliberately best-effort: callers rely on ``False`` for every
        # failure mode, so the broad handler is part of the contract.
        try:
            encrypt_msg = kwargs['encrypt_msg']

            with open(UserUtils._static_path('rsa_private_key.pem')) as f:
                private_key = RSA.importKey(f.read())

            cipher = PKCS1_cipher.new(private_key)
            # ``0`` is the sentinel handed back by decrypt() when it detects
            # an error instead of raising.
            plaintext = cipher.decrypt(base64.b64decode(encrypt_msg), 0)
            if not isinstance(plaintext, (bytes, bytearray)):
                return False
            return plaintext.decode('utf-8')
        except Exception:
            return False

    @staticmethod
    def user_enum(obj, field):
        """Return the value stored under ``[obj][field]`` in
        ``static/EnumDataSetting.json`` (the allowed range of an enum attribute).

        Raises:
            KeyError: if ``obj`` or ``field`` is not present in the file.
        """
        # NOTE(review): the file is opened with the platform default encoding,
        # as before - confirm whether it should be pinned to UTF-8.
        with open(UserUtils._static_path('EnumDataSetting.json')) as f:
            return json.load(f)[obj][field]

    @staticmethod
    def check_mail_fmt(email):
        """Validate that ``email`` is an ``@fecr.com.cn`` address.

        Args:
            email: address to validate.

        Returns:
            None when the address is acceptable.

        Raises:
            AssertionError: if ``email`` is not a string of the form
                ``<something>@fecr.com.cn``.
        """
        # Dots are escaped and the whole string must match, so look-alike
        # domains ("fecrXcomYcn") and suffixed hosts ("fecr.com.cn.evil.org")
        # are rejected.
        pattern = r".+@fecr\.com\.cn"
        is_valid = isinstance(email, str) and re.fullmatch(pattern, email) is not None
        # Raised explicitly: a bare ``assert`` disappears under ``python -O``.
        if not is_valid:
            raise AssertionError("未满足@fecr.com.cn邮箱格式")

    @staticmethod
    def check_pwd_fmt(pwd):
        """Validate the strength of an encrypted password.

        Args:
            pwd: the password as Base64 RSA ciphertext (see ``decrypt_data``).

        Returns:
            None when the password decrypts and is strong enough.

        Raises:
            AssertionError: if the password cannot be decrypted (or is empty),
                or if it does not meet the strength policy.
        """
        password = UserUtils.decrypt_data(encrypt_msg=pwd)
        # decrypt_data returns False on failure; an empty plaintext is
        # rejected here as well.
        if not password:
            raise AssertionError("密码强度校验失败")

        if not UserUtils._is_strong_password(password):
            raise AssertionError("密码强度不够")

    @staticmethod
    def check_email_registered(email):
        """Ensure ``email`` is not already registered.

        Raises:
            AssertionError: if the user collection lookup returns a truthy
                result for this address.
        """
        existing = DB_GUA.find_single_column(
            "管理端",
            "用户",
            {"邮箱": email},
            "邮箱"
        )

        # Raised explicitly: a bare ``assert`` disappears under ``python -O``.
        if existing:
            raise AssertionError("邮箱已被注册")

    @staticmethod
    def generate_new_uid():
        """Generate a new 8-character alphanumeric user ID that is not yet in use.

        Candidates are drawn at random and retried until the database lookup
        reports no existing user with that UID.
        """

        def uid_maker(num):
            """Return a random string of ``num`` ASCII letters/digits."""
            choices = 'ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789'
            return ''.join(random.choice(choices) for _ in range(num))

        def uid_checker(candidate):
            """Return True when ``candidate`` is free, False when it is taken.

            Presumably ``find_single_column`` returns None when nothing
            matches - the check depends on that; verify against DBHelper.
            """
            found = DB_GUA.find_single_column(
                "管理端",
                "用户",
                {"UID": candidate},
                "UID"
            )
            return found is None

        uid = uid_maker(8)
        while not uid_checker(uid):
            uid = uid_maker(8)
        return uid
|