"""Authentication helpers: input validation, token handling and RSA utilities."""
import base64
import functools
import os
import re

from Crypto.Cipher import PKCS1_v1_5 as PKCS1_cipher
from Crypto.PublicKey import RSA
from flask import request
from itsdangerous import Serializer
# The timed serializer below intentionally rebinds ``Serializer``; the plain
# import above is kept only so no existing import is dropped.
# NOTE(review): TimedJSONWebSignatureSerializer was removed in itsdangerous 2.1,
# so this module needs an older release pinned -- confirm against requirements.
from itsdangerous import (
    TimedJSONWebSignatureSerializer as Serializer,
    SignatureExpired,
    BadSignature,
)

from user.user_db import find_data

# NOTE(review): signing secret is hard-coded in source; consider loading it
# from configuration. Left unchanged because rotating it invalidates tokens.
TOKEN_KEY = "P0eHym@&CbaLzWkq"

# General e-mail shape: anything, "@", a host made of letters/digits/-/., and a
# 2-3 letter (or 1-3 digit) final label, optionally wrapped in [ ].
_MAIL_PATTERN = re.compile(
    r"^.+\@(\[?)[a-zA-Z0-9\-\.]+\.([a-zA-Z]{2,3}|[0-9]{1,3})(\]?)$"
)

# Minimum 8 characters; each negative lookahead rejects a password drawn only
# from the listed classes (alnum only / no upper / no digit / no lower).
# NOTE(review): "_" belongs to none of the listed classes, so a password that
# contains "_" defeats every lookahead -- confirm whether that is acceptable.
_PWD_PATTERN = re.compile(
    r"^(?![A-Za-z0-9]+$)(?![a-z0-9\W]+$)(?![A-Za-z\W]+$)(?![A-Z0-9\W]+$)^.{8,}$"
)

# Directory holding the PEM key files, resolved relative to this module.
_STATIC_DIR = os.path.join(os.path.dirname(os.path.abspath(__file__)), 'static')


def check_mail_fmt(email):
    """
    Validate the format of an e-mail address.

    NOTE(review): the previously active pattern was truncated to ``"^.+\\\\"``,
    which is not a valid regular expression and made ``re.match`` raise. The
    general-format pattern from the commented-out line has been restored. If
    only a specific domain was meant to be allowed, tighten the pattern.

    Parameters:
        email: e-mail address to check
    Returns:
        True when the address is longer than 7 characters and matches the
        pattern, otherwise False (also False for non-string input)
    """
    if not isinstance(email, str):
        return False
    return len(email) > 7 and _MAIL_PATTERN.match(email) is not None


def check_pwd_fmt(pwd):
    """
    Validate password strength.

    Parameters:
        pwd: password, RSA-encrypted and base64-encoded
    Returns:
        True when the decrypted password has at least 8 characters and
        matches the strength pattern; False otherwise, including when
        decryption fails or yields an empty string
    """
    password = decrypt_data(encrypt_msg=pwd)
    if not password:
        return False
    return len(password) >= 8 and _PWD_PATTERN.match(password) is not None


def create_token(param, expires_in=14400):
    """
    Create an access token.

    Parameters:
        param: payload to sign into the token
        expires_in: token lifetime in seconds (default 14400)
    Returns:
        token: signed token as an ASCII string
    """
    serializer = Serializer(TOKEN_KEY, expires_in=expires_in)
    return serializer.dumps(param).decode('ascii')


def verify_token(func):
    """
    Decorator that validates the ``token`` request header.

    The token payload is used as ``session_id`` to look up the token record;
    the ``UID`` of the first record is passed to the wrapped function as the
    keyword argument ``uid``.

    Returns:
        the wrapped function's result, or an ``(info dict, 401)`` tuple when
        the token is missing, malformed, expired or no longer on record
    """
    @functools.wraps(func)
    def internal(*args, **kwargs):
        try:
            serializer = Serializer(TOKEN_KEY)
            session_id = serializer.loads(request.headers.get('token'))
            records = find_data("tfse_admin", "用户", "token记录", {"session_id": session_id})
            if not records:
                return {"info": "提示: 账号已在别处登录"}, 401
            uid = records[0]['UID']
        except (TypeError, KeyError):
            return {"info": "验证参数错误"}, 401
        except SignatureExpired:
            # Must come before BadSignature: SignatureExpired is a subclass of
            # it, so the reverse order would make this branch unreachable.
            return {"info": "token过期"}, 401
        except BadSignature:
            return {"info": "token错误"}, 401
        return func(*args, **kwargs, uid=uid)

    return internal


def authority_scope(scope):
    """
    Decorator factory restricting a view to users whose role is in ``scope``.

    The wrapped function must be called with a ``uid`` keyword argument
    (as supplied by ``verify_token``); ``uid`` is forwarded unchanged.

    Parameters:
        scope: container of role values allowed to call the function
    Returns:
        a decorator; the decorated function returns an ``(info dict, 401)``
        tuple when the user is unknown, not "normal", or lacks the role
    """
    def decorate(func):
        # Preserve the view's name/docstring so stacked decorators and Flask
        # endpoint registration do not all see the name "internal".
        @functools.wraps(func)
        def internal(*args, **kwargs):
            records = find_data("tfse_admin", "用户", "用户信息", {"UID": kwargs['uid']})
            if not records:
                return {"info": "提示: 不存在该用户"}, 401
            user = records[0]
            if user['status'] != "normal":
                return {"info": "提示: 该账户已被禁用"}, 401
            if user['role'] not in scope:
                return {"info": "提示: 该账户没有权限"}, 401
            return func(*args, **kwargs)

        return internal

    return decorate


def _load_rsa_key(filename):
    """Read a PEM file from the module's ``static`` directory and import it as an RSA key."""
    with open(os.path.join(_STATIC_DIR, filename)) as f:
        return RSA.importKey(f.read())


def encrypt_data(**kwargs):
    """
    Encrypt a message with the RSA public key (PKCS#1 v1.5).

    Parameters:
        msg str message to encrypt
    Returns:
        base64-encoded ciphertext as a string
    """
    msg = kwargs['msg']  # message to encrypt
    cipher = PKCS1_cipher.new(_load_rsa_key('rsa_public_key.pem'))
    encrypt_text = base64.b64encode(cipher.encrypt(msg.encode("utf8")))
    return encrypt_text.decode('utf-8')


def decrypt_data(**kwargs):
    """
    Decrypt a message with the RSA private key (PKCS#1 v1.5).

    Parameters:
        encrypt_msg str base64-encoded ciphertext
    Returns:
        the decrypted text on success
        False on any failure (missing argument, unreadable key, bad input)
    """
    try:
        encrypt_msg = kwargs['encrypt_msg']  # ciphertext to decrypt
        cipher = PKCS1_cipher.new(_load_rsa_key('rsa_private_key.pem'))
        # 0 is the sentinel the cipher returns when decryption fails.
        back_text = cipher.decrypt(base64.b64decode(encrypt_msg), 0)
        if not isinstance(back_text, (bytes, bytearray)):
            return False
        return back_text.decode('utf-8')
    except Exception:
        # Callers rely on a False return instead of an exception.
        return False