daily/Crud/DailyCrud.py

195 lines
7.6 KiB
Python

from datetime import datetime, date
import re
from Models.DepartmentModel import Department
from Models.UserModel import User
from Schemas.UserSchemas import TokenData
from sqlalchemy import func, or_, and_, DATE, cast
from sqlalchemy.orm import Session
from Models.DailyModel import Daily
from Schemas.DailySchemas import DailyQuery, DailyTypeEnum
def daily_get(db: Session, item_id):
    """Fetch a single Daily row by primary key.

    :param db: SQLAlchemy session used to run the query.
    :param item_id: value compared against ``Daily.id``.
    :return: the first matching ``Daily`` row, or ``None`` when nothing matches.
    """
    lookup = db.query(Daily).filter_by(id=item_id)
    return lookup.first()
def replace_word(content: str):
    """Upper-case known lower-case abbreviations that stand alone in ``content``.

    Only the exact lower-case spellings listed in ``word_dic`` are rewritten
    (e.g. ``"bug"`` -> ``"BUG"``, ``"reits"`` -> ``"REITs"``); other casings
    such as ``"Bug"`` are left untouched.

    A plain ``str.replace`` also rewrote the abbreviations when they occurred
    inside ordinary words (``"build"`` -> ``"bUIld"``, ``"password"`` ->
    ``"passWORD"``).  The match is therefore restricted to occurrences that
    have no ASCII letter directly before or after them.

    :param content: text to normalise.
    :return: ``content`` with the stand-alone abbreviations replaced.
    """
    word_dic = {
        "gep": "GEP",
        "ppt": "PPT",
        "esg": "ESG",
        "ppn": "PPN",
        "gdp": "GDP",
        "eod": "EOD",
        "bug": "BUG",
        "pdf": "PDF",
        "ui": "UI",
        "word": "WORD",
        "excel": "EXCEL",
        "reits": "REITs",
        "abs": "ABS",
        "oa": "OA",
        "issb": "ISSB",
    }
    # ASCII-letter lookarounds instead of \b: \b treats CJK characters as word
    # characters, so an abbreviation glued to Chinese text would never match.
    alternatives = "|".join(re.escape(key) for key in word_dic)
    pattern = re.compile(rf"(?<![A-Za-z])(?:{alternatives})(?![A-Za-z])")
    return pattern.sub(lambda match: word_dic[match.group(0)], content)
def replace_words(content):
    """Rewrite known abbreviations in ``content`` to their canonical casing.

    Each abbreviation is matched case-insensitively, and only where it is not
    directly preceded or followed by an ASCII letter, then replaced with the
    spelling listed below (e.g. ``"esg"`` / ``"Esg"`` -> ``"ESG"``).

    :param content: text to normalise.
    :return: the text with every stand-alone abbreviation in canonical form.
    """
    canonical_forms = (
        "GEP", "PPT", "ESG", "PPN", "GDP",
        "EOD", "BUG", "PDF", "UI", "WORD",
        "EXCEL", "REITs", "ABS", "OA", "ISSB",
    )
    normalized = content
    # For every listed word, find all of its occurrences with a regular
    # expression and replace them with the spelling used in the list.
    for word in canonical_forms:
        matcher = re.compile(rf"(?<![A-Za-z]){word}(?![A-Za-z])", flags=re.IGNORECASE)
        normalized = matcher.sub(word, normalized)
    return normalized
def daily_add(db: Session, data):
    """Create a Daily row from ``data``, persist it and return it.

    A non-empty ``content`` is passed through ``replace_words`` before the
    row is added.

    :param db: SQLAlchemy session; the row is added, committed and refreshed.
    :param data: mapping of keyword arguments for the ``Daily`` constructor.
    :return: the refreshed ``Daily`` instance.
    """
    record = Daily(**data)
    text = record.content
    if text:
        record.content = replace_words(text)
    db.add(record)
    db.commit()
    db.refresh(record)
    return record
