usermod/Crud/AuthRuleCrud.py

112 lines
4.2 KiB
Python

from typing import Union, List
from sqlalchemy.orm import Session
from Exceptions.common import CommonException
from Model.AuthRuleModel import AuthRule, DefaultAuthRuleConfig, AuthCategory
from Schemas import AuthRuleSchemas
def add_auth_rule(db: Session, name: str, category1: str = "", category2: str = "") -> AuthRule:
    """Insert a new AuthRule row and return the persisted instance.

    Args:
        db: SQLAlchemy session used for the insert.
        name: Value stored in the rule's ``name`` field.
        category1: Value stored in ``category1``; defaults to an empty string.
        category2: Value stored in ``category2``; defaults to an empty string.

    Returns:
        The AuthRule object after it has been committed and refreshed.
    """
    field_values = {"name": name, "category1": category1, "category2": category2}
    new_rule = AuthRule(**field_values)
    db.add(new_rule)
    db.commit()
    # Reload the instance's state from the database after the commit.
    db.refresh(new_rule)
    return new_rule
def delete_auth_rule(db: Session, auth_rule_id: int):
    """Delete the AuthRule row whose ``id`` equals ``auth_rule_id`` and commit.

    Args:
        db: SQLAlchemy session used for the delete.
        auth_rule_id: Value matched against the ``id`` field of AuthRule.
    """
    doomed_rules = db.query(AuthRule).filter_by(id=auth_rule_id)
    doomed_rules.delete()
    db.commit()
# def delete_auth_rule_by_id_list(db: Session, auth_rule_id: int):
# db.query(AuthRule).filter_by(id=auth_rule_id).delete()
# db.commit()
def change_auth_rule(db: Session, rule_id, new_data: dict):
    """Apply ``new_data`` to the AuthRule row with id ``rule_id`` and commit.

    NOTE: a function with this same name is defined again further down in
    this file; the later definition is the one bound at import time.

    Args:
        db: SQLAlchemy session used for the update.
        rule_id: Value matched against the ``id`` field of AuthRule.
        new_data: Mapping of field names to new values, passed straight to
            the query's ``update``.
    """
    # Validation of new_data['category1'] / new_data['category2'] against
    # AuthCategory is currently disabled; values are written unchecked.
    target_rules = db.query(AuthRule).filter_by(id=rule_id)
    target_rules.update(new_data)
    db.commit()
def get_auth_rule_list(db: Session, query_params: AuthRuleSchemas.AuthRuleQueryParams):
    """Search AuthRule rows by substring match on the string query parameters.

    Every entry of ``query_params.dict()`` other than ``page`` and
    ``page_size`` whose value is a string becomes a ``LIKE '%value%'`` filter
    on the AuthRule attribute of the same name. Non-string values (including
    ``None``) are not used for filtering.

    Args:
        db: SQLAlchemy session used for the query.
        query_params: Filter and pagination parameters.

    Returns:
        A ``(count, result)`` tuple. ``count`` is the number of rows matching
        the filters before pagination. ``result`` is a list of rows when both
        ``page`` and ``page_size`` are set; otherwise it is the un-executed
        filtered query object.
        NOTE(review): the two result shapes differ; kept as-is because
        callers may rely on it.
    """
    query = db.query(AuthRule)
    for key, value in query_params.dict().items():
        # Pagination fields are not columns, and only string values are
        # turned into LIKE filters.
        if key in ('page', 'page_size') or not isinstance(value, str):
            continue
        query = query.filter(getattr(AuthRule, key).like(f'%{value}%'))

    # Count before pagination so the caller gets the total number of matches.
    count = query.count()
    if query_params.page is not None and query_params.page_size is not None:
        offset = (query_params.page - 1) * query_params.page_size
        query = query.offset(offset).limit(query_params.page_size).all()
    return count, query
def get_default_auth_rule_config(db: Session, item_id: int):
    """Fetch the first DefaultAuthRuleConfig row whose ``id`` equals ``item_id``.

    Args:
        db: SQLAlchemy session used for the lookup.
        item_id: Value matched against the ``id`` field.

    Returns:
        The result of ``.first()`` on the filtered query.
    """
    candidates = db.query(DefaultAuthRuleConfig).filter_by(id=item_id)
    return candidates.first()
