import logging
from datetime import datetime

from Models.DepartmentModel import Department
from Schemas.UserSchemas import TokenData
from sqlalchemy import func, or_, and_
from sqlalchemy.orm import Session
from Models.DailyModel import Daily
from Schemas.DailySchemas import DailyQuery, DailyTypeEnum

logger = logging.getLogger(__name__)

# Permission codes as they appear in the comma-separated ``auth_data`` string.
_AUTH_DAILY_ADMIN = '10'        # daily-report management: may view everything
_AUTH_ALL_DEPARTMENTS = '2'     # may view the updates of every department
_AUTH_OWN_ONLY = '20'           # inside business departments: own dailies only
# Codes matched against ``Daily.required_auth``; presumably the
# director / supervisor / senior-management levels -- TODO confirm.
_AUTH_RESTRICTED_LEVELS = ('17', '18')

_PAGING_KEYS = ('page', 'page_size')


def daily_get(db: Session, item_id):
    """Return the ``Daily`` row whose id is *item_id*, or ``None`` if absent."""
    return db.query(Daily).filter_by(id=item_id).first()


def daily_add(db: Session, data):
    """Insert a ``Daily`` built from the mapping *data*, commit, and return it.

    The instance is refreshed after the commit so database-generated values
    are loaded onto it.
    """
    daily = Daily(**data)
    db.add(daily)
    db.commit()
    db.refresh(daily)
    return daily


def daily_change(db: Session, item_id, data):
    """Apply the column updates in *data* to the row with id *item_id*.

    Commits, then re-reads and returns the row (``None`` if no such row).
    """
    db.query(Daily).filter_by(id=item_id).update(data)
    db.commit()
    return daily_get(db, item_id)


def daily_delete(db: Session, item_id):
    """Delete the ``Daily`` row with id *item_id* (if any) and commit."""
    db.query(Daily).filter_by(id=item_id).delete()
    db.commit()


def _parse_department_ids(raw):
    """Parse a comma-separated id string into ints; ``None``/empty gives ``[]``."""
    return [int(part) for part in (raw or '').split(',') if part.strip()]


def _restrict_to_visible(db, query, params, token_data, auth_list):
    """Narrow *query* to the dailies a non-privileged user may see."""
    granted_levels = [code for code in auth_list if code in _AUTH_RESTRICTED_LEVELS]
    departments = _parse_department_ids(token_data.department)

    # A daily is visible when it has no required level, when the user holds
    # one of the levels it requires, or when the user wrote it.
    query = query.filter(or_(
        *[func.find_in_set(level, Daily.required_auth) for level in granted_levels],
        Daily.required_auth.is_(None),
        Daily.required_auth == '',
        Daily.fill_user == token_data.email,
    ))

    if params.type != DailyTypeEnum.部门子公司日报:
        return query

    # Users limited to their own dailies lose department-wide visibility
    # inside business departments.
    business_ids = set()
    if _AUTH_OWN_ONLY in auth_list and departments:
        owned = db.query(Department).filter(Department.id.in_(departments))
        business_ids = {dp.id for dp in owned if dp.type.value == '业务部门'}

    open_departments = [d for d in departments if d not in business_ids]
    if open_departments:
        return query.filter(or_(
            Daily.department.in_(open_departments),
            Daily.fill_user == token_data.email,
        ))
    # No department left with department-wide visibility: own dailies only.
    return query.filter(Daily.fill_user == token_data.email)


def _apply_time_range(query, column, bounds):
    """Filter *column* to ``[start, end)``; bounds are epoch milliseconds.

    A missing or ``None`` bound leaves that side of the range open.
    """
    start = bounds[0] if len(bounds) > 0 else None
    end = bounds[1] if len(bounds) > 1 else None
    if start is not None:
        start_time = datetime.fromtimestamp(start / 1000)
        logger.debug("daily_query range start: %s", start_time)
        query = query.filter(column >= start_time)
    if end is not None:
        end_time = datetime.fromtimestamp(end / 1000)
        logger.debug("daily_query range end: %s", end_time)
        query = query.filter(column < end_time)
    return query


def _apply_param_filters(query, params_dict):
    """Translate every non-empty, non-paging entry of *params_dict* into a filter.

    Exact type checks (not ``isinstance``) are deliberate: subclasses such as
    enum values must fall through to the plain equality branch.
    """
    for key, value in params_dict.items():
        if key in _PAGING_KEYS or value in ('', None):
            continue
        if key == 'department' and type(value) is list:
            query = query.filter(Daily.department.in_(value))
        elif type(value) is str:
            # Substring match.
            query = query.filter(getattr(Daily, key).like(f'%{value}%'))
        elif type(value) in (int, float, bool):
            query = query.filter_by(**{key: value})
        elif (type(value) in (list, tuple)
              and getattr(Daily, key).type.python_type == datetime):
            # Date-range query.
            query = _apply_time_range(query, getattr(Daily, key), value)
        else:
            query = query.filter(getattr(Daily, key) == value)
    return query


def daily_query(db: Session, params: DailyQuery, token_data: TokenData) -> tuple:
    """Search dailies with permission filtering, field filters and paging.

    Args:
        db: Active SQLAlchemy session.
        params: Query parameters. ``page``/``page_size`` drive paging, every
            other non-empty field becomes a filter on the ``Daily`` column of
            the same name.
        token_data: Caller identity. ``auth_data`` is a comma-separated list
            of permission codes, ``department`` a comma-separated list of
            department ids, ``email`` identifies the caller's own dailies.

    Returns:
        ``(count, rows)``. ``count`` is the number of matches before paging.
        ``rows`` is a list of ``Daily`` when both ``page`` and ``page_size``
        are set; otherwise it is the un-executed, ordered ``Query``.

    NOTE(review): an empty ``auth_data`` skips visibility filtering entirely
    -- confirm that this is intended.
    """
    logger.debug("daily_query params: %s", params)
    params_dict = params.dict()
    query = db.query(Daily)

    auth = token_data.auth_data or ''
    if auth:
        auth_list = auth.split(',')
        # Holders of the management or all-departments permission see everything.
        if (_AUTH_DAILY_ADMIN not in auth_list
                and _AUTH_ALL_DEPARTMENTS not in auth_list):
            query = _restrict_to_visible(db, query, params, token_data, auth_list)

    query = _apply_param_filters(query, params_dict)

    count = query.count()
    rows = query.order_by(Daily.daily_time.desc())

    page = params_dict.get('page')
    page_size = params_dict.get('page_size')
    if page is not None and page_size is not None:
        rows = rows.offset((page - 1) * page_size).limit(page_size).all()
    return count, rows