from typing import List from jose import jwt, JWTError from datetime import datetime, timedelta from fastapi import Header, Request, HTTPException from Crud import AppCrud from Schemas import AppSchemas from Schemas.UserSchemas import TokenData SECRET_KEY = "MADASDZXC255f" ALGORITHM = "HS256" def create_token(data: dict, secret_key=SECRET_KEY, algorithm=ALGORITHM, expires_delta: timedelta = timedelta(days=3)): # 设置加密数据 to_encode_body = dict() to_encode_body.update(data.copy()) # 设置过期时间 if expires_delta: expire = datetime.utcnow() + expires_delta to_encode_body.update({"exp": expire}) # Token编码 encoded_jwt = jwt.encode(to_encode_body, secret_key, algorithm=algorithm) return encoded_jwt def decode_token(token: str, secret_key: str, algorithms: List[str] = ['HS256']): payload = jwt.decode(token, secret_key, algorithms=algorithms) return payload class Token: SECRET_KEY = SECRET_KEY ALGORITHM = ALGORITHM @classmethod def create_token(cls, data: dict, expires_delta: timedelta = timedelta(days=3)): return create_token(data, cls.SECRET_KEY, cls.ALGORITHM, expires_delta) @classmethod def decode_token(cls, token: str): payload = decode_token(token, cls.SECRET_KEY, algorithms=[cls.ALGORITHM]) return payload def token_data_depend(Authorization: str = Header(None))->TokenData: try: print('Authorization',Authorization) _, jwt_token = Authorization.split(" ") payload = Token.decode_token(jwt_token) print(payload) token_data = TokenData(**payload) return token_data except Exception as e: print(e) raise HTTPException(status_code=403, detail="无权限的操作") # 管理员token验证 def admin_auth_token_depend(Authorization: str = Header(None)) -> TokenData: try: _, token = Authorization.split(" ") token_data: TokenData = token_data_depend(Authorization) except Exception as e: print(e) raise HTTPException(status_code=403, detail="非管理员,无权限的操作") if token_data.role != 'admin': print(token_data) raise HTTPException(status_code=403, detail="非管理员,无权限的操作") return token_data # 应用服务对用户模块的请求验证