65 lines
2.0 KiB
Python
65 lines
2.0 KiB
Python
import json
|
|
import time
|
|
from fastapi import HTTPException
|
|
from pydantic import BaseModel
|
|
import requests as rq
|
|
from Utils.DataBase.RedisUtils import redis_pool
|
|
import base64
|
|
|
|
# Application identifier; check_token() sends it as the 'app_id' field of the
# token-check request body.
APP_ID = 'XINYONGPINGJI001'
|
|
# Credential check_token() sends as the 'secret_key' field of the token-check
# request body.
# NOTE(review): this is a hard-coded secret committed to source; consider
# loading it from configuration or the environment instead.
SECRET_KEY = 'ullamco'
|
|
# Endpoint check_token() POSTs to in order to validate a user token.
# NOTE(review): plain-HTTP loopback address; confirm this is correct for every
# deployment environment.
TOKEN_CHECK_URL = "http://127.0.0.1:8001/api/app/check_token"
|
|
|
|
|
|
# Pydantic model bundling a user token with an application's credentials.
# Its field names match the keys of the JSON body that check_token() posts to
# the token-check endpoint; the model itself is not referenced elsewhere in the
# visible part of this file.
class BaseAppTokenData(BaseModel):
    # Token issued to the end user.
    user_token: str
    # Identifier of the calling application.
    app_id: str
    # Secret paired with app_id.
    secret_key: str
|
|
|
|
|
|
# Retired implementation, kept for reference only: it decoded and verified the
# JWT locally with `jwt.decode`.
# def decode_token(token: str):
#     try:
#         payload = jwt.decode(token, Config.SECRET_KEY, algorithms=[Config.ALGORITHM])
#     except jwt.exceptions.ExpiredSignatureError:
#         raise HTTPException(status_code=401, detail="Token Has Expired")
#     except PyJWTError:
#         raise HTTPException(status_code=401, detail="Invalid Token")
#     return payload
|
|
def base64url_decode(base64_str):
    """Decode an unpadded base64url string and parse the result as JSON.

    The input is padded with ``=`` up to a multiple of four characters,
    decoded with the URL-safe alphabet, and the resulting bytes are handed to
    ``json.loads``.

    Args:
        base64_str: base64url text, with or without trailing ``=`` padding.

    Returns:
        Whatever JSON value the decoded bytes represent.

    Raises:
        ValueError: if the length leaves a remainder of 1 modulo 4 (no valid
            base64 encoding has that length), or if decoding / JSON parsing
            fails (those errors are ``ValueError`` subclasses).
    """
    # Padding required for each possible "length mod 4"; a remainder of 0
    # needs nothing and a remainder of 1 can never be valid.
    padding_for_remainder = {2: '==', 3: '='}

    remainder = len(base64_str) % 4
    if remainder in padding_for_remainder:
        base64_str += padding_for_remainder[remainder]
    elif remainder:
        raise ValueError('Invalid base64 string')

    raw_bytes = base64.urlsafe_b64decode(base64_str.encode('utf-8'))
    return json.loads(raw_bytes)
|
|
|
|
|
|
def decode_token(token: str) -> dict:
    """Validate *token* and return its decoded payload plus a ``user_info`` entry.

    Redis acts as a cache of tokens the remote check has already accepted: a
    cache hit skips the remote call; a miss calls ``check_token`` and, when the
    token is accepted, records it in Redis until its ``exp`` claim.

    Args:
        token: Dot-separated token whose second segment is a base64url-encoded
            JSON object. Only that segment is decoded here; no signature is
            verified locally.

    Returns:
        The decoded payload dict, extended with ``user_info`` holding the
        payload's ``uid``, ``name`` and ``role`` values (``None`` when absent).

    Raises:
        HTTPException: 401 when the remote check rejects the token, or when
            the payload segment is missing, undecodable, or not a JSON object.
    """
    already_validated = redis_pool.conn.get(token) is not None
    if not already_validated and not check_token(token):
        raise HTTPException(status_code=401, detail="Invalid Token")

    # Decode BEFORE writing to the cache: a token with a broken payload must be
    # answered with 401 and must never be stored (previously it was cached
    # without a TTL and then crashed with IndexError/ValueError).
    try:
        token_data = base64url_decode(token.split('.')[1])
    except (IndexError, ValueError) as err:
        raise HTTPException(status_code=401, detail="Invalid Token") from err
    if not isinstance(token_data, dict):
        raise HTTPException(status_code=401, detail="Invalid Token")

    if not already_validated:
        exp = token_data.get('exp')
        if exp:
            # `exp` may be a non-integer NumericDate; Redis needs whole seconds.
            ttl = int(exp) - int(time.time())
            if ttl > 0:
                # One SET with EX instead of SET followed by EXPIRE, so the key
                # can never be left behind without a TTL. A non-positive TTL
                # means the token is already expired, so it is not cached.
                redis_pool.conn.set(token, '', ex=ttl)
        else:
            # No expiry claim: cached without a TTL, as before.
            # NOTE(review): such keys are never evicted; consider a default TTL.
            redis_pool.conn.set(token, '')

    token_data['user_info'] = {
        'uid': token_data.get('uid'),
        'name': token_data.get('name'),
        'role': token_data.get('role'),
    }
    return token_data
|
|
|
|
|
|
def check_token(token: str, timeout: float = 5.0) -> bool:
    """Ask the token-check endpoint whether *token* is valid for this app.

    POSTs ``user_token``, ``app_id`` and ``secret_key`` as a JSON body to
    ``TOKEN_CHECK_URL`` and inspects the ``state`` field of the JSON reply.

    Args:
        token: The user token to validate.
        timeout: Seconds to wait for the endpoint before giving up. Without a
            timeout ``requests`` can block indefinitely on an unresponsive
            server.

    Returns:
        ``True`` when the reply is a JSON object with a truthy ``state``,
        ``False`` otherwise.

    Raises:
        Exceptions raised by ``requests`` (connection errors, timeouts, a body
        that is not valid JSON) propagate unchanged.
    """
    response = rq.post(
        url=TOKEN_CHECK_URL,
        json={'user_token': token, 'app_id': APP_ID, 'secret_key': SECRET_KEY},
        timeout=timeout,
    )
    reply = response.json()
    # Always return a real bool; a reply that is not a JSON object is treated
    # as a rejection rather than raising AttributeError on `.get`.
    return isinstance(reply, dict) and bool(reply.get('state'))
|