user-wsc/Utils/AccessControl/AccessUtil.py

47 lines
1.3 KiB
Python
Raw Normal View History

2022-11-07 13:53:42 +08:00
import os
2022-10-20 16:29:54 +08:00
from typing import Optional
2022-11-07 13:53:42 +08:00
import casbin
2022-11-01 14:02:58 +08:00
from fastapi import HTTPException, Header
2022-10-20 16:29:54 +08:00
from Utils.Authentication.TokenUtil import decode_token
2022-11-01 14:02:58 +08:00
def ac_admin(token: Optional[str] = Header(...)):
2022-10-20 16:29:54 +08:00
2022-11-01 14:18:09 +08:00
user_info = decode_token(token).get("user_info")
if not user_info:
raise HTTPException(status_code=400, detail="Invalid Token")
role = user_info.get("role")
2022-10-20 16:29:54 +08:00
2022-11-07 13:53:42 +08:00
e = casbin.Enforcer(
os.getcwd() + r"\Utils\AccessControl\model.conf",
os.getcwd() + r"\Utils\AccessControl\policy_index_store.csv"
)
sub = role
obj = ""
act = ""
if not e.enforce(sub, obj, act):
2022-11-01 14:02:58 +08:00
raise HTTPException(status_code=400, detail="No Access")
2022-10-20 16:29:54 +08:00
2022-11-07 13:53:42 +08:00
# if role != "admin":
# raise HTTPException(status_code=400, detail="No Access")
2022-11-01 14:02:58 +08:00
# def ac_index_store(request: Request, token: Optional[str] = Header(...)):
# e = casbin.Enforcer(
# os.getcwd() + r"\Utils\AccessControl\model.conf",
# os.getcwd() + r"\Utils\AccessControl\policy_index_store.csv"
# )
#
# sub = decode_token(token).get("role")
# obj = request.url.__str__().split(request.base_url.__str__()[:-1])[-1].split("?")[0]
# act = request.method
#
# if not e.enforce(sub, obj, act):
# raise HTTPException(status_code=400, detail="No Access")