tfse-admin-api-v0.2/Utils/AuthUtil.py

127 lines
3.3 KiB
Python
Raw Permalink Normal View History

2022-05-18 15:52:58 +08:00
import functools
import time
from flask import request
from itsdangerous import TimedJSONWebSignatureSerializer as Serializer, BadSignature, SignatureExpired
from DBHelper.MongoHelper import MongoHelper
from Utils.RSAUtil import decrypt_data
def create_token(param):
"""
创建token
Parameters:
param: 传入参数用于创建token
Returns:
token: 用户访问令牌
"""
secret_key = '0FTuOi^#Afx1@2@F'
token_expire = 14400
s = Serializer(secret_key, expires_in=token_expire)
token = '' + s.dumps(param).decode('ascii')
return token
def verify_token(func):
"""
校验token
"""
secret_key = '0FTuOi^#Afx1@2@F'
@functools.wraps(func)
def internal(*args, **kwargs):
# 检查token
try:
token = request.headers.get('token')
s = Serializer(secret_key)
cid = s.loads(token)['cid']
except TypeError:
return {"info": "参数错误"}, 401
except KeyError:
return {"info": "参数错误"}, 401
except BadSignature:
return {"info": "token错误"}, 401
except SignatureExpired:
return {"info": "token过期"}, 401
# 通过以上检查 返回原函数
return func(*args, **kwargs, cid=cid)
# 返回包装函数结果
return internal
def check_block(func):
"""
检查滑块是否通过
: 前端发送的滑块校验码是rsa加密的时间戳若时间戳间隔小于3秒则校验通过
"""
@functools.wraps(func)
def internal(*args, **kwargs):
try:
verify_code = decrypt_data(encrypt_msg=request.headers.get('block'))
if (time.time() - float(verify_code)/1000) > 3:
return {"info": "滑块校验失败"}, 400
except TypeError:
return {"info": "缺少滑块验证"}, 400
except ValueError:
return {"info": "滑块验证异常"}, 400
return func(*args, **kwargs)
return internal
def verify_report_view_auth(func):
"""检查是否具有报告查看权限"""
db = MongoHelper("tfse_v0.21")
@functools.wraps(func)
def internal(*args, **kwargs):
file_id = request.args.get('file_id')
records = db.find_all_data(
"企业数据",
"评价记录",
{"企业ID": kwargs['cid']},
["报告fid", "证书fid"]
)
if not records:
return {"info": "没有文件记录"}, 200
file_id_pool = list()
for record in records:
for a_file_id in list(record.values()):
file_id_pool.append(a_file_id)
file_id_pool = list(filter(None, file_id_pool))
if file_id not in file_id_pool:
return {"info": "文件ID异常"}, 200
return func(*args, **kwargs)
return internal
def verify_real_company(func):
db = MongoHelper("tfse_v0.21")
@functools.wraps(func)
def internal(*args, **kwargs):
verify_status = db.find_single_column(
"应用端",
"企业用户",
{"企业ID": kwargs['cid']},
"已认证"
)
if verify_status == "":
return func(*args, **kwargs)
else:
return {"info": "该企业未认证"}, 200
return internal