import re import requests from typing import Optional from fastapi import HTTPException, Header, Request from Utils.Authentication import TokenUtil def get_user_info(token: str): """ 从token中解析用户信息 """ # 校验token decoded_token = TokenUtil.decode_token(token) if decoded_token == "Expired": raise HTTPException(status_code=401, detail="Token Has Expired") if decoded_token == "Invalid": raise HTTPException(status_code=401, detail="Invalid Token") # 校验用户信息 user_info = decoded_token.get("user_info") if not user_info: raise HTTPException(status_code=401, detail="Invalid Token") return user_info def get_role_access_policy(name: str, token: str): """ 获取用户的访问权限 """ url = "http://test.fecribd.com/api/user/role/role_access_policy/view?name={}" url = url.format(name) headers = {"token": token} res = requests.post(url=url, headers=headers) return res.json() def ac(token: Optional[str] = Header(None)): get_user_info(token) return True def rbac(request: Request, token: Optional[str] = Header(None)): # 获取用户角色 user_info = get_user_info(token) role = user_info.get("role") # 管理员权限 if role == "管理员": return True # 获取角色权限 raps = get_role_access_policy(name=role, token=token) # 权限匹配成功 for rap in raps: p_obj = rap.get("obj") r_obj = request.url.__str__() m = re.search(p_obj, r_obj) if m: return True # 权限匹配失败 raise HTTPException(status_code=202, detail="No Access")