from sqlalchemy.orm import Session from . import Schemas from .Models import EditRecord, EditItemType def edit_item_type_add(db: Session, data: Schemas.EditItemTypeAddInfo): item = EditItemType(**data.dict()) db.add(item) db.commit() db.refresh(item) return item def edit_item_type_delete(db: Session, item_id: int): db.query(EditItemType).filter_by(id=item_id).delete() db.commit() def edit_item_type_update(db: Session, data: Schemas.EditItemTypeUpdateInfo): db.query(EditItemType).filter_by(id=data.id).update({key: v for key, v in data.dict().items() if v is not None}) db.commit() item = db.query(EditItemType).filter_by(id=data.id).first() return item def edit_item_type_get(db: Session, item_id: int): item = db.query(EditItemType).filter_by(id=item_id).first() return item def edit_item_type_query(db: Session, params: Schemas.EditItemTypeQuery): params_dict = params.dict() query = db.query(EditItemType) db_model = EditItemType for key, value in params_dict.items(): if key not in ['page', 'page_size'] and value is not None: if type(value) == str: query = query.filter(getattr(db_model, key).like(f'%{value}%')) elif type(value) in [int, float, bool]: query = query.filter_by(**{key: value}) else: query = query.filter(getattr(db_model, key) == value) count = query.count() page = None page_size = None if 'page' in params_dict: page = params_dict['page'] if 'page_size' in params_dict: page_size = params_dict['page_size'] # 页数不超过100 page_size = min(page_size, 100) if page is not None and page_size is not None: query = query.offset((page - 1) * page_size).limit(page_size).all() return count, query ################# def edit_record_add(db: Session, data: Schemas.EditRecordAddInfo): item = EditRecord(**data.dict()) db.add(item) db.commit() db.refresh(item) return item def edit_record_delete(db: Session, item_id: int): db.query(EditRecord).filter_by(id=item_id).delete() db.commit() def edit_record_update(db: Session, data: Schemas.EditRecordUpdateInfo): db.query(EditRecord).filter_by(id=data.id).update({key: v for key, v in data.dict().items() if v is not None}) db.commit() item = db.query(EditRecord).filter_by(id=data.id).first() return item def edit_record_get(db: Session, item_id: int): item = db.query(EditRecord).filter_by(id=item_id).first() return item def edit_record_query(db: Session, params: Schemas.EditRecordQuery): params_dict = params.dict() query = db.query(EditRecord) db_model = EditRecord for key, value in params_dict.items(): if key not in ['page', 'page_size'] and value is not None: if type(value) == str: query = query.filter(getattr(db_model, key).like(f'%{value}%')) elif type(value) in [int, float, bool]: query = query.filter_by(**{key: value}) else: query = query.filter(getattr(db_model, key) == value) count = query.count() page = None page_size = None if 'page' in params_dict: page = params_dict['page'] if 'page_size' in params_dict: page_size = params_dict['page_size'] # 页数不超过100 page_size = min(page_size, 100) if page is not None and page_size is not None: query = query.offset((page - 1) * page_size).limit(page_size).all() return count, query