guarantee-admin-api-v0.2/Modules/AdminUser/UserUtils.py

184 lines
5.5 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import json
import os
import random
import re
import base64
import time
from Crypto.PublicKey import RSA
from Crypto.Cipher import PKCS1_v1_5 as PKCS1_cipher
from werkzeug.security import check_password_hash
from DBHelper.MongoHelperInstance import DB_GUA
from Modules.AdminUser.UserAuthUtils import create_token
class UserUtils(object):
    """Helper routines for admin-user registration and login.

    The ``check_*`` validators return ``None`` on success and raise
    ``AssertionError`` carrying a user-facing message on failure. The raise is
    explicit (not an ``assert`` statement) so that the checks still run when
    Python is started with ``-O``.
    """

    # Alphabet used for generated user IDs and session IDs.
    _ID_ALPHABET = 'ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789'
    # OS-backed random source, so generated IDs are not predictable.
    _RNG = random.SystemRandom()
    # Only addresses of the form <something>@fecr.com.cn are accepted.
    _MAIL_PATTERN = re.compile(r".+@fecr\.com\.cn")
    # An e-mail verification code is valid for this many seconds.
    _VCODE_TTL_SECONDS = 300

    @staticmethod
    def _static_path(filename):
        """Return the absolute path of *filename* in the sibling ``static`` directory."""
        return os.path.abspath(
            os.path.join(os.path.dirname(__file__), 'static', filename)
        )

    @staticmethod
    def _require(condition, message):
        """Raise ``AssertionError(message)`` unless *condition* is truthy."""
        if not condition:
            raise AssertionError(message)

    @staticmethod
    def _random_string(length):
        """Return *length* characters drawn at random from ``_ID_ALPHABET``."""
        return ''.join(
            UserUtils._RNG.choice(UserUtils._ID_ALPHABET) for _ in range(length)
        )

    @staticmethod
    def _is_strong_password(password):
        """Return True if *password* has 8+ chars with lower, upper, digit and symbol.

        Any character outside ``[A-Za-z0-9]`` counts as a symbol. The previous
        single-regex implementation let ``_`` and non-ASCII letters slip past
        every look-ahead, so e.g. ``"abcdefg_"`` was accepted as strong.
        """
        if len(password) < 8:
            return False
        required_classes = (r"[a-z]", r"[A-Z]", r"[0-9]", r"[^A-Za-z0-9]")
        return all(re.search(pattern, password) for pattern in required_classes)

    @staticmethod
    def encrypt_data(**kwargs):
        """Encrypt a message with the RSA public key.

        Keyword Args:
            msg (str): the plaintext to encrypt.

        Returns:
            str: the Base64-encoded ciphertext.
        """
        msg = kwargs['msg']
        with open(UserUtils._static_path('rsa_public_key.pem')) as key_file:
            public_key = RSA.importKey(key_file.read())
        cipher = PKCS1_cipher.new(public_key)
        encrypted = base64.b64encode(cipher.encrypt(msg.encode("utf8")))
        return encrypted.decode('utf-8')

    @staticmethod
    def decrypt_data(**kwargs):
        """Decrypt a Base64-encoded ciphertext with the RSA private key.

        Keyword Args:
            encrypt_msg (str): the Base64-encoded ciphertext.

        Returns:
            str | bool: the decrypted text, or ``False`` if anything goes wrong
            (missing argument, unreadable key file, malformed input, failed
            decryption).
        """
        # Deliberately best-effort: callers test the result against False.
        try:
            encrypt_msg = kwargs['encrypt_msg']
            with open(UserUtils._static_path('rsa_private_key.pem')) as key_file:
                private_key = RSA.importKey(key_file.read())
            cipher = PKCS1_cipher.new(private_key)
            # 0 is the sentinel handed back when decryption fails; calling
            # .decode on it raises, which lands in the except branch below.
            plain = cipher.decrypt(base64.b64decode(encrypt_msg), 0)
            return plain.decode('utf-8')
        except Exception:
            return False

    @staticmethod
    def user_enum(obj, field):
        """Return the allowed values of *field* for *obj* from EnumDataSetting.json."""
        # Read as UTF-8 explicitly (as make_menus does) rather than relying on
        # the platform default encoding.
        with open(UserUtils._static_path('EnumDataSetting.json'), encoding='utf-8') as f:
            return json.load(f)[obj][field]

    @staticmethod
    def check_mail_fmt(email):
        """Validate that *email* is an ``@fecr.com.cn`` address.

        Args:
            email: the address to validate.

        Raises:
            AssertionError: if *email* is not a string of the form
                ``<local-part>@fecr.com.cn``.
        """
        # fullmatch + escaped dots: the old pattern had neither, so it accepted
        # "x@fecrXcomYcn" and "x@fecr.com.cn.evil.com".
        is_valid = (
            isinstance(email, str)
            and UserUtils._MAIL_PATTERN.fullmatch(email) is not None
        )
        UserUtils._require(is_valid, "未满足@fecr.com.cn邮箱格式")

    @staticmethod
    def check_pwd_fmt(pwd):
        """Validate the strength of an encrypted password.

        Args:
            pwd: the password as sent by the client (RSA-encrypted, Base64).

        Raises:
            AssertionError: if *pwd* cannot be decrypted, or the decrypted
                password is shorter than 8 characters or lacks a lowercase
                letter, an uppercase letter, a digit or a symbol.
        """
        password = UserUtils.decrypt_data(encrypt_msg=pwd)
        # decrypt_data returns False on failure; an empty password is rejected too.
        UserUtils._require(password, "密码强度校验失败")
        UserUtils._require(UserUtils._is_strong_password(password), "密码强度不够")

    @staticmethod
    def check_email_registered(email):
        """Ensure *email* is not already registered.

        Raises:
            AssertionError: if a user record with this e-mail already exists.
        """
        existing = DB_GUA.find_single_column(
            "管理端",
            "用户",
            {"邮箱": email},
            "邮箱"
        )
        UserUtils._require(not existing, "邮箱已被注册")

    @staticmethod
    def generate_new_uid():
        """Generate an 8-character user ID that is not yet present in the database."""
        while True:
            uid = UserUtils._random_string(8)
            existing = DB_GUA.find_single_column(
                "管理端",
                "用户",
                {"UID": uid},
                "UID"
            )
            # The lookup yields None when no user has this UID.
            if existing is None:
                return uid

    @staticmethod
    def check_vcode_is_correct(email, vcode):
        """Verify the e-mail verification code and consume it on success.

        Args:
            email: the address the code was issued for.
            vcode: the code supplied by the user.

        Raises:
            AssertionError: if no code is stored for *email*, the code does not
                match, or it is older than 300 seconds.
        """
        record = DB_GUA.find_single_data(
            "管理端",
            "邮箱验证码记录",
            {"email": email},
            ["vcode", "timestamp"]
        )
        UserUtils._require(record, "验证码不存在")
        UserUtils._require(record['vcode'] == vcode, "验证码错误")
        age_seconds = time.time() - record['timestamp']
        UserUtils._require(age_seconds <= UserUtils._VCODE_TTL_SECONDS, "验证码过期")
        # A code can be used only once: remove it after a successful check.
        DB_GUA.delete_single_data(
            "管理端",
            "邮箱验证码记录",
            {"email": email}
        )

    def check_pwd_is_correct(self, send_pwd, base_pwd):
        """Verify that the submitted password matches the stored hash.

        Args:
            send_pwd: the password as sent by the client (RSA-encrypted, Base64).
            base_pwd: the stored password hash.

        Raises:
            AssertionError: if the password cannot be decrypted or does not
                match *base_pwd*.
        """
        password = self.decrypt_data(encrypt_msg=send_pwd)
        if password is False:
            raise AssertionError("密码错误")
        try:
            matches = check_password_hash(base_pwd, password)
        except Exception:
            # Any failure while checking (e.g. a malformed stored hash) is
            # reported to the caller as a wrong password, as before.
            matches = False
        # The previous code asserted the negation, rejecting correct passwords
        # and accepting wrong ones.
        self._require(matches, "密码错误")

    @staticmethod
    def make_menus(role):
        """Return the menu definition for *role* from menus.json."""
        with open(UserUtils._static_path('menus.json'), "r", encoding='utf-8') as f:
            return json.load(f)[role]

    @staticmethod
    def make_token(uid):
        """Create a new session for *uid* and return its token.

        A fresh random session ID is stored for the user (replacing any
        previous one) and passed to ``create_token``.
        """
        # NOTE(review): 4 characters give only 62**4 possible session IDs;
        # consider lengthening if the stored format allows it.
        session_id = UserUtils._random_string(4)
        DB_GUA.upsert_single_data(
            "管理端",
            "token记录",
            {"用户ID": uid},
            {"会话ID": session_id}
        )
        return create_token(session_id)