tfse-admin-api-v0.2/user/user_utils.py

160 lines
4.3 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import os
import re
import base64
import functools
from Crypto.PublicKey import RSA
from Crypto.Cipher import PKCS1_v1_5 as PKCS1_cipher
from flask import request
from itsdangerous import Serializer
from itsdangerous import TimedJSONWebSignatureSerializer as Serializer, SignatureExpired, BadSignature
from user.user_db import FIND_DATA
# Secret key passed to the itsdangerous Serializer when signing (create_token)
# and verifying (verify_token) access tokens.
# NOTE(review): the secret is hard-coded in source control; consider loading it
# from configuration or the environment instead.
TOKEN_KEY = "P0eHym@&CbaLzWkq"
def encrypt_data(**kwargs):
    """
    Encrypt a message with the RSA public key (PKCS#1 v1.5) and Base64-encode it.

    Parameters:
        msg (str): plaintext to encrypt, supplied as the keyword argument ``msg``.

    Returns:
        str: Base64 text of the ciphertext.

    Raises:
        KeyError: if ``msg`` is not supplied.
        OSError: if the public key file cannot be read.
    """
    msg = kwargs['msg']  # plaintext to encrypt
    # Resolve the key file relative to this module. Anchoring on the absolute
    # module directory avoids the previous string concatenation collapsing to
    # "/static/..." (filesystem root) whenever dirname(__file__) is empty.
    key_path = os.path.join(
        os.path.dirname(os.path.abspath(__file__)),
        'static',
        'rsa_public_key.pem',
    )
    with open(key_path) as f:
        data = f.read()
    public_key = RSA.importKey(data)
    cipher = PKCS1_cipher.new(public_key)
    # str.encode already returns bytes; no extra bytes() wrapper is needed.
    encrypt_text = base64.b64encode(cipher.encrypt(msg.encode("utf8")))
    return encrypt_text.decode('utf-8')
def decrypt_data(**kwargs):
    """
    Decrypt a Base64-encoded RSA (PKCS#1 v1.5) ciphertext with the private key.

    Parameters:
        encrypt_msg (str): Base64 ciphertext, supplied as the keyword
            argument ``encrypt_msg``.

    Returns:
        str: the decrypted UTF-8 text on success.
        False: on any failure (missing argument, unreadable key file,
            malformed Base64, failed decryption, or non-UTF-8 plaintext).
    """
    try:
        encrypt_msg = kwargs['encrypt_msg']  # Base64 ciphertext
        # Resolve the key file relative to this module. Anchoring on the
        # absolute module directory avoids the previous string concatenation
        # collapsing to "/static/..." whenever dirname(__file__) is empty.
        key_path = os.path.join(
            os.path.dirname(os.path.abspath(__file__)),
            'static',
            'rsa_private_key.pem',
        )
        with open(key_path) as f:
            data = f.read()
        private_key = RSA.importKey(data)
        cipher = PKCS1_cipher.new(private_key)
        # The second argument is the sentinel returned when the padding check
        # fails. Test for it explicitly instead of relying on ``.decode``
        # blowing up on a non-bytes sentinel.
        back_text = cipher.decrypt(base64.b64decode(encrypt_msg), None)
        if back_text is None:
            return False
        return back_text.decode('utf-8')
    except Exception:
        # Deliberate best-effort contract: callers only test for a falsy result.
        return False
def check_mail_fmt(email):
    """
    Validate an e-mail address; only addresses at @fecr.com.cn are accepted.

    Parameters:
        email: the address to check.

    Returns:
        bool: True when ``email`` is a string made of a non-empty local part
        followed by exactly ``@fecr.com.cn``; False otherwise (including
        non-string input).
    """
    if not isinstance(email, str):
        return False
    # Dots are escaped and the pattern is anchored at the very end (\Z), so
    # look-alikes such as "a@fecrXcomYcn" or "a@fecr.com.cn.evil.com" are
    # rejected. The old "len(email) > 7" guard is implied by the pattern.
    regex = r"^.+@fecr\.com\.cn\Z"
    return re.match(regex, email) is not None
def check_pwd_fmt(pwd):
    """
    Check the strength of an encrypted password.

    Parameters:
        pwd: the password in encrypted form; it is decrypted with
            decrypt_data before being checked.

    Returns:
        bool: True when the decrypted password has at least 8 characters and
        matches the strength pattern; False otherwise, including when
        decryption fails or yields an empty value.
    """
    plain = decrypt_data(encrypt_msg=pwd)
    if not plain:
        return False
    if len(plain) < 8:
        return False
    # Each negative look-ahead rejects passwords drawn entirely from one
    # combination of character classes that leaves a required class out.
    pattern = "^(?![A-Za-z0-9]+$)(?![a-z0-9\\W]+$)(?![A-Za-z\\W]+$)(?![A-Z0-9\\W]+$)^.{8,}$"
    return re.match(pattern, plain) is not None
def create_token(param):
    """
    Create a signed, time-limited access token.

    Parameters:
        param: the payload to serialize into the token.

    Returns:
        str: the access token; it is created with a lifetime of 14400 seconds.
    """
    lifetime_seconds = 14400
    serializer = Serializer(TOKEN_KEY, expires_in=lifetime_seconds)
    signed = serializer.dumps(param)
    return '' + signed.decode('ascii')
def verify_token(func):
    """
    Decorator that validates the ``token`` request header before calling ``func``.

    The token payload is used as a session id to look up a token record; the
    ``UID`` of the first matching record is passed to ``func`` as the keyword
    argument ``uid``.

    Returns:
        The wrapped view. On failure the wrapper returns an
        ``({"info": ...}, 401)`` tuple instead of calling ``func``.
    """
    @functools.wraps(func)
    def internal(*args, **kwargs):
        try:
            s = Serializer(TOKEN_KEY)
            session_id = s.loads(request.headers.get('token'))
            records = FIND_DATA("用户", "token记录", {"session_id": session_id})
            if not records:
                return {"info": "提示: 账号已在别处登录"}, 401
            uid = records[0]['UID']
        except (TypeError, KeyError):
            return {"info": "验证参数错误"}, 401
        except SignatureExpired:
            # Must come before BadSignature: SignatureExpired is a subclass of
            # it, so the previous ordering made this branch unreachable and
            # expired tokens were reported as plain token errors.
            return {"info": "token过期"}, 401
        except BadSignature:
            return {"info": "token错误"}, 401
        return func(*args, **kwargs, uid=uid)
    return internal
def authority_scope(scope):
    """
    Build a decorator that restricts a view to users whose role is in ``scope``.

    The wrapped view must be called with a ``uid`` keyword argument. The user
    record is looked up by that UID, and the call is rejected with an
    ``({"info": ...}, 401)`` tuple when the user does not exist, the account
    status is not "normal", or the role is not contained in ``scope``.

    Parameters:
        scope: container of roles allowed to call the view.

    Returns:
        The decorator to apply to the view function.
    """
    def decorate(func):
        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            rows = FIND_DATA("用户", "用户信息", {"UID": kwargs['uid']})
            if not rows:
                return {"info": "提示: 不存在该用户"}, 401
            account = rows[0]
            if account['status'] != "normal":
                return {"info": "提示: 该账户已被禁用"}, 401
            if account['role'] not in scope:
                return {"info": "提示: 该账户没有权限"}, 401
            # All checks passed: forward the call unchanged (uid stays in kwargs).
            return func(*args, **kwargs)
        return wrapper
    return decorate