wd-rating/utils/auth_utils.py

110 lines
3.8 KiB
Python

import logging
from datetime import datetime, timedelta, timezone
from typing import Generic, List, Optional, Type, TypeVar, Union

from fastapi import Header, HTTPException
from jose import jwt
from pydantic import BaseModel
class TokenDataModel(BaseModel):
    """Default schema that a decoded JWT payload is validated against.

    ``email`` and ``post`` are required. Every ``Optional`` field carries an
    explicit ``None`` default so that it may be absent from the payload
    regardless of the installed pydantic major version (pydantic v2 treats
    an ``Optional[...]`` annotation without a default as required).
    """

    email: str
    name: Optional[str] = None
    registered: Optional[bool] = None
    # Comma-separated permission rule ids; parsed by ``check_auth``.
    auth_data: Optional[str] = None
    department: Optional[str] = None
    # ``Union[str]`` collapses to plain ``str``; spelled directly for clarity.
    post: str
# Type variable for the token payload model; restricted to pydantic BaseModel subclasses.
TokenDataType = TypeVar("TokenDataType", bound=BaseModel)
def create_token(data: dict, secret_key, algorithm, expires_delta: timedelta = timedelta(days=3)):
    """Encode ``data`` as a signed JWT.

    :param data: claims to place in the token body; the mapping is copied,
        never mutated.
    :param secret_key: key passed to ``jwt.encode`` for signing.
    :param algorithm: signing algorithm name passed to ``jwt.encode``.
    :param expires_delta: lifetime of the token, counted from now. A falsy
        value (``None`` or a zero ``timedelta``) produces a token without an
        ``exp`` claim.
    :return: the encoded token as returned by ``jwt.encode``.
    """
    # Work on a shallow copy so the caller's mapping is left untouched.
    to_encode_body = dict(data)

    if expires_delta:
        # Timezone-aware "now" in UTC; ``datetime.utcnow()`` is deprecated
        # and returns a naive value.
        to_encode_body["exp"] = datetime.now(timezone.utc) + expires_delta

    return jwt.encode(to_encode_body, secret_key, algorithm=algorithm)
def decode_token(token: str, secret_key: str, algorithms: Optional[List[str]] = None):
    """Verify and decode a JWT, returning its payload.

    :param token: the encoded JWT string.
    :param secret_key: key passed to ``jwt.decode`` for signature verification.
    :param algorithms: accepted signing algorithms; ``None`` selects the
        default ``['HS256']``.
    :return: the payload returned by ``jwt.decode``.
    :raises: whatever ``jwt.decode`` raises for an invalid token.
    """
    # ``None`` sentinel instead of a list default: a mutable default would be
    # one shared object across every call.
    if algorithms is None:
        algorithms = ['HS256']
    return jwt.decode(token, secret_key, algorithms=algorithms)
class AuthUtil(Generic[TokenDataType]):
    """JWT helper bound to one secret key, algorithm and payload model.

    Provides token creation/decoding plus callables intended for use as
    FastAPI dependencies: they read the ``Authorization`` header, decode the
    token and return the payload parsed into ``tokenDataModel``. Every
    failure is reported to the client as HTTP 403.
    """

    def __init__(self, secret_key="MADASDZXC255f", algorithm="HS256",
                 tokenDataModel: Type[TokenDataType] = TokenDataModel):
        """Store the signing configuration.

        :param secret_key: key used to sign and verify tokens.
        :param algorithm: JWT signing algorithm name.
        :param tokenDataModel: model class instantiated from decoded payloads.
        """
        # NOTE(review): the fallback secret_key is a literal committed to
        # source; deployments should pass their own key explicitly.
        self.secret_key = secret_key
        self.algorithm = algorithm
        self.tokenDataModel = tokenDataModel

    def create_token(self, data: dict, expires_delta: timedelta = timedelta(days=3)):
        """Create a token from ``data`` using this instance's key and algorithm."""
        return create_token(data, self.secret_key, self.algorithm, expires_delta)

    def decode_token(self, token: str):
        """Decode ``token``, accepting only this instance's algorithm."""
        return decode_token(token, self.secret_key, algorithms=[self.algorithm])

    def _read_token_data(self, authorization) -> TokenDataType:
        """Turn an ``Authorization`` header value into a payload model instance.

        Performs no error translation; callers map failures to an HTTP error.
        """
        if not authorization:
            raise ValueError("missing Authorization header")
        # The token is the last space-separated part (e.g. "Bearer <token>").
        jwt_token = authorization.split(" ")[-1]
        payload = self.decode_token(jwt_token)
        return self.tokenDataModel(**payload)

    @staticmethod
    def _forbidden(cause=None) -> HTTPException:
        """Build the 403 response, logging ``cause`` when one is given."""
        if cause is not None:
            logging.getLogger(__name__).warning("Token check failed: %s", cause)
        return HTTPException(status_code=403, detail="无权限的操作")

    def token_data_depend(self, authorization: str = Header(None)) -> TokenDataType:
        """Dependency that returns the token payload of the current request.

        :param authorization: value of the ``Authorization`` request header.
        :return: the decoded payload as a ``tokenDataModel`` instance.
        :raises HTTPException: 403 when the header is missing or the token
            cannot be decoded or validated.
        """
        try:
            return self._read_token_data(authorization)
        except Exception as exc:
            # Boundary handler: any decoding/validation failure means the
            # caller is not authenticated, so all of them become a 403.
            raise self._forbidden(exc) from exc

    # Request validation performed by application services against the user module.
    # Permission list required by an endpoint.
    def get_auth_rule_check_depend(self, requisite_rules=None, contains_rules=None):
        """Build a dependency that validates the permissions an endpoint needs.

        :param requisite_rules: rule ids that must all be present.
        :param contains_rules: rule ids of which at least one must be present.
        :return: a callable that returns the token payload model, or raises
            ``HTTPException`` (403) when the token is invalid or the rules
            are not satisfied.
        """

        def auth_rule_check_depend(authorization: str = Header(None)):
            try:
                token_data = self._read_token_data(authorization)
                permitted = check_auth(token_data.auth_data, requisite_rules, contains_rules)
            except Exception as exc:
                # Includes malformed ``auth_data`` that check_auth cannot parse.
                raise self._forbidden(exc) from exc
            # Raised outside the try block so the denial is not caught and
            # re-wrapped by the broad handler above.
            if not permitted:
                raise self._forbidden()
            return token_data

        return auth_rule_check_depend
def check_auth(auth_data, requisite_rules=None, contains_rules=None):
    """Check a token's permission rule ids against an endpoint's requirements.

    :param auth_data: comma-separated integer rule ids held by the token;
        ``None`` or an empty string means no rules are held.
    :param requisite_rules: rule ids that must all be held; empty or ``None``
        imposes no requirement.
    :param contains_rules: rule ids of which at least one must be held; empty
        or ``None`` imposes no requirement.
    :return: ``True`` when both requirements are satisfied, else ``False``.
    :raises ValueError: when an item of ``auth_data`` is not an integer.
    """
    # Parse first, so malformed auth_data is rejected even when no rules apply.
    held_rule_ids = [int(item) for item in auth_data.split(',')] if auth_data else []

    # At least one held id must appear among the "contains" rules.
    if contains_rules and not any(rule_id in contains_rules for rule_id in held_rule_ids):
        return False

    # Every requisite rule must be held.
    if requisite_rules and not all(rule in held_rule_ids for rule in requisite_rules):
        return False

    return True