from typing import List from sqlalchemy.orm import Session from Model.DepartmentModel import Department from Model.UserModel import User from Schemas import DepartmentSchemas def get_department_list(db: Session, query_params: DepartmentSchemas.DepartmentListQueryParams): query = db.query(Department) count = query.count() if query_params.page is not None and query_params.page_size is not None: query = query.offset((query_params.page - 1) * query_params.page_size).limit(query_params.page).all() return count, query def is_department_created(db: Session, name: str = None, department_id=None): if name is not None: if db.query(Department).filter_by(name=name).first(): return True else: return False if department_id is not None: if db.query(Department).filter_by(id=department_id).first(): return True else: return False def add_department(db: Session, name: str, belong: int): if not is_department_created(db, name): db.add(Department(name=name, belong=belong)) db.commit() else: raise Exception("部门已经存在") def change_department(db: Session, department_id: int, name: str, belong: int): db.query(Department).filter_by(id=department_id).update({"name": name, "belong": belong}) db.commit() def delete_department(db: Session, item_id: str): db.query(Department).filter_by(id=item_id).delete() db.commit() def set_user_department(db: Session, user_id: str, department_id_list: List[int]): department_info = "".join([str(item) for item in department_id_list]) db.query(User).filter_by(id=user_id).update({"department": department_info}) db.commit() db.refresh(User) return User.department def get_department_tree(db: Session) -> dict: item_list = db.query(Department).all() node_list = [] for item in item_list: item_dic = item.to_dict() item_dic['children'] = [] node_list.append(item_dic) nodes_dic = {node["id"]: node for node in node_list} tree = [] for node in node_list: belong = node['belong'] if belong: if belong in nodes_dic: nodes_dic[belong]['children'].append(node) else: print(f"不存在{belong}") else: tree.append(node) return tree