def get_default_auth_rule_config_list(db: Session, params: AuthRuleSchemas.AuthRuleConfigQueryParams):
    """Search DefaultAuthRuleConfig rows by substring match on ``id`` and ``name``.

    Args:
        db: SQLAlchemy session used for the query.
        params: Filter (``id``, ``name``) and pagination (``page``,
            ``page_size``) parameters; falsy values are ignored.

    Returns:
        A ``(count, result)`` tuple. ``count`` is the number of matching rows
        before pagination. ``result`` is a list of rows when both ``page`` and
        ``page_size`` are truthy; otherwise it is the filtered query object.
    """
    query = db.query(DefaultAuthRuleConfig)
    # Each truthy parameter becomes a LIKE '%value%' filter on the attribute
    # of the same name.
    for field in ("id", "name"):
        needle = getattr(params, field)
        if needle:
            query = query.filter(getattr(DefaultAuthRuleConfig, field).like(f'%{needle}%'))

    count = query.count()
    if not (params.page and params.page_size):
        return count, query

    skip = (params.page - 1) * params.page_size
    rows = query.offset(skip).limit(params.page_size).all()
    return count, rows
def add_auth_category(db: Session, name: str, belong: int = None, category_level: int = None) -> AuthCategory:
    """Create an AuthCategory row, commit it, and return the refreshed instance.

    Args:
        db: SQLAlchemy session used for the lookup and insert.
        name: Value stored in the category's ``name`` field.
        belong: Id of an existing AuthCategory this one belongs to, or
            ``None`` when no parent is given.
        category_level: Value stored in ``category_level``.

    Returns:
        The newly added AuthCategory after commit and refresh.

    Raises:
        CommonException: If ``belong`` is given but no AuthCategory with that
            id exists.
    """
    # Only validate the parent when one was actually supplied. Filtering on
    # id=None compiles to "id IS NULL", which never matches, so the previous
    # unconditional check rejected every call that used the default.
    if belong is not None:
        parent = db.query(AuthCategory).filter_by(id=belong).first()
        if not parent:
            raise CommonException("错误的分类信息")

    item = AuthCategory(name=name, belong=belong, category_level=category_level)
    db.add(item)
    db.commit()
    db.refresh(item)
    return item
def delete_auth_category(db: Session, auth_rule_id: int):
    """Delete the AuthCategory row whose ``id`` equals ``auth_rule_id`` and commit.

    Args:
        db: SQLAlchemy session used for the delete.
        auth_rule_id: Id of the category to delete. The parameter name is
            kept for backward compatibility with existing callers.
    """
    # Target AuthCategory: the previous code queried AuthRule here, which
    # deleted an auth rule with the same id instead of the category.
    db.query(AuthCategory).filter_by(id=auth_rule_id).delete()
    db.commit()
def change_auth_category(db: Session, auth_rule_id: int, new_data: dict):
    """Apply ``new_data`` to the AuthCategory row with id ``auth_rule_id`` and commit.

    Args:
        db: SQLAlchemy session used for the update.
        auth_rule_id: Id of the category to update. The parameter name is
            kept for backward compatibility with existing callers.
        new_data: Mapping of field names to new values, passed straight to
            the query's ``update``.
    """
    # filter_by accepts keyword criteria only; the id must be passed as id=...
    # (a bare positional argument raises TypeError).
    db.query(AuthCategory).filter_by(id=auth_rule_id).update(new_data)
    db.commit()
def get_auth_category_by_id(db: Session, item_id):
    """Fetch the AuthCategory row whose ``id`` equals ``item_id``.

    Args:
        db: SQLAlchemy session used for the lookup.
        item_id: Value matched against the ``id`` field of AuthCategory.

    Returns:
        The matching AuthCategory, or ``None`` when no row matches.
    """
    # Session.get needs the entity as well as the identity, so the previous
    # db.get(item_id) call could not succeed. Query by id instead, the same
    # way the other single-row lookups in this module do.
    return db.query(AuthCategory).filter_by(id=item_id).first()
def change_auth_rule(db: Session, rule_id, new_data: dict):
    """Update the AuthRule row identified by ``rule_id`` with ``new_data`` and commit.

    NOTE: this redefines a function of the same name that appears earlier in
    this file; being the later definition, this is the one in effect.

    Args:
        db: SQLAlchemy session used for the update.
        rule_id: Value matched against the ``id`` field of AuthRule.
        new_data: Mapping of field names to new values, handed unchanged to
            the query's ``update``.
    """
    # The category1/category2 existence checks against AuthCategory are
    # disabled, so new_data is written without validation.
    rule_query = db.query(AuthRule)
    rule_query.filter_by(id=rule_id).update(new_data)
    db.commit()
# def get_auth_rule_list(db: Session, query_params: AuthRuleSchemas.AuthRuleQueryParams):
# query = db.query(AuthRule)
# for key, value in query_params.dict().items():
# if key not in ['page','page_size']:
# query = query.filter_by(**{key: value})
# count = query.count()
# if query_params.page is not None and query_params.page_size is not None:
# query = query.offset((query_params.page - 1) * query_params.page_size).limit(query_params.page).all()
# return count,query