import json import os import random import re import base64 from Crypto.PublicKey import RSA from Crypto.Cipher import PKCS1_v1_5 as PKCS1_cipher from DBHelper.MongoHelperInstance import DB_GUA class UserUtils(object): @staticmethod def encrypt_data(**kwargs): """ 用公钥加密 Parameters: msg str 待加密信息 Returns: 加密后结果 """ msg = kwargs['msg'] # 待加密信息 with open(os.path.abspath(os.path.dirname(__file__)+'/static/rsa_public_key.pem')) as f: data = f.read() public_key = RSA.importKey(data) cipher = PKCS1_cipher.new(public_key) encrypt_text = base64.b64encode(cipher.encrypt(bytes(msg.encode("utf8")))) return encrypt_text.decode('utf-8') @staticmethod def decrypt_data(**kwargs): """ 用私钥解密 Parameters: encrypt_msg str 加密信息 Returns: 执行正确 解密后结果 执行错误 False """ try: encrypt_msg = kwargs['encrypt_msg'] # 加密信息 with open(os.path.abspath(os.path.dirname(__file__)+'/static/rsa_private_key.pem')) as f: data = f.read() private_key = RSA.importKey(data) cipher = PKCS1_cipher.new(private_key) back_text = cipher.decrypt(base64.b64decode(encrypt_msg), 0) return back_text.decode('utf-8') except Exception: return False @staticmethod def user_enum(obj, field): """获取枚举属性范围""" with open(os.path.abspath(os.path.dirname(__file__)+'/static/EnumDataSetting.json')) as f: return json.loads(f.read())[obj][field] @staticmethod def check_mail_fmt(email): """ 邮箱地址格式校验,仅允许@fecr.com.cn Parameters: email: 邮箱 Returns: result: 邮箱校验结果,正确返回True,不正确返回False """ regex = "^.+\\@fecr.com.cn" case = (len(email) > 7) and (re.match(regex, email) is not None) result = True if case else False assert result, "未满足@fecr.com.cn邮箱格式" @staticmethod def check_pwd_fmt(pwd): """ 密码强度校验 Parameters: pwd: 密码(已加密) Returns: result: 密码强度校验结果,正确返回True,不正确返回False """ password = UserUtils.decrypt_data(encrypt_msg=pwd) assert not password, "密码强度校验失败" regex = "^(?![A-Za-z0-9]+$)(?![a-z0-9\\W]+$)(?![A-Za-z\\W]+$)(?![A-Z0-9\\W]+$)^.{8,}$" case = (len(password) >= 8) and (re.match(regex, password) is not None) assert not case, "密码强度不够" @staticmethod def check_email_registered(email): """检查邮箱是否已注册""" email_is_existed = DB_GUA.find_single_column( "管理端", "用户", {"邮箱": email}, "邮箱" ) assert not email_is_existed, "邮箱已被注册" @staticmethod def generate_new_uid(): """生成新的用户ID""" def uid_maker(num): """ 用户ID生成器 num: ID长度 int """ choices = 'ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789' salt = '' for i in range(num): salt += random.choice(choices) return salt def uid_checker(__uid): """ 用户ID重复检查 __uid: 用户ID str return: True 用户ID可用 False 用户ID不可用 """ is_uid_existed = DB_GUA.find_single_column( "管理端", "用户", {"UID": __uid}, "UID" ) return True if is_uid_existed is None else False uid = uid_maker(8) while not uid_checker(uid): uid = uid_maker(8) return uid