def daily_change(db: Session, item_id, data):
    """Update the Daily row with id ``item_id`` and return the stored row.

    A non-empty ``data['content']`` is passed through ``replace_words`` first;
    the normalised text is written back into ``data`` itself, so the caller's
    dict is modified in place.

    :param db: SQLAlchemy session; the update is committed.
    :param item_id: value compared against ``Daily.id``.
    :param data: dict of column values handed to ``Query.update``.
    :return: the row re-read after the commit, or ``None`` if none matches.
    """
    raw_content = data.get('content')
    if raw_content:
        data['content'] = replace_words(raw_content)
    matching_rows = db.query(Daily).filter_by(id=item_id)
    matching_rows.update(data)
    db.commit()
    return db.query(Daily).filter_by(id=item_id).first()
def daily_delete(db: Session, item_id):
    """Delete the Daily row(s) whose id equals ``item_id`` and commit.

    :param db: SQLAlchemy session used for the delete and the commit.
    :param item_id: value compared against ``Daily.id``.
    """
    doomed_rows = db.query(Daily).filter_by(id=item_id)
    doomed_rows.delete()
    db.commit()
# Build and run the permission-aware, filtered listing query for Daily rows.
#
# Parameters:
#   db         - SQLAlchemy session used to build the queries.
#   params     - DailyQuery filter object; it is converted with .dict() and
#                every non-empty field is turned into a filter, except
#                page / page_size, which drive pagination.
#   token_data - caller identity; its department, auth_data and email
#                attributes are read.
#
# Returns `count, query`: count is query.count() taken before pagination is
# applied; query is the list produced by .all() when both page and page_size
# are set, otherwise it is still the un-executed Query object.
# NOTE(review): the `-> [int]` annotation does not describe that return value.
# NOTE(review): this copy of the function carries no indentation, so the
# nesting of the permission branches (in particular which `if` the `else:`
# further down belongs to) cannot be confirmed from here - check it against
# the original file before changing the logic.
def daily_query(db: Session, params: DailyQuery, token_data: TokenData) -> [int]:
# NOTE(review): debug output written to stdout; the other print() calls below
# are the same kind of debug output.
print(params)
department = token_data.department
# A missing auth_data is treated as the empty string.
auth = token_data.auth_data or ''
params_dict = params.dict()
query = db.query(Daily)
db_model = Daily
if auth:
# auth_data is a comma-separated list of permission codes.
auth_list = auth.split(',')
# Holders of the daily-report management permission (code 10) can view everything
# Code 2: may view the updates of all departments
# If an update's permission is set to directors / supervisors / senior management,
# only the author and those roles can see it
if '10' not in auth_list and '2' not in auth_list:
# Only codes 17 and 18 are kept from the caller's permission list.
has_auth = [int(item) for item in auth_list if item in ['17', '18']]
# department is a comma-separated string; every piece is converted to int.
departments = [int(item) for item in department.split(",")]
# Keep only the daily reports the caller is permitted to see: required_auth
# contains one of has_auth (SQL find_in_set), or is NULL / '' / '-1', or the
# report was filled in by the caller.
query = query.filter(
or_(*[func.find_in_set(str(item), Daily.required_auth) for item in has_auth],
Daily.required_auth == None, Daily.required_auth == '', Daily.required_auth == '-1',
Daily.fill_user == token_data.email
))
# Extra department scoping for the department / subsidiary daily-report type.
if params.type == DailyTypeEnum.部门子公司日报:
# Collects the ids of the caller's departments whose type value is the
# business-department type.
业务部门 = []
# Users limited to their own reports (code 20) may only see their own
# reports inside business departments
if '20' in auth_list:
for dp in db.query(Department).filter(Department.id.in_(departments)):
if dp.type.value == '业务部门':
业务部门.append(dp.id)
# Departments of the caller that were not collected above.
query_department = [d for d in departments if d not in 业务部门]
if query_department:
# Reports of those departments, or reports filled in by the caller.
query = query.filter(or_(Daily.department.in_(query_department), Daily.fill_user == token_data.email
))
# .filter(
# or_(*[func.find_in_set(str(item), Daily.required_auth) for item in has_auth],
# Daily.required_auth == None, Daily.required_auth == '',
# Daily.fill_user == token_data.email
# )
# )
else:
query = query.filter(Daily.fill_user == token_data.email)
# Can see own updates, but cannot see confidential reports of the same department
# Turn every remaining non-empty field of params into a filter.
for key, value in params_dict.items():
# Empty string, None and empty list mean "no filter on this field".
if value in ['', None, []]:
continue
# Substring match on the author's name through a join to User.
if key == 'fill_user_name':
query = query.join(User).filter(User.name.like(f'%{value}%'))
continue
# A list of departments becomes an IN filter.
if key == 'department' and type(value) == list:
query = query.filter(Daily.department.in_(value))
continue
# if key == 'required_auth':
# if not value:
# continue
# if auth:
# auth_list = auth.split(',')
# # holders of the daily-report management permission can view everything
# if '10' in auth_list:
# continue
# else:
# has_auth = [int(item) for item in auth_list if item in ['17', '18']]
# query = query.filter(or_(*[func.find_in_set(str(item), Daily.required_auth) for item in has_auth],
# Daily.required_auth._in['', None]))
# continue
# print(key, value)
if key not in ['page', 'page_size'] and value is not None:
# Search inside a stored comma-separated value, e.g. a stored "1,2,3,4" is
# queried with a list such as [1,2]
# if key in ["xx"]:
# for item in value:
# query = query.filter(func.find_in_set(str(item), getattr(db_model, key)))
# continue
# The checks below compare the exact type, so subclasses of str / int are
# not matched by them and fall through to later branches.
if type(value) == str:
# Strings: substring (LIKE) match on the column named by key.
query = query.filter(getattr(db_model, key).like(f'%{value}%'))
elif getattr(db_model, key).type.python_type == datetime and type(value) == int:
# An int against a datetime column: compare the column's DATE part with the
# date of the timestamp. value is divided by 1000 before fromtimestamp(),
# so it is presumably epoch milliseconds - TODO confirm with the caller.
query = query.filter(
cast(getattr(db_model, key), DATE) == date.fromtimestamp(value and value / 1000))
print(date.fromtimestamp(value and value / 1000))
elif type(value) in [int, float, bool]:
# Other numbers and booleans: equality filter.
query = query.filter_by(**{key: value})
# Date-range query: value[0] is an inclusive lower bound, value[1] an
# exclusive upper bound; a None bound is skipped.
elif type(value) in [list, tuple] and getattr(db_model, key).type.python_type == datetime:
if value:
if value[0] is not None:
query = query.filter(getattr(db_model, key) >= datetime.fromtimestamp(value[0] / 1000))
print(datetime.fromtimestamp(value[0] / 1000))
if value[1] is not None:
query = query.filter(getattr(db_model, key) < datetime.fromtimestamp(value[1] / 1000))
print(datetime.fromtimestamp(value[1] / 1000))
else:
# Anything else: plain equality on the column named by key.
query = query.filter(getattr(db_model, key) == value)
# query = query.filter(func.date(getattr(db_model, key)) == value)
# Newest reports first.
query = query.order_by(Daily.daily_time.desc())
# Total number of matching rows, counted before offset / limit are applied.
count = query.count()
page = None
page_size = None
if 'page' in params_dict:
page = params_dict['page']
if 'page_size' in params_dict:
page_size = params_dict['page_size']
if page is not None and page_size is not None:
# Pages are 1-based; from here on `query` holds the list returned by .all().
query = query.offset((page - 1) * page_size).limit(page_size).all()
# print(query,"daily_query")
return count, query