usermod/Crud/AuthRuleCrud.py

113 lines
4.2 KiB
Python
Raw Normal View History

2023-02-09 15:49:45 +08:00
from typing import Union, List
2023-02-09 10:54:07 +08:00
from sqlalchemy.orm import Session
from Exceptions.common import CommonException
2023-02-13 14:42:35 +08:00
from Model.AuthRuleModel import AuthRule, DefaultAuthRuleConfig, AuthCategory
2023-02-10 09:15:17 +08:00
from Schemas import AuthRuleSchemas
2023-02-09 10:54:07 +08:00
2023-02-13 14:42:35 +08:00
def add_auth_rule(db: Session, name: str, category1: str = "", category2: str = "") -> AuthRule:
    """Create an auth rule after checking that both of its categories exist.

    Args:
        db: Active SQLAlchemy session.
        name: Name stored on the new rule.
        category1: Id looked up in ``AuthCategory`` for the first category.
        category2: Id looked up in ``AuthCategory`` for the second category.

    Returns:
        The newly committed ``AuthRule``, refreshed from the database.

    Raises:
        CommonException: If either category id has no matching
            ``AuthCategory`` row.
    """
    # category1 is checked first; category2 is only queried when category1 exists.
    for category_id in (category1, category2):
        if not db.query(AuthCategory).filter_by(id=category_id).first():
            raise CommonException('分类信息不正确')

    rule = AuthRule(name=name, category1=category1, category2=category2)
    db.add(rule)
    db.commit()
    db.refresh(rule)
    return rule
def delete_auth_rule(db: Session, auth_rule_id: int):
    """Delete the auth rule with the given id and commit the change.

    Args:
        db: Active SQLAlchemy session.
        auth_rule_id: Value matched against ``AuthRule.id``.
    """
    matching_rules = db.query(AuthRule).filter_by(id=auth_rule_id)
    matching_rules.delete()
    db.commit()
2023-02-13 14:42:35 +08:00
# def delete_auth_rule_by_id_list(db: Session, auth_rule_id: int):
# db.query(AuthRule).filter_by(id=auth_rule_id).delete()
# db.commit()
def change_auth_rule(db: Session, rule_id, new_data: dict):
    """Apply ``new_data`` to the auth rule with id ``rule_id`` and commit.

    No category validation is performed here.

    NOTE(review): a second ``change_auth_rule`` is defined further down in
    this module; being defined later, it replaces this one at import time.

    Args:
        db: Active SQLAlchemy session.
        rule_id: Value matched against ``AuthRule.id``.
        new_data: Mapping of column names to new values, passed to
            ``Query.update``.
    """
    target = db.query(AuthRule).filter_by(id=rule_id)
    target.update(new_data)
    db.commit()
2023-02-10 09:15:17 +08:00
2023-02-13 14:42:35 +08:00
def get_auth_rule_list(db: Session, query_params: AuthRuleSchemas.AuthRuleQueryParams):
    """Return auth rules matching ``query_params`` plus the total match count.

    NOTE(review): a second ``get_auth_rule_list`` is defined further down in
    this module; being defined later, it replaces this one at import time.

    Args:
        db: Active SQLAlchemy session.
        query_params: Filter and pagination parameters. Every field other
            than ``page`` / ``page_size`` whose value is not ``None`` is
            applied as an equality filter on ``AuthRule``.

    Returns:
        A ``(result, count)`` tuple. ``count`` is the number of matching rows
        before pagination. ``result`` is a list holding the requested page
        when both ``page`` and ``page_size`` are set; otherwise it is the
        unevaluated query.
    """
    pagination_keys = {"page", "page_size"}
    # Pagination fields are not filter columns, and a None value is taken to
    # mean "filter not supplied" rather than "column IS NULL".
    filters = {
        key: value
        for key, value in query_params.dict().items()
        if key not in pagination_keys and value is not None
    }
    query = db.query(AuthRule).filter_by(**filters)
    count = query.count()
    if query_params.page is not None and query_params.page_size is not None:
        offset = (query_params.page - 1) * query_params.page_size
        # The page length is page_size (the original limited by the page number).
        query = query.offset(offset).limit(query_params.page_size).all()
    return query, count
def get_default_auth_rule_config(db: Session, item_id: int):
    """Fetch the default auth rule config whose id equals ``item_id``.

    Args:
        db: Active SQLAlchemy session.
        item_id: Value matched against ``DefaultAuthRuleConfig.id``.

    Returns:
        The first matching ``DefaultAuthRuleConfig``, or ``None`` when no row
        matches.
    """
    matches = db.query(DefaultAuthRuleConfig).filter_by(id=item_id)
    return matches.first()
