usermod/Crud/AuthRuleCrud.py

89 lines
2.8 KiB
Python
Raw Normal View History

2023-02-09 15:49:45 +08:00
from typing import Union, List
2023-02-09 10:54:07 +08:00
from sqlalchemy.orm import Session
from Exceptions.common import CommonException
from Model.AuthRuleModel import AuthRuleNode
def add_auth_rule_node(db: Session, name: str, belong: Union[int, None], node_type: str) -> AuthRuleNode:
2023-02-09 15:49:45 +08:00
if belong:
parent_node = get_auth_rule_node_by_id(belong)
if not parent_node:
raise CommonException("父节点不存在")
if parent_node.node_type != 'category':
raise CommonException("父节点类型不为分类节点")
2023-02-09 10:54:07 +08:00
item = AuthRuleNode(name=name, belong=belong, node_type=node_type)
db.add(item)
db.commit()
db.refresh(item)
return item
def get_auth_rule_node_by_id(db: Session, node_id: int) -> AuthRuleNode:
auth_rule_node = db.query(AuthRuleNode).filter_by(id=node_id).first()
return auth_rule_node
2023-02-09 15:49:45 +08:00
def delete_auth_rule_node_by_id_list(db: Session, node_id_list: List[int]):
delete_id_set = set(node_id_list)
node_id_set_temp = set(node_id_list)
n = 0
while True:
n += 1
node_id_set_temp = [item.id for item in
db.query(AuthRuleNode).filter(AuthRuleNode.belong.in_(node_id_set_temp))]
if node_id_set_temp:
for node_id in node_id_set_temp:
delete_id_set.add(node_id)
else:
break
# 可能出现死循环?
if n > 15:
raise CommonException("接口出现错误")
db.query(AuthRuleNode).filter(AuthRuleNode.belong.in_(delete_id_set)).delete()
2023-02-09 10:54:07 +08:00
2023-02-09 15:49:45 +08:00
# test_list = [
# {'id': 1, "belong": 7},
# {'id': 2, "belong": 1},
# {'id': 3, "belong": 2},
# {'id': 4, "belong": 3},
# {'id': 5, "belong": 8},
# {'id': 6, "belong": 5},
# {'id': 7, "belong": None},
# {'id': 8, "belong": None},
# ]
def get_auth_rule_tree(db: Session) -> dict:
item_list = db.query(AuthRuleNode).all()
node_list = []
for item in item_list:
item_dic = item.to_dict()
item_dic['children'] = []
node_list.append(item_dic)
nodes_dic = {node["id"]: node for node in node_list}
rule_tree = []
for node in node_list:
belong = node['belong']
if belong:
if belong in nodes_dic:
nodes_dic[belong]['children'].append(node)
else:
print(f"不存在{belong}")
else:
rule_tree.append(node)
return rule_tree
def change_auth_rule_node(db: Session, node_id: int, name: str, belong: Union[int, None]):
if belong:
parent_node = get_auth_rule_node_by_id(belong)
if not parent_node:
raise CommonException("父节点不存在")
if parent_node.node_type != 'category':
raise CommonException("父节点类型不为分类节点")
db.query(AuthRuleNode).filter_by(node_id).update({'name': name, 'belong': belong})