112 lines
4.2 KiB
Python
112 lines
4.2 KiB
Python
from typing import Union, List
|
|
|
|
from sqlalchemy.orm import Session
|
|
|
|
from Exceptions.common import CommonException
|
|
from Model.AuthRuleModel import AuthRule, DefaultAuthRuleConfig, AuthCategory
|
|
from Schemas import AuthRuleSchemas
|
|
|
|
|
|
def add_auth_rule(db: Session, name: str, category1: str = "", category2: str = "") -> AuthRule:
    """Create an ``AuthRule`` row and return the persisted instance.

    Args:
        db: Session used to add, commit and refresh the new row.
        name: Value for the rule's ``name`` attribute.
        category1: Value for the rule's ``category1`` attribute.
        category2: Value for the rule's ``category2`` attribute.

    Returns:
        The newly created ``AuthRule`` after it has been refreshed.
    """
    fields = {"name": name, "category1": category1, "category2": category2}
    new_rule = AuthRule(**fields)
    db.add(new_rule)
    db.commit()
    # Reload the instance's attributes from the database before returning it.
    db.refresh(new_rule)
    return new_rule
|
|
|
|
|
|
def delete_auth_rule(db: Session, auth_rule_id: int):
    """Delete the ``AuthRule`` rows whose ``id`` equals ``auth_rule_id``.

    Args:
        db: Session used to issue the delete and commit it.
        auth_rule_id: ``id`` value of the rule to remove.
    """
    matching_rules = db.query(AuthRule).filter_by(id=auth_rule_id)
    matching_rules.delete()
    db.commit()
|
|
|
|
|
|
# def delete_auth_rule_by_id_list(db: Session, auth_rule_id: int):
|
|
# db.query(AuthRule).filter_by(id=auth_rule_id).delete()
|
|
# db.commit()
|
|
|
|
|
|
def change_auth_rule(db: Session, rule_id, new_data: dict):
    """Apply ``new_data`` to the ``AuthRule`` rows matching ``rule_id``.

    Args:
        db: Session used to issue the update and commit it.
        rule_id: ``id`` value of the rule to modify.
        new_data: Mapping handed unchanged to ``Query.update``.

    Note:
        ``category1`` / ``category2`` values are not validated against
        ``AuthCategory`` here; a check for that was previously disabled.
    """
    # NOTE(review): a function with this same name is defined again later in
    # this module; the later definition is the one bound at import time.
    target_rows = db.query(AuthRule).filter_by(id=rule_id)
    target_rows.update(new_data)
    db.commit()
|
|
|
|
|
|
def get_auth_rule_list(db: Session, query_params: AuthRuleSchemas.AuthRuleQueryParams):
    """Return the match count and the matching ``AuthRule`` rows.

    Every string-valued field of ``query_params`` other than ``page`` and
    ``page_size`` is applied as a ``LIKE '%value%'`` filter on the
    ``AuthRule`` attribute of the same name. ``None`` and non-string values
    are ignored.

    Args:
        db: Session used to run the queries.
        query_params: Filter and pagination parameters; read via ``.dict()``
            and its ``page`` / ``page_size`` attributes.

    Returns:
        A ``(count, rows)`` tuple. ``count`` is the number of rows matching
        the filters before pagination. ``rows`` is always a list: one page
        when both ``page`` and ``page_size`` are set, otherwise every match.
    """
    pagination_keys = ("page", "page_size")
    query = db.query(AuthRule)
    for key, value in query_params.dict().items():
        # isinstance() also rejects None, so one test covers both conditions.
        if key in pagination_keys or not isinstance(value, str):
            continue
        query = query.filter(getattr(AuthRule, key).like(f'%{value}%'))

    # Count before offset/limit so the total reflects all matches.
    count = query.count()

    page, page_size = query_params.page, query_params.page_size
    if page is not None and page_size is not None:
        query = query.offset((page - 1) * page_size).limit(page_size)

    # Always execute the query so callers get a list in both cases rather
    # than an unexecuted Query object when pagination is omitted.
    return count, query.all()
|
|
|
|
|
|
def get_default_auth_rule_config(db: Session, item_id: int):
    """Fetch the first ``DefaultAuthRuleConfig`` whose ``id`` equals ``item_id``.

    Args:
        db: Session used to run the query.
        item_id: ``id`` value to look up.

    Returns:
        The result of ``Query.first()`` for that filter, i.e. the matching
        row or ``None`` when nothing matches.
    """
    candidates = db.query(DefaultAuthRuleConfig).filter_by(id=item_id)
    return candidates.first()
