import os
import re
from typing import Optional

import casbin
import pandas as pd
import requests
from fastapi import HTTPException, Header, Request

from Utils.Authentication.TokenUtil import decode_token
from Utils.UniqueCoder.TimeSerialNumUtils import create_time_serial_num

# Endpoint that is queried for the access policies of a role.
RAP_VIEW_URL = "http://test.fecribd.com/api/user/role/role_access_policy/view"


def get_user_info_from_token(token: str):
    """Return the ``user_info`` entry of the decoded token.

    Args:
        token: Token string handed to ``decode_token``.

    Returns:
        The value stored under ``"user_info"`` in the decoded token.

    Raises:
        HTTPException: 400 "Invalid Token" when the decoded token has no
            truthy ``"user_info"`` entry.
    """
    user_info = decode_token(token).get("user_info")
    if not user_info:
        raise HTTPException(status_code=400, detail="Invalid Token")
    return user_info


def match_req_url(regex: str, request: Request):
    """Search the request URL for ``regex`` and return the match object.

    Args:
        regex: Regular expression applied with ``re.search``.
        request: Incoming request whose full URL string is searched.

    Returns:
        The ``re.Match`` for the first occurrence of ``regex`` in the URL.

    Raises:
        HTTPException: 400 "Invalid Request" when the URL does not match.
    """
    match = re.search(regex, str(request.url))
    if not match:
        raise HTTPException(status_code=400, detail="Invalid Request")
    return match


def get_rap_by_name(name, timeout=10):
    """Fetch the role access policies registered under ``name``.

    Args:
        name: Role name sent as the ``name`` query parameter.
        timeout: Seconds to wait for the policy service before
            ``requests`` raises a timeout error.

    Returns:
        The decoded JSON body of the response.
    """
    # ``params`` lets requests URL-encode the name instead of splicing it
    # into the query string verbatim; ``str`` keeps the textual form that
    # string formatting would have produced (e.g. "None" for ``None``).
    response = requests.post(
        url=RAP_VIEW_URL,
        params={"name": str(name)},
        timeout=timeout,
    )
    return response.json()


def _build_enforcer(raps):
    """Build a casbin enforcer whose policy is ``raps`` dumped to a CSV file."""
    access_control_dir = os.path.join(os.getcwd(), "Utils", "AccessControl")
    policy_name = create_time_serial_num(prefix="policy", suffix="")
    policy_path = os.path.join(access_control_dir, "{}.csv".format(policy_name))
    try:
        pd.json_normalize(raps).to_csv(policy_path, header=False, index=False)
        return casbin.Enforcer(
            os.path.join(access_control_dir, "model.conf"),
            policy_path,
        )
    finally:
        # The CSV is only a hand-off to the enforcer; delete it even when
        # writing it or constructing the enforcer fails.
        if os.path.exists(policy_path):
            os.remove(policy_path)


def rbac(request: Request, token: Optional[str] = Header(...)):
    """Check the caller's role against its access policies for this request.

    The role is read from the token's ``user_info``, its policies are
    fetched with ``get_rap_by_name``, and the first policy whose ``"obj"``
    regex matches the request URL is checked with a casbin enforcer.

    Args:
        request: Incoming request; its URL is matched against the policies.
        token: Value of the required ``token`` header.

    Returns:
        ``True`` when the matching policy is enforced successfully.

    Raises:
        HTTPException: 400 "Invalid Token" when the token carries no user
            info; 202 "No Access" when enforcement fails; 400 "Invalid
            Request" when no policy matches the request URL.
    """
    user_info = get_user_info_from_token(token)
    sub = user_info.get("role")
    raps = get_rap_by_name(name=sub)
    req_url = str(request.url)

    for rap in raps:
        obj = rap.get("obj")
        if obj is None:
            # A policy without an object pattern cannot match any URL.
            continue
        if not re.search(obj, req_url):
            continue

        enforcer = _build_enforcer(raps)
        # NOTE(review): the action checked is the policy's own "act", not
        # the HTTP method of the request - confirm this is intended.
        if not enforcer.enforce(sub, obj, rap.get("act")):
            # NOTE(review): 202 is kept because clients may rely on it;
            # 403 would be the conventional status for a denied request.
            raise HTTPException(status_code=202, detail="No Access")
        return True

    raise HTTPException(status_code=400, detail="Invalid Request")