user-wsc/AppUser/Crud/RoleCrud.py

79 lines
2.3 KiB
Python

from sqlalchemy.orm import Session
from AppUser.Schemas import RoleSchema
from AppUser.Model.RoleModel import Role, RoleAccessPolicy, RoleMenu
from Utils.UniqueCoder import TimeSerialNumUtils
def set_next_id(db: Session, num_len: int, prefix: str, model):
    """Build the next sequential string id for ``model``.

    Ids have the form ``prefix`` followed by a zero-padded serial number.
    The row with the greatest ``id`` (according to the database ordering) is
    looked up, the number after the prefix is incremented by one, and the
    result is left-padded with zeros to at least ``num_len`` digits. When the
    table has no rows the serial starts at 1.

    Args:
        db: Session used to query ``model``.
        num_len: Minimum number of digits in the serial part.
        prefix: Text placed before the serial part; may be empty.
        model: Mapped class exposing an ``id`` column.

    Returns:
        The new id string, e.g. ``"ROLE02"`` after ``"ROLE01"``.

    Raises:
        ValueError: If the text after the prefix in the latest id is not a
            decimal integer.

    NOTE(review): the latest id is chosen with ``ORDER BY id DESC``. The ids
    are handled as strings here, so presumably the column is textual and the
    ordering is lexicographic: once the serial outgrows ``num_len`` digits
    (``"ROLE100"`` sorts before ``"ROLE99"``) the same id would be produced
    again. Confirm the column type and widen ``num_len`` before that point.
    """
    latest_row = (
        db.query(model)
        .with_entities(model.id)
        .order_by(model.id.desc())
        .first()
    )
    if latest_row is None:
        next_serial = 1
    else:
        # Read the single selected column positionally: result rows are
        # tuple-like, and on SQLAlchemy 2.0 they are no longer mappings, so
        # unpacking them with ``{**row}`` fails.
        last_id = latest_row[0]
        # ``rpartition`` keeps the text after the last occurrence of the
        # prefix; an empty prefix is skipped because it is not a valid
        # separator.
        serial_text = last_id.rpartition(prefix)[2] if prefix else last_id
        next_serial = int(serial_text) + 1
    return prefix + str(next_serial).zfill(num_len)
def create_role(db: Session, name: str):
    """Insert a new role named ``name`` and return the stored object.

    The role id is produced by ``set_next_id`` with the ``"ROLE"`` prefix and
    a two-digit minimum serial. The new row is committed and then refreshed
    from the database before being returned.

    Args:
        db: Session used for the insert.
        name: Value for the role's ``name`` field.

    Returns:
        The refreshed ``Role`` instance.
    """
    new_role = Role(name=name)
    new_role.id = set_next_id(db=db, num_len=2, prefix="ROLE", model=Role)

    db.add(new_role)
    db.commit()
    # Reload the instance so it reflects the committed row.
    db.refresh(new_role)
    return new_role
def get_role_by_id(db: Session, role_id: str):
    """Look up a role by its ``id``.

    Args:
        db: Session used for the query.
        role_id: Value to match against ``Role.id``.

    Returns:
        The first matching ``Role``, or ``None`` when no row matches.
    """
    matching_roles = db.query(Role).filter_by(id=role_id)
    return matching_roles.first()
def get_role_by_name(db: Session, name: str):
    """Look up a role by its ``name``.

    Args:
        db: Session used for the query.
        name: Value to match against ``Role.name``.

    Returns:
        The first matching ``Role``, or ``None`` when no row matches.
    """
    matching_roles = db.query(Role).filter_by(name=name)
    return matching_roles.first()
def get_role_list(db: Session):
    """Return every role, ordered by ascending ``id``.

    Args:
        db: Session used for the query.

    Returns:
        A list of ``Role`` objects; empty when the table has no rows.
    """
    ordered_roles = db.query(Role).order_by(Role.id.asc())
    return ordered_roles.all()
def create_role_menu(db: Session, role_id: str, subject: str):
    """Attach a menu subject to a role and commit it.

    The entry's id comes from ``TimeSerialNumUtils.create_time_serial_num``
    with the ``"SUB"`` prefix and an empty suffix.

    Args:
        db: Session used for the insert.
        role_id: Id of the role the menu entry belongs to.
        subject: Value for the entry's ``subject`` field.

    Returns:
        Always ``True`` once the commit has completed.
    """
    menu_entry = RoleMenu(subject=subject)
    menu_entry.id = TimeSerialNumUtils.create_time_serial_num(
        prefix="SUB", suffix=""
    )
    menu_entry.role_id = role_id

    db.add(menu_entry)
    db.commit()
    return True
def get_role_menu(db: Session, role_id: str):
    """Return all menu entries that belong to a role.

    Args:
        db: Session used for the query.
        role_id: Value to match against ``RoleMenu.role_id``.

    Returns:
        A list of ``RoleMenu`` objects; empty when none match.
    """
    entries_for_role = db.query(RoleMenu).filter_by(role_id=role_id)
    return entries_for_role.all()
def create_role_policy(db: Session, role_id: str, schema: RoleSchema.CreateRolePolicyReqBody):
    """Create an access policy for a role from a request body.

    The policy fields are taken from ``schema.dict()``; the id comes from
    ``TimeSerialNumUtils.create_time_serial_num`` with the ``"AP"`` prefix
    and an empty suffix. The row is committed and refreshed before being
    converted for the caller.

    Args:
        db: Session used for the insert.
        role_id: Id of the role the policy belongs to.
        schema: Request body supplying the policy fields.

    Returns:
        The result of ``to_dict()`` on the stored ``RoleAccessPolicy``.
    """
    policy_fields = schema.dict()
    policy = RoleAccessPolicy(**policy_fields)
    policy.id = TimeSerialNumUtils.create_time_serial_num(
        prefix="AP", suffix=""
    )
    policy.role_id = role_id

    db.add(policy)
    db.commit()
    # Reload the instance so the returned dict reflects the committed row.
    db.refresh(policy)
    return policy.to_dict()
def edit_role_policy(db: Session, ap_id: str, schema: RoleSchema.EditRolePolicyReqBody):
    """Apply the fields from a request body to an access policy.

    The values to write are whatever ``schema.del_null_value()`` returns; if
    that result is falsy nothing is written.

    Args:
        db: Session used for the update.
        ap_id: Value to match against ``RoleAccessPolicy.id``.
        schema: Request body carrying the fields to change.

    Returns:
        ``False`` when there is nothing to update, otherwise ``True`` after
        the update has been committed.
    """
    changes = schema.del_null_value()
    if changes:
        policy_query = db.query(RoleAccessPolicy).filter_by(id=ap_id)
        policy_query.update(changes)
        db.commit()
        return True
    return False
def delete_role_policy(db: Session, ap_id: str):
    """Delete the access policy rows whose ``id`` equals ``ap_id``.

    Args:
        db: Session used for the delete.
        ap_id: Value to match against ``RoleAccessPolicy.id``.

    Returns:
        Always ``True`` once the commit has completed.
    """
    policy_query = db.query(RoleAccessPolicy).filter_by(id=ap_id)
    policy_query.delete()
    db.commit()
    return True