daily/Crud/DailyCrud.py

105 lines
4.6 KiB
Python

from datetime import datetime
from sqlalchemy import func, or_, and_
from sqlalchemy.orm import Session
from Models.DailyModel import Daily
from Schemas.DailySchemas import DailyQuery, DailyTypeEnum
def daily_get(db: Session, item_id):
return db.query(Daily).filter_by(id=item_id).first()
def daily_add(db: Session, data):
daily = Daily(**data)
db.add(daily)
db.commit()
db.refresh(daily)
return daily
def daily_change(db: Session, item_id, data):
db.query(Daily).filter_by(id=item_id).update(data)
db.commit()
item = db.query(Daily).filter_by(id=item_id).first()
return item
def daily_delete(db: Session, item_id):
db.query(Daily).filter_by(id=item_id).delete()
db.commit()
def daily_query(db: Session, params: DailyQuery, department: str, auth: str = None) -> [int]:
print(params)
params_dict = params.dict()
query = db.query(Daily)
db_model = Daily
if auth:
auth_list = auth.split(',')
# 有日报管理权限的可以查看所有 10
# 所有部门动态查看 2
#
if '10' not in auth_list and '2' not in auth_list and params.type == DailyTypeEnum.动态:
has_auth = [int(item) for item in auth_list if item in ['17', '18']]
# query = query.filter(or_(*[func.find_in_set(str(item), Daily.required_auth) for item in has_auth],
# Daily.required_auth._in['', None]))
departments = [int(item) for item in department.split(",")]
query = query.filter(or_(Daily.department.in_(departments), and_(Daily.department.not_in(departments),
or_(*[func.find_in_set(str(item),
Daily.required_auth)
for item in has_auth],
not Daily.required_auth))))
for key, value in params_dict.items():
if key == 'department' and type(value) == list:
query = query.filter(Daily.department.in_(value))
continue
# if key == 'required_auth':
# if not value:
# continue
# if auth:
# auth_list = auth.split(',')
# # 有日报管理权限的可以查看所有
# if '10' in auth_list:
# continue
# else:
# has_auth = [int(item) for item in auth_list if item in ['17', '18']]
# query = query.filter(or_(*[func.find_in_set(str(item), Daily.required_auth) for item in has_auth],
# Daily.required_auth._in['', None]))
# continue
# print(key, value)
if key not in ['page', 'page_size'] and value is not None:
# 在存储的数组值内查询 如存的 1,2,3,4 查询时则使用的 [1,2]这样的数据来查
# if key in ["xx"]:
# for item in value:
# query = query.filter(func.find_in_set(str(item), getattr(db_model, key)))
# continue
if type(value) == str:
query = query.filter(getattr(db_model, key).like(f'%{value}%'))
elif type(value) in [int, float, bool]:
query = query.filter_by(**{key: value})
# 日期范围查询
elif type(value) in [list, tuple] and getattr(db_model, key).type.python_type == datetime:
if value:
if value[0] is not None:
query = query.filter(getattr(db_model, key) >= datetime.fromtimestamp(value[0] / 1000))
print(datetime.fromtimestamp(value[0] / 1000))
if value[1] is not None:
query = query.filter(getattr(db_model, key) < datetime.fromtimestamp(value[1] / 1000))
print(datetime.fromtimestamp(value[1] / 1000))
else:
query = query.filter(getattr(db_model, key) == value)
query = query.order_by(Daily.daily_time.desc())
count = query.count()
page = None
page_size = None
if 'page' in params_dict:
page = params_dict['page']
if 'page_size' in params_dict:
page_size = params_dict['page_size']
if page is not None and page_size is not None:
query = query.offset((page - 1) * page_size).limit(page_size).all()
# print(query,"daily_query")
return count, query