usermod/Crud/AuthRuleCrud.py

112 lines
4.2 KiB
Python
Raw Normal View History

2023-02-09 15:49:45 +08:00
from typing import Union, List
2023-02-09 10:54:07 +08:00
from sqlalchemy.orm import Session
from Exceptions.common import CommonException
2023-02-13 14:42:35 +08:00
from Model.AuthRuleModel import AuthRule, DefaultAuthRuleConfig, AuthCategory
2023-02-10 09:15:17 +08:00
from Schemas import AuthRuleSchemas
2023-02-09 10:54:07 +08:00
2023-02-13 14:42:35 +08:00
def add_auth_rule(db: Session, name: str, category1: str = "", category2: str = "") -> AuthRule:
2023-02-15 17:03:04 +08:00
item = AuthRule(name=name, category1=category1, category2=category2)
db.add(item)
db.commit()
db.refresh(item)
return item
2023-02-13 14:42:35 +08:00
def delete_auth_rule(db: Session, auth_rule_id: int):
db.query(AuthRule).filter_by(id=auth_rule_id).delete()
2023-02-09 10:54:07 +08:00
db.commit()
2023-02-13 14:42:35 +08:00
# def delete_auth_rule_by_id_list(db: Session, auth_rule_id: int):
# db.query(AuthRule).filter_by(id=auth_rule_id).delete()
# db.commit()
def change_auth_rule(db: Session, rule_id, new_data: dict):
# if "category1" in new_data:
# if not db.query(AuthCategory).filter_by(id=new_data['category1']).first():
# raise CommonException('分类信息不正确')
# if "category2" in new_data:
# if not db.query(AuthCategory).filter_by(id=new_data['category2']).first():
# raise CommonException('分类信息不正确')
db.query(AuthRule).filter_by(id=rule_id).update(new_data)
2023-02-10 14:01:39 +08:00
db.commit()
2023-02-10 09:15:17 +08:00
2023-02-13 14:42:35 +08:00
def get_auth_rule_list(db: Session, query_params: AuthRuleSchemas.AuthRuleQueryParams):
query = db.query(AuthRule)
for key, value in query_params.dict().items():
2023-02-15 17:03:04 +08:00
print(key, value)
if key not in ['page', 'page_size'] and value is not None:
if type(value) == str:
query = query.filter(getattr(AuthRule, key).like(f'%{value}%'))
2023-02-13 14:42:35 +08:00
count = query.count()
if query_params.page is not None and query_params.page_size is not None:
2023-02-15 17:03:04 +08:00
query = query.offset((query_params.page - 1) * query_params.page_size).limit(query_params.page_size).all()
return count, query
2023-02-13 14:42:35 +08:00
def get_default_auth_rule_config(db: Session, item_id: int):
2023-02-10 09:15:17 +08:00
return db.query(DefaultAuthRuleConfig).filter_by(id=item_id).first()
def get_default_auth_rule_config_list(db: Session, params: AuthRuleSchemas.AuthRuleConfigQueryParams):
query = db.query(DefaultAuthRuleConfig)
if params.id:
2023-02-15 17:03:04 +08:00
query = query.filter(DefaultAuthRuleConfig.id.like(f'%{params.id}%'))
2023-02-10 09:15:17 +08:00
if params.name:
2023-02-15 17:03:04 +08:00
query = query.filter(DefaultAuthRuleConfig.name.like(f'%{params.name}%'))
2023-02-10 09:15:17 +08:00
count = query.count()
if params.page and params.page_size:
2023-02-17 10:55:02 +08:00
query = query.offset((params.page - 1) * params.page_size).limit(params.page_size).all()
2023-02-10 09:15:17 +08:00
return count, query
2023-02-13 14:42:35 +08:00
2023-02-15 17:03:04 +08:00
def add_auth_category(db: Session, name: str, belong: int = None, category_level: int = None) -> AuthCategory:
item = AuthCategory(name=name, belong=belong, category_level=category_level)
2023-02-13 14:42:35 +08:00
if not db.query(AuthCategory).filter_by(id=belong).first():
raise CommonException("错误的分类信息")
db.add(item)
db.commit()
db.refresh(item)
return item
def delete_auth_category(db: Session, auth_rule_id: int):
db.query(AuthRule).filter_by(id=auth_rule_id).delete()
db.commit()
def change_auth_category(db: Session, auth_rule_id: int, new_data: dict):
db.query(AuthCategory).filter_by(auth_rule_id).update(new_data)
db.commit()
pass
def get_auth_category_by_id(db: Session, item_id):
return db.get(item_id)
def change_auth_rule(db: Session, rule_id, new_data: dict):
2023-02-15 17:03:04 +08:00
# if "category1" in new_data:
# if not db.query(AuthCategory).filter_by(id=new_data['category1']).first():
# raise CommonException('分类信息不正确')
# if "category2" in new_data:
# if not db.query(AuthCategory).filter_by(id=new_data['category2']).first():
# raise CommonException('分类信息不正确')
2023-02-13 14:42:35 +08:00
db.query(AuthRule).filter_by(id=rule_id).update(new_data)
db.commit()
2023-02-15 17:03:04 +08:00
# def get_auth_rule_list(db: Session, query_params: AuthRuleSchemas.AuthRuleQueryParams):
# query = db.query(AuthRule)
# for key, value in query_params.dict().items():
# if key not in ['page','page_size']:
# query = query.filter_by(**{key: value})
# count = query.count()
# if query_params.page is not None and query_params.page_size is not None:
# query = query.offset((query_params.page - 1) * query_params.page_size).limit(query_params.page).all()
# return count,query