tfse-app-api-v0.2/Utils/AuthUtil.py

129 lines
3.3 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import functools
import time
from flask import request
from itsdangerous import TimedJSONWebSignatureSerializer as Serializer, BadSignature, SignatureExpired
from DBHelper.MongoHelper import MongoHelper
from common.rsa import decrypt_data
def create_token(param):
"""
创建token
Parameters:
param: 传入参数用于创建token
Returns:
token: 用户访问令牌
"""
secret_key = '0FTuOi^#Afx1@2@F'
token_expire = 14400
s = Serializer(secret_key, expires_in=token_expire)
token = '' + s.dumps(param).decode('ascii')
return token
def verify_token(func):
"""
校验token
"""
secret_key = '0FTuOi^#Afx1@2@F'
@functools.wraps(func)
def internal(*args, **kwargs):
# 检查token
try:
token = request.headers.get('token')
s = Serializer(secret_key)
cid = s.loads(token)['cid']
except TypeError:
return {"info": "参数错误"}, 401
except KeyError:
return {"info": "参数错误"}, 401
except BadSignature:
return {"info": "token错误"}, 401
except SignatureExpired:
return {"info": "token过期"}, 401
# 通过以上检查 返回原函数
return func(*args, **kwargs, cid=cid)
# 返回包装函数结果
return internal
def check_block(func):
"""
检查滑块是否通过
注: 前端发送的滑块校验码是rsa加密的时间戳若时间戳间隔小于3秒则校验通过
"""
@functools.wraps(func)
def internal(*args, **kwargs):
try:
verify_code = decrypt_data(encrypt_msg=request.headers.get('block'))
if (time.time() - float(verify_code)/1000) > 3:
return {"info": "滑块校验失败"}, 400
except TypeError:
return {"info": "缺少滑块验证"}, 400
except ValueError:
return {"info": "滑块验证异常"}, 400
return func(*args, **kwargs)
return internal
def verify_report_view_auth(func):
"""
检查是否具有报告查看权限
"""
db = MongoHelper("tfse_v0.21")
@functools.wraps(func)
def internal(*args, **kwargs):
file_id = request.args.get('file_id')
records = db.find_all_data(
"企业数据",
"评级记录",
{"企业ID": kwargs['cid']},
["报告fid", "证书fid"]
)
if not records:
return {"info": "没有找到文件"}, 200
file_id_pool = list()
for record in records:
for a_file_id in list(record.values()):
file_id_pool.append(a_file_id)
file_id_pool = list(filter(None, file_id_pool))
if file_id not in file_id_pool:
return {"info": "文件不存在或无访问权限"}, 200
return func(*args, **kwargs)
return internal
def verify_real_company(func):
db = MongoHelper("tfse_v0.21")
@functools.wraps(func)
def internal(*args, **kwargs):
verify_status = db.find_single_column(
"应用端",
"企业用户",
{"企业ID": kwargs['cid']},
"已认证"
)
if verify_status == "":
return func(*args, **kwargs)
else:
return {"info": "该企业未认证"}, 200
return internal