from datetime import datetime from sqlalchemy import func from sqlalchemy.orm import Session from Models.DailyModel import Daily from Schemas.DailySchemas import DailyQuery def daily_get(db: Session, item_id): return db.query(Daily).filter_by(id=item_id).first() def daily_add(db: Session, data): daily = Daily(**data) db.add(daily) db.commit() db.refresh(daily) return daily def daily_change(db: Session, item_id, data): db.query(Daily).filter_by(id=item_id).update(data) db.commit() item=db.query(Daily).filter_by(id=item_id).first() return item def daily_delete(db: Session, item_id): db.query(Daily).filter_by(id=item_id).delete() db.commit() def daily_query(db: Session, params: DailyQuery) -> [int]: params_dict = params.dict() query = db.query(Daily) db_model = Daily for key, value in params_dict.items(): # print(key, value) if key not in ['page', 'page_size'] and value is not None: # 在存储的数组值内查询 如存的 1,2,3,4 查询时则使用的 [1,2]这样的数据来查 # if key in ["xx"]: # for item in value: # query = query.filter(func.find_in_set(str(item), getattr(db_model, key))) # continue if type(value) == str: query = query.filter(getattr(db_model, key).like(f'%{value}%')) elif type(value) in [int, float, bool]: query = query.filter_by(**{key: value}) # 日期范围查询 elif type(value) in [list, tuple] and getattr(db_model, key).type.python_type == datetime: if value: if value[0] is not None: query = query.filter(getattr(db_model, key) >= datetime.fromtimestamp(value[0] / 1000)) if value[1] is not None: query = query.filter(getattr(db_model, key) <= datetime.fromtimestamp(value[1] / 1000)) else: query = query.filter(getattr(db_model, key) == value) query = query.order_by(Daily.daily_time.desc()) count = query.count() page = None page_size = None if 'page' in params_dict: page = params_dict['page'] if 'page_size' in params_dict: page_size = params_dict['page_size'] if page is not None and page_size is not None: query = query.offset((page - 1) * page_size).limit(page_size).all() # print(query,"daily_query") return count, query