import jwt from jwt import PyJWTError from datetime import datetime, timedelta from fastapi import HTTPException, status, Header from Utils.Authentication import Config def create_token(*, data: dict, expires_delta: timedelta = None): to_encode = dict() # 载入用户信息 user_info = data.copy() to_encode.update({"user_info": user_info}) # 设置过期时间 if expires_delta: expire = datetime.utcnow() + expires_delta else: expire = datetime.utcnow() + timedelta(minutes=15) to_encode.update({"exp": expire}) # Token编码 encoded_jwt = jwt.encode(to_encode, Config.SECRET_KEY, algorithm=Config.ALGORITHM) return encoded_jwt def decode_token(token: str): credentials_exception = HTTPException( status_code=status.HTTP_401_UNAUTHORIZED, detail="Invalid Token", headers={"WWW-Authenticate": "Bearer"}, ) try: payload = jwt.decode(token, Config.SECRET_KEY, algorithms=[Config.ALGORITHM]) except PyJWTError: raise credentials_exception return payload async def get_token_header(authorization: str = Header(...)): """ 获取Token并验证 :param authorization: :return: uid """ token = authorization.split(' ')[-1] # 获取token openid = decode_token(token) # 验证token if not openid: raise HTTPException(status_code=400, detail="无效Token")