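import functools
import time

from flask import request
from itsdangerous import TimedJSONWebSignatureSerializer as Serializer, BadSignature, SignatureExpired

from common.rsa import decrypt_data
from user.user_db import FIND_FID_IN_RATING_RECORD

# NOTE(review): TimedJSONWebSignatureSerializer was deprecated in itsdangerous 2.0
# and removed in 2.1, so this module only imports against older releases. Plan a
# migration to a maintained JWT library.

# NOTE(review): the signing key is hard-coded in source. Anyone with read access
# to the repository can mint valid tokens for any cid. Load it from deployment
# configuration or a secret store and rotate the value below.
_TOKEN_SECRET_KEY = '0FTuOi^#Afx1@2@F'

# Token lifetime in seconds (4 hours).
_TOKEN_EXPIRES_IN = 14400

# Largest accepted distance, in seconds, between the slider timestamp and the
# server clock, in either direction.
_BLOCK_MAX_SKEW_SECONDS = 3


def create_token(param):
    """
    Create a signed, time-limited access token.

    Parameters:
        param: payload to embed in the token; verify_token expects it to
               carry a 'cid' key
    Returns:
        token: the serialized token as an ASCII string
    """
    serializer = Serializer(_TOKEN_SECRET_KEY, expires_in=_TOKEN_EXPIRES_IN)
    return serializer.dumps(param).decode('ascii')


def verify_token(func):
    """
    Decorator: validate the 'token' request header and pass the caller's cid on.

    On success the wrapped view is called with an extra keyword argument
    ``cid`` taken from the token payload. On failure a 401 response is
    returned and the view is not called.
    """
    @functools.wraps(func)
    def internal(*args, **kwargs):
        try:
            token = request.headers.get('token')
            cid = Serializer(_TOKEN_SECRET_KEY).loads(token)['cid']
        except (TypeError, KeyError):
            # Missing header, or a payload without a usable 'cid' entry.
            return {"info": "参数错误"}, 401
        except SignatureExpired:
            # SignatureExpired is a subclass of BadSignature, so it has to be
            # handled first; otherwise the BadSignature clause catches it and
            # the "expired" response can never be returned.
            return {"info": "token过期"}, 401
        except BadSignature:
            return {"info": "token错误"}, 401
        # All checks passed: run the wrapped view with the verified cid.
        return func(*args, **kwargs, cid=cid)
    return internal


def check_block(func):
    """
    Decorator: check that the slider challenge was passed.

    The front end sends, in the 'block' header, an RSA-encrypted timestamp
    (presumably in milliseconds, since it is divided by 1000 here). The check
    passes when that timestamp is within 3 seconds of the server clock.

    NOTE(review): the timestamp is produced by the client and is not tied to
    a server-issued challenge, so this only bounds freshness. The same header
    value is accepted repeatedly inside the window; a server-side single-use
    nonce would be needed to prevent that.
    """
    @functools.wraps(func)
    def internal(*args, **kwargs):
        try:
            verify_code = decrypt_data(encrypt_msg=request.headers.get('block'))
            elapsed = time.time() - float(verify_code) / 1000
            # Range check instead of a one-sided "> 3": a timestamp in the
            # future gives a negative elapsed value, and "nan"/"inf" compare
            # False against any bound, so all of them used to pass.
            if not (-_BLOCK_MAX_SKEW_SECONDS <= elapsed <= _BLOCK_MAX_SKEW_SECONDS):
                return {"info": "滑块校验失败"}, 400
        except TypeError:
            return {"info": "缺少滑块验证"}, 400
        except ValueError:
            return {"info": "滑块验证异常"}, 400
        # NOTE(review): decrypt_data is defined elsewhere; if it can raise other
        # exception types on malformed ciphertext they surface as a server error.
        return func(*args, **kwargs)
    return internal


def verify_report_view_auth(func):
    """
    Decorator: check that the caller may view the requested report.

    Reads ``file_id`` from the query string and ``cid`` from the keyword
    arguments, so it must be applied inside verify_token. The request is
    allowed only when file_id appears among the values of the records
    returned by FIND_FID_IN_RATING_RECORD for that cid.
    """
    @functools.wraps(func)
    def internal(*args, **kwargs):
        file_id = request.args.get('file_id')
        records = FIND_FID_IN_RATING_RECORD(kwargs['cid'])
        if not records:
            return {"info": "没有找到文件"}, 404
        # NOTE(review): file_id arrives as a string; if the stored ids are not
        # strings this comparison never matches - confirm against the schema.
        if not any(file_id in record.values() for record in records):
            # NOTE(review): 403 is the conventional status for an authenticated
            # caller without permission; 401 is kept so clients are unaffected.
            return {"info": "没有文件访问权限"}, 401
        return func(*args, **kwargs)
    return internal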