|
|
|
|
|
|
def get_default_auth_rule_config_list(db: Session, params: AuthRuleSchemas.AuthRuleConfigQueryParams):
    """Return the match count and the matching ``DefaultAuthRuleConfig`` rows.

    Truthy ``params.id`` and ``params.name`` values are each applied as a
    ``LIKE '%value%'`` filter on the column of the same name.

    Args:
        db: Session used to run the queries.
        params: Filter and pagination parameters; the attributes ``id``,
            ``name``, ``page`` and ``page_size`` are read.

    Returns:
        A ``(count, rows)`` tuple. ``count`` is the number of rows matching
        the filters before pagination. ``rows`` is always a list: one page
        when both ``page`` and ``page_size`` are truthy, otherwise every
        match.
    """
    query = db.query(DefaultAuthRuleConfig)
    if params.id:
        query = query.filter(DefaultAuthRuleConfig.id.like(f'%{params.id}%'))
    if params.name:
        query = query.filter(DefaultAuthRuleConfig.name.like(f'%{params.name}%'))

    # Count before offset/limit so the total reflects all matches.
    count = query.count()

    if params.page and params.page_size:
        query = query.offset((params.page - 1) * params.page_size).limit(params.page_size)

    # Always execute the query so callers get a list in both cases rather
    # than an unexecuted Query object when pagination is omitted.
    return count, query.all()
|
|
|
|
|
|
def add_auth_category(db: Session, name: str, belong: int = None, category_level: int = None) -> AuthCategory:
    """Create an ``AuthCategory`` row and return the persisted instance.

    Args:
        db: Session used to validate, add, commit and refresh the row.
        name: Value for the category's ``name`` attribute.
        belong: ``id`` of an existing ``AuthCategory`` this one refers to,
            or ``None`` for no reference (presumably a top-level category;
            confirm against the model).
        category_level: Value for the category's ``category_level`` attribute.

    Returns:
        The newly created ``AuthCategory`` after it has been refreshed.

    Raises:
        CommonException: If ``belong`` is given but no ``AuthCategory`` with
            that ``id`` exists.
    """
    # Only validate the referenced category when one is actually supplied.
    # Looking up id=None never matches, so the default call used to raise.
    if belong is not None:
        parent = db.query(AuthCategory).filter_by(id=belong).first()
        if not parent:
            raise CommonException("错误的分类信息")

    item = AuthCategory(name=name, belong=belong, category_level=category_level)
    db.add(item)
    db.commit()
    # Reload the instance's attributes from the database before returning it.
    db.refresh(item)
    return item
|
|
|
|
|
|
def delete_auth_category(db: Session, auth_rule_id: int):
    """Delete the ``AuthCategory`` rows whose ``id`` equals ``auth_rule_id``.

    Args:
        db: Session used to issue the delete and commit it.
        auth_rule_id: ``id`` of the category to remove. The parameter name
            is kept for backward compatibility with existing callers even
            though it identifies a category, not a rule.
    """
    # Target AuthCategory: the previous code deleted from AuthRule, which
    # removed an unrelated rule and left the category in place.
    db.query(AuthCategory).filter_by(id=auth_rule_id).delete()
    db.commit()
|
|
|
|
|
|
def change_auth_category(db: Session, auth_rule_id: int, new_data: dict):
    """Apply ``new_data`` to the ``AuthCategory`` rows matching ``auth_rule_id``.

    Args:
        db: Session used to issue the update and commit it.
        auth_rule_id: ``id`` of the category to modify. The parameter name
            is kept for backward compatibility with existing callers.
        new_data: Mapping handed unchanged to ``Query.update``.
    """
    # filter_by() only accepts keyword criteria; passing the id positionally
    # raised TypeError, so the update could never run.
    db.query(AuthCategory).filter_by(id=auth_rule_id).update(new_data)
    db.commit()
|
|
|
|
|
|
def get_auth_category_by_id(db: Session, item_id):
    """Return the ``AuthCategory`` with primary key ``item_id``.

    Args:
        db: Session used for the lookup.
        item_id: Primary-key value of the category to fetch.

    Returns:
        The matching ``AuthCategory``, or ``None`` if no such row exists.
    """
    # Session.get() needs the mapped class as well as the identifier; the
    # previous call omitted the class and failed with TypeError.
    return db.get(AuthCategory, item_id)
|
|
|
|
|
|
def change_auth_rule(db: Session, rule_id, new_data: dict):
    """Update the ``AuthRule`` rows whose ``id`` equals ``rule_id`` and commit.

    Args:
        db: Session used to issue the update and commit it.
        rule_id: ``id`` value of the rule to modify.
        new_data: Mapping handed unchanged to ``Query.update``.

    Note:
        ``category1`` / ``category2`` values are not validated against
        ``AuthCategory`` here; a check for that was previously disabled.
    """
    # NOTE(review): this redefines a change_auth_rule declared earlier in
    # this module; being later, this is the definition callers receive.
    rules_to_update = db.query(AuthRule).filter_by(id=rule_id)
    rules_to_update.update(new_data)
    db.commit()
|
|
|
|
# def get_auth_rule_list(db: Session, query_params: AuthRuleSchemas.AuthRuleQueryParams):
|
|
# query = db.query(AuthRule)
|
|
# for key, value in query_params.dict().items():
|
|
# if key not in ['page','page_size']:
|
|
# query = query.filter_by(**{key: value})
|
|
# count = query.count()
|
|
# if query_params.page is not None and query_params.page_size is not None:
|
|
# query = query.offset((query_params.page - 1) * query_params.page_size).limit(query_params.page).all()
|
|
# return count,query
|