import functools from flask import request from itsdangerous import TimedJSONWebSignatureSerializer as Serializer, SignatureExpired, BadSignature from common.db import find_data TOKEN_KEY = "P0eHym@&CbaLzWkq" def api_secret(func): """ 校验接口请求密钥 """ secret = "EZgo9ykxrYuBMYnYmmKIh" # 接口密钥 @functools.wraps(func) def internal(*args, **kwargs): try: token = request.headers.get('secret') if token != secret: return {"info": "接口密钥错误"}, 401 except Exception: return {"info": "请求异常"}, 401 return func(*args, **kwargs) return internal def api_verification_code(func): """ 检查验证码 """ v_client = "tfse_admin" # 校验数据库服务 v_database = "用户" # 校验数据库名称 v_collection = "验证记录" # 校验数据表名称 @functools.wraps(func) def internal(*args, **kwargs): try: email = request.json['email'] vcode = request.json['vcode'] res = find_data(v_client, v_database, v_collection, {"email": email}) if len(res) == 0: return {"info": "验证码错误"}, 401 if res[0]['vcode'] == vcode: pass else: return {"info": "验证码错误"}, 401 except Exception: return {"info": "请求异常"}, 401 return func(*args, **kwargs) return internal def verify_token(func): """ 校验token Returns: 返回token被解析后的值 """ @functools.wraps(func) def internal(*args, **kwargs): try: s = Serializer(TOKEN_KEY) token = s.loads(request.headers.get('token')) except TypeError: return {"info": "参数错误"}, 401 except KeyError: return {"info": "参数错误"}, 401 except BadSignature: return {"info": "token错误"}, 401 except SignatureExpired: return {"info": "token过期"}, 401 return func(*args, **kwargs, token=token) return internal def create_token(param): """ 创建token Parameters: param: 传入参数,用于创建token Returns: token: 用户访问令牌 """ token_expiration = 14400 s = Serializer(TOKEN_KEY, expires_in=token_expiration) token = '' + s.dumps(param).decode('ascii') return token