import json import time from fastapi import HTTPException from pydantic import BaseModel import requests as rq from Utils.DataBase.RedisUtils import redis_pool import base64 APP_ID = 'XINYONGPINGJI001' SECRET_KEY = 'ullamco' TOKEN_CHECK_URL = "http://127.0.0.1:8001/api/app/check_token" class BaseAppTokenData(BaseModel): user_token: str app_id: str secret_key: str # def decode_token(token: str): # try: # payload = jwt.decode(token, Config.SECRET_KEY, algorithms=[Config.ALGORITHM]) # except jwt.exceptions.ExpiredSignatureError: # raise HTTPException(status_code=401, detail="Token Has Expired") # except PyJWTError: # raise HTTPException(status_code=401, detail="Invalid Token") # return payload def base64url_decode(base64_str): size = len(base64_str) % 4 if size == 2: base64_str += '==' elif size == 3: base64_str += '=' elif size != 0: raise ValueError('Invalid base64 string') res = base64.urlsafe_b64decode(base64_str.encode('utf-8')) data = json.loads(res) return data def decode_token(token: str): data = redis_pool.conn.get(token) if data is None: if check_token(token): redis_pool.conn.set(token, '') token_data = base64url_decode(token.split('.')[1]) exp = token_data.get('exp') if exp: redis_pool.conn.expire(token, exp - int(time.time())) else: raise HTTPException(status_code=401, detail="Invalid Token") else: token_data = base64url_decode(token.split('.')[1]) print(token_data, "token_data") token_data['user_info'] = {'uid': token_data.get('uid'), 'name': token_data.get('name'), 'role': token_data.get('role')} return token_data def check_token(token: str): res = rq.post(url=TOKEN_CHECK_URL, json={'user_token': token, 'app_id': APP_ID, 'secret_key': SECRET_KEY}).json() if res and res.get('state'): return True