127 lines
3.3 KiB
Python
127 lines
3.3 KiB
Python
|
import functools
|
|||
|
import time
|
|||
|
|
|||
|
from flask import request
|
|||
|
from itsdangerous import TimedJSONWebSignatureSerializer as Serializer, BadSignature, SignatureExpired
|
|||
|
|
|||
|
from DBHelper.MongoHelper import MongoHelper
|
|||
|
from Utils.RSAUtil import decrypt_data
|
|||
|
|
|||
|
|
|||
|
def create_token(param):
|
|||
|
"""
|
|||
|
创建token
|
|||
|
Parameters:
|
|||
|
param: 传入参数,用于创建token
|
|||
|
Returns:
|
|||
|
token: 用户访问令牌
|
|||
|
"""
|
|||
|
secret_key = '0FTuOi^#Afx1@2@F'
|
|||
|
token_expire = 14400
|
|||
|
s = Serializer(secret_key, expires_in=token_expire)
|
|||
|
token = '' + s.dumps(param).decode('ascii')
|
|||
|
return token
|
|||
|
|
|||
|
|
|||
|
def verify_token(func):
|
|||
|
"""
|
|||
|
校验token
|
|||
|
"""
|
|||
|
secret_key = '0FTuOi^#Afx1@2@F'
|
|||
|
|
|||
|
@functools.wraps(func)
|
|||
|
def internal(*args, **kwargs):
|
|||
|
# 检查token
|
|||
|
try:
|
|||
|
token = request.headers.get('token')
|
|||
|
s = Serializer(secret_key)
|
|||
|
cid = s.loads(token)['cid']
|
|||
|
except TypeError:
|
|||
|
return {"info": "参数错误"}, 401
|
|||
|
except KeyError:
|
|||
|
return {"info": "参数错误"}, 401
|
|||
|
except BadSignature:
|
|||
|
return {"info": "token错误"}, 401
|
|||
|
except SignatureExpired:
|
|||
|
return {"info": "token过期"}, 401
|
|||
|
# 通过以上检查 返回原函数
|
|||
|
return func(*args, **kwargs, cid=cid)
|
|||
|
|
|||
|
# 返回包装函数结果
|
|||
|
return internal
|
|||
|
|
|||
|
|
|||
|
def check_block(func):
|
|||
|
"""
|
|||
|
检查滑块是否通过
|
|||
|
|
|||
|
注: 前端发送的滑块校验码是rsa加密的时间戳,若时间戳间隔小于3秒则校验通过
|
|||
|
"""
|
|||
|
|
|||
|
@functools.wraps(func)
|
|||
|
def internal(*args, **kwargs):
|
|||
|
try:
|
|||
|
verify_code = decrypt_data(encrypt_msg=request.headers.get('block'))
|
|||
|
if (time.time() - float(verify_code)/1000) > 3:
|
|||
|
return {"info": "滑块校验失败"}, 400
|
|||
|
except TypeError:
|
|||
|
return {"info": "缺少滑块验证"}, 400
|
|||
|
except ValueError:
|
|||
|
return {"info": "滑块验证异常"}, 400
|
|||
|
return func(*args, **kwargs)
|
|||
|
return internal
|
|||
|
|
|||
|
|
|||
|
def verify_report_view_auth(func):
|
|||
|
"""检查是否具有报告查看权限"""
|
|||
|
|
|||
|
db = MongoHelper("tfse_v0.21")
|
|||
|
|
|||
|
@functools.wraps(func)
|
|||
|
def internal(*args, **kwargs):
|
|||
|
|
|||
|
file_id = request.args.get('file_id')
|
|||
|
|
|||
|
records = db.find_all_data(
|
|||
|
"企业数据",
|
|||
|
"评价记录",
|
|||
|
{"企业ID": kwargs['cid']},
|
|||
|
["报告fid", "证书fid"]
|
|||
|
)
|
|||
|
|
|||
|
if not records:
|
|||
|
return {"info": "没有文件记录"}, 200
|
|||
|
|
|||
|
file_id_pool = list()
|
|||
|
for record in records:
|
|||
|
for a_file_id in list(record.values()):
|
|||
|
file_id_pool.append(a_file_id)
|
|||
|
|
|||
|
file_id_pool = list(filter(None, file_id_pool))
|
|||
|
|
|||
|
if file_id not in file_id_pool:
|
|||
|
return {"info": "文件ID异常"}, 200
|
|||
|
|
|||
|
return func(*args, **kwargs)
|
|||
|
return internal
|
|||
|
|
|||
|
|
|||
|
def verify_real_company(func):
|
|||
|
db = MongoHelper("tfse_v0.21")
|
|||
|
|
|||
|
@functools.wraps(func)
|
|||
|
def internal(*args, **kwargs):
|
|||
|
|
|||
|
verify_status = db.find_single_column(
|
|||
|
"应用端",
|
|||
|
"企业用户",
|
|||
|
{"企业ID": kwargs['cid']},
|
|||
|
"已认证"
|
|||
|
)
|
|||
|
|
|||
|
if verify_status == "是":
|
|||
|
return func(*args, **kwargs)
|
|||
|
else:
|
|||
|
return {"info": "该企业未认证"}, 200
|
|||
|
return internal
|