import re import requests from typing import Optional from fastapi import HTTPException, Header, Request from Utils.Authentication.TokenUtil import decode_token def get_user_info(token: str): """ 从token中解析用户信息 """ user_info = decode_token(token).get("user_info") if not user_info: raise HTTPException(status_code=401, detail="Invalid Token") return user_info def get_role_access_policy(name: str, token: str): """ 获取用户的访问权限 """ url = "http://test.fecribd.com/api/user/role/role_access_policy/view?name={}" url = url.format(name) headers = {"token": token} res = requests.post(url=url, headers=headers) return res.json() def ac(token: Optional[str] = Header(...)): get_user_info(token) return True def rbac(request: Request, token: Optional[str] = Header(None)): # 获取用户角色 user_info = get_user_info(token) role = user_info.get("role") # 管理员权限 if role == "管理员": return True # 获取角色权限 raps = get_role_access_policy(name=role, token=token) # 权限匹配成功 for rap in raps: p_obj = rap.get("obj") r_obj = request.url.__str__() m = re.search(p_obj, r_obj) if m: return True # 权限匹配失败 raise HTTPException(status_code=202, detail="No Access")