def get_default_auth_rule_config_list(db: Session, params: AuthRuleSchemas.AuthRuleConfigQueryParams):
    """Return default auth rule configs matching ``params`` and their count.

    Args:
        db: Active SQLAlchemy session.
        params: Query parameters. Truthy ``id`` and ``name`` values are
            applied as SQL ``LIKE`` patterns; ``page`` and ``page_size``
            enable pagination when both are truthy.

    Returns:
        A ``(count, result)`` tuple. ``count`` is the number of matching rows
        before pagination. ``result`` is a list holding the requested page
        when pagination applies; otherwise it is the unevaluated query.
    """
    query = db.query(DefaultAuthRuleConfig)
    if params.id:
        query = query.filter(DefaultAuthRuleConfig.id.like(params.id))
    if params.name:
        query = query.filter(DefaultAuthRuleConfig.name.like(params.name))
    count = query.count()
    if params.page and params.page_size:
        offset = (params.page - 1) * params.page_size
        # The page length is page_size (the original limited by the page number).
        query = query.offset(offset).limit(params.page_size).all()
    return count, query
2023-02-13 14:42:35 +08:00
def add_auth_category(db: Session, name: str, belong: int = None, category_level: int = None) -> AuthCategory:
    """Create an auth category, optionally nested under a parent category.

    Args:
        db: Active SQLAlchemy session.
        name: Name stored on the new category.
        belong: Id of the parent ``AuthCategory``. ``None`` (the default)
            skips the parent check.
        category_level: Value stored in the ``category_level`` field.

    Returns:
        The newly committed ``AuthCategory``, refreshed from the database.

    Raises:
        CommonException: If ``belong`` is given but no ``AuthCategory`` row
            has that id.
    """
    # Validate the parent only when one is supplied. Looking up id=None never
    # matches, so previously the default call always failed.
    # NOTE(review): presumably belong=None denotes a top-level category — confirm.
    if belong is not None and not db.query(AuthCategory).filter_by(id=belong).first():
        raise CommonException("错误的分类信息")

    item = AuthCategory(name=name, belong=belong, category_level=category_level)
    db.add(item)
    db.commit()
    db.refresh(item)
    return item
def delete_auth_category(db: Session, auth_rule_id: int):
    """Delete the auth category with the given id and commit the change.

    Args:
        db: Active SQLAlchemy session.
        auth_rule_id: Id of the ``AuthCategory`` to delete. The parameter
            name is kept for backward compatibility with existing callers.
    """
    # Target AuthCategory; the original deleted from AuthRule by mistake.
    db.query(AuthCategory).filter_by(id=auth_rule_id).delete()
    db.commit()
def change_auth_category(db: Session, auth_rule_id: int, new_data: dict):
    """Apply ``new_data`` to the auth category with the given id and commit.

    Args:
        db: Active SQLAlchemy session.
        auth_rule_id: Id of the ``AuthCategory`` to update. The parameter
            name is kept for backward compatibility with existing callers.
        new_data: Mapping of column names to new values, passed to
            ``Query.update``.
    """
    # filter_by only accepts keyword arguments; the id must be named.
    db.query(AuthCategory).filter_by(id=auth_rule_id).update(new_data)
    db.commit()
def get_auth_category_by_id(db: Session, item_id):
    """Look up an auth category by primary key.

    Args:
        db: Active SQLAlchemy session.
        item_id: Primary-key value of the ``AuthCategory`` to fetch.

    Returns:
        The matching ``AuthCategory``, or ``None`` when no row has that
        primary key.
    """
    # Session.get needs the mapped class as well as the identity.
    return db.get(AuthCategory, item_id)
def change_auth_rule(db: Session, rule_id, new_data: dict):
    """Validate any category ids in ``new_data``, then update the auth rule.

    Args:
        db: Active SQLAlchemy session.
        rule_id: Value matched against ``AuthRule.id``.
        new_data: Mapping of column names to new values, passed to
            ``Query.update``. When it contains ``category1`` or
            ``category2``, each value must be the id of an existing
            ``AuthCategory``.

    Raises:
        CommonException: If a supplied category id has no matching
            ``AuthCategory`` row.
    """
    # category1 is validated before category2; the first failure aborts.
    for field in ("category1", "category2"):
        if field not in new_data:
            continue
        if not db.query(AuthCategory).filter_by(id=new_data[field]).first():
            raise CommonException('分类信息不正确')

    target = db.query(AuthRule).filter_by(id=rule_id)
    target.update(new_data)
    db.commit()
def get_auth_rule_list(db: Session, query_params: AuthRuleSchemas.AuthRuleQueryParams):
    """Return auth rules matching ``query_params`` plus the total match count.

    Args:
        db: Active SQLAlchemy session.
        query_params: Filter and pagination parameters. Every field other
            than ``page`` / ``page_size`` whose value is not ``None`` is
            applied as an equality filter on ``AuthRule``.

    Returns:
        A ``(result, count)`` tuple. ``count`` is the number of matching rows
        before pagination. ``result`` is a list holding the requested page
        when both ``page`` and ``page_size`` are set; otherwise it is the
        unevaluated query.
    """
    pagination_keys = {"page", "page_size"}
    # Pagination fields are not filter columns, and a None value is taken to
    # mean "filter not supplied" rather than "column IS NULL".
    filters = {
        key: value
        for key, value in query_params.dict().items()
        if key not in pagination_keys and value is not None
    }
    query = db.query(AuthRule).filter_by(**filters)
    count = query.count()
    if query_params.page is not None and query_params.page_size is not None:
        offset = (query_params.page - 1) * query_params.page_size
        # The page length is page_size (the original limited by the page number).
        query = query.offset(offset).limit(query_params.page_size).all()
    return query, count