daily/Router/DailyRouter.py

341 lines
15 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import datetime
import os.path
import time
from docxtpl import DocxTemplate
from fastapi import APIRouter, Depends, HTTPException
from sqlalchemy import cast, DATE, func, or_
from sqlalchemy.orm import Session
from starlette.responses import FileResponse
from Crud.UserCrud import get_department_config
from Models.DepartmentModel import Department
from Models.UserModel import User
from Mods.Notice.Utils import DailyNotice, daily_notice
from Schemas.DailySchemas import DailyTypeEnum
from Schemas.UserSchemas import TokenData
from Utils.AuthUtils import token_data_depend, check_auth, registered_depend
from Models.DailyModel import Daily
from Utils.CrudUtils import auto_create_crud
import json
from Schemas import DailySchemas
from Crud import DailyCrud
from Utils.SqlAlchemyUtils import get_db, QueryParams, query_common
import pandas as pd
# Router for the daily-report endpoints. Every route registered on it is
# served under the "/api/daily/daily" prefix and carries the "日报" tag.
router = APIRouter(
    prefix="/api/daily/daily",
    tags=["日报"],
    # Router-wide authentication is currently switched off:
    # dependencies=[
    #     Depends(registered_depend)
    # ]
)
# crud = auto_create_crud(Daily, 'daily', "日报", auto_create_keys=['create_time', 'update_time', 'id'],
# array_keys=['department', 'post'])
# crud.mount(router)
@router.post("/daily_get", response_model=DailySchemas.DailyGetRes, summary="获取日报")
def daily_get(req: DailySchemas.DailyGetReq, db: Session = Depends(get_db),
              # Authentication dependency, currently disabled:
              # token_data: TokenData = Depends(registered_depend)
              ):
    """Return a single daily report together with its comments.

    Args:
        req: Request body; only ``req.id`` is read.
        db: Database session injected through ``get_db``.

    Returns:
        A ``DailyGetRes`` built from ``item.to_dict()``. ``comments`` is a list
        of ``CommentInfo`` objects, or ``None`` when the report has no comments.

    Raises:
        HTTPException: 404 when ``DailyCrud.daily_get`` returns nothing.
    """
    item = DailyCrud.daily_get(db, req.id)
    # The existence check must come before any attribute access on ``item``;
    # HTTPException also requires an explicit status code.
    if not item:
        raise HTTPException(status_code=404, detail="未取到信息")

    comments = None
    if item.comments:
        comments = [
            DailySchemas.CommentInfo(**comment.to_with_user_dict())
            for comment in item.comments
        ]
    return DailySchemas.DailyGetRes(**item.to_dict(), comments=comments)
@router.post("/daily_add", response_model=DailySchemas.DailyAddRes, summary="添加日报")
def daily_add(req: DailySchemas.DailyAddReq, db: Session = Depends(get_db),
              token_data: TokenData = Depends(token_data_depend)):
    """Create a daily report on behalf of the authenticated user.

    The request is rejected with status 305 when ``req.fill_user`` differs
    from the e-mail carried by the token. After the record is stored, a
    notification is sent if the report type is ``DailyTypeEnum.运行日报``.

    Returns:
        A ``DailyAddRes`` built from the stored record's ``to_dict()``.
    """
    reporter_matches_token = req.fill_user == token_data.email
    if not reporter_matches_token:
        raise HTTPException(status_code=305, detail="填报人与email不符")

    # A per-department fill-in permission check (department membership or
    # auth code '7') used to sit here; it is currently disabled.
    payload = req.dict()
    created = DailyCrud.daily_add(db, payload)

    is_run_report = req.type == DailyTypeEnum.运行日报
    if is_run_report:
        daily_notice.send_on_daily_pdf_upload()

    response_fields = created.to_dict()
    return DailySchemas.DailyAddRes(**response_fields)
@router.post("/daily_change", response_model=DailySchemas.DailyChangeRes, summary="修改日报")
def daily_change(req: DailySchemas.DailyChangeReq, db: Session = Depends(get_db),
                 token_data: TokenData = Depends(registered_depend)):
    """Update the daily report identified by ``req.id`` with the request fields.

    ``token_data`` is not read in the body; it is declared so that the
    ``registered_depend`` dependency runs for this route.

    Returns:
        A ``DailyChangeRes`` built from the updated record's ``to_dict()``.
    """
    # A per-department fill-in permission check (department membership or
    # auth code '7') used to sit here; it is currently disabled.
    target_id = req.id
    changes = req.dict()
    updated = DailyCrud.daily_change(db, target_id, changes)
    response_fields = updated.to_dict()
    return DailySchemas.DailyChangeRes(**response_fields)
@router.post("/daily_query", response_model=DailySchemas.DailyQueryRes, summary="查询日报")
def daily_query(req: DailySchemas.DailyQuery, db: Session = Depends(get_db),
                token_data: TokenData = Depends(registered_depend)):
    """Query daily reports and return them with the total count.

    The request and the caller's token data are handed to
    ``DailyCrud.daily_query``; each returned row is converted into a
    ``DailyInfo`` through its ``to_dict()``.

    Returns:
        A ``DailyQueryRes`` carrying ``count`` and the converted ``items``.
    """
    # Route-level permission checks (executive-report viewing, all-department
    # viewing, own-department viewing via auth codes 1/2/3) used to run here
    # and are currently disabled.
    total, rows = DailyCrud.daily_query(db, req, token_data)

    infos = []
    for row in rows:
        infos.append(DailySchemas.DailyInfo(**row.to_dict()))

    return DailySchemas.DailyQueryRes(count=total, items=infos)
@router.post("/daily_delete", response_model=DailySchemas.DailyDeleteRes, summary="删除日报")
def daily_delete(req: DailySchemas.DailyDeleteReq, db: Session = Depends(get_db), ):
    """Delete the daily report identified by ``req.id``.

    Returns:
        A ``DailyDeleteRes`` with ``state=1`` and a success message.
    """
    # NOTE(review): unlike the add/change/query routes, no token dependency is
    # declared on this route — confirm that this is intended.
    DailyCrud.daily_delete(db, req.id)
    result = DailySchemas.DailyDeleteRes(state=1, msg="删除成功")
    return result
@router.post("/daily_export", summary="日报导出")
def daily_export(req: QueryParams, db: Session = Depends(get_db)):
    """Export the daily reports matching ``req`` to an .xlsx file.

    Rows are selected with ``query_common`` and ordered by ``daily_time``
    descending. Each row becomes one spreadsheet line with the report type,
    reporter name, department name, title, content and report time.

    Args:
        req: Generic query parameters forwarded to ``query_common``.
        db: Database session injected through ``get_db``.

    Returns:
        ``{"url": "/static_data/daily_out_temp/<timestamp>.xlsx"}`` where the
        timestamp is the current local time formatted as ``%Y%m%d%H%M%S``.
    """
    # query_common also returns a count, which is not needed for the export.
    _, query = query_common(db, Daily, req)
    query = query.order_by(Daily.daily_time.desc())

    # Department id -> name lookup; unknown ids end up as None in the sheet.
    department_name_dic = {item.id: item.name for item in db.query(Department).all()}

    daily_items = [
        {
            "日报类型": item.type.name,
            "填报人": item.user_info.name,
            "部门": department_name_dic.get(item.department),
            "标题": item.title,
            "内容": item.content,
            "填报时间": item.daily_time,
        }
        for item in query
    ]

    temp_path = "static_data/daily_out_temp"
    # exist_ok avoids the check-then-create race between concurrent requests.
    os.makedirs(temp_path, exist_ok=True)

    file_name = f"{time.strftime('%Y%m%d%H%M%S')}.xlsx"
    file_path = f"{temp_path}/{file_name}"
    pd.DataFrame(daily_items).to_excel(file_path, index=False)
    return {"url": "/" + file_path}
# return FileResponse(
# path=file_path, headers={'Content-Disposition': f'attachment; filename="{file_name}"'}, filename=file_name)
# @router.post("/daily_export_to_pdf", summary="每日运行日报导出")
# def daily_export_to_pdf(req: DailySchemas.DailyExportToPdfReq, db: Session = Depends(get_db)):
# query = db.query(Daily).filter(cast(Daily.daily_time, DATE) == cast(req.day, DATE))
# query = query.order_by(Daily.daily_time.desc())
# department_name_dic = {item.id: item.name for item in db.query(Department).all()}
# daily_items = []
# for item in query:
# daily_item = {"日报类型": item.type.name,
# "填报人": item.user_info.name,
# "部门": department_name_dic.get(item.department), "标题": item.title,
# "内容": item.content, "填报时间": item.daily_time}
# daily_items.append(daily_item)
# temp_path = "static_data/daily_out_temp"
# if not os.path.exists(temp_path):
# os.makedirs(temp_path)
# file_name = f"{time.strftime('%Y%m%d%H%M%S')}.xlsx"
# file_path = f"{temp_path}/{file_name}"
# pd.DataFrame(daily_items).to_excel(file_path, index=False)
# return {"url": "/" + file_path}
# return FileResponse(
# path=file_path, headers={'Content-Disposition': f'attachment; filename="{file_name}"'}, filename=file_name)
def _format_conversion(string):
    """Keep, for each part of a report body, only the text after its first ".".

    Falsy input yields "". Otherwise ";" is removed, the text is split into
    parts, and for every part containing a "." the stripped text after the
    first "." is kept; parts without a "." are dropped. The kept pieces are
    concatenated.

    NOTE(review): the separator passed to ``split``, both arguments of the
    per-part ``replace``, the replacement for ";" and the ``join`` separator
    are empty strings in this file. ``str.split("")`` raises ``ValueError``
    for any non-empty input, so the original characters were presumably lost
    — restore them from a clean copy before relying on this helper.
    """
    if not string:
        return ""
    string = string.replace(";", "")
    parts = string.split("")
    formatted_parts = []
    for part in parts:
        part = part.replace("", "")
        split_part = part.split(".", 1)
        if len(split_part) > 1:
            formatted_parts.append(split_part[1].strip())
    # Concatenate the processed parts.
    return "".join(formatted_parts)


def _content_with_reporter(item):
    """Return the item's content followed by the "(<reporter>报送)" suffix.

    A missing content value is treated as an empty string.
    """
    # NOTE(review): "/n" is a literal slash + n, not a newline escape —
    # confirm that this is really what the stored content contains.
    content = (item.get('内容') or "").replace("/n", "")
    return content + '({}报送)'.format(item.get('填报人'))


def _merge_entries(entries, numbered=False):
    """Collapse a list of entry strings into the single value used by the template.

    An empty list becomes '未报送' and a single entry is returned unchanged.
    With several entries the first one stays as is and every line of the
    remaining entries is prefixed with one space; the pieces are joined with
    newlines. When ``numbered`` is true the first entry is prefixed with
    '1、' and the following ones with '2', '3', ...

    NOTE(review): the follow-up numbers carry no '、' separator, unlike the
    first entry — possibly a character lost from the literal; confirm.
    """
    if not entries:
        return '未报送'
    if len(entries) == 1:
        return entries[0]
    first, *rest = entries
    if numbered:
        first = '1、' + first
        rest = [f'{num}' + text for num, text in enumerate(rest, start=2)]
    body = "\n".join(rest)
    body = "\n".join([" " + line for line in body.split("\n")])
    return first + '\n' + body


@router.post("/daily_export_to_pdf", summary="每日运行日报导出")
def daily_export_to_pdf(req: DailySchemas.DailyExportToPdfReq, db: Session = Depends(get_db)):
    """Render the run report for one day into a .docx file and return its URL.

    Despite the route name, the output is a Word document produced with
    ``DocxTemplate``. The reports of the selected day are grouped by type,
    slotted into the structure loaded from ``Config/word_data_temp.json`` and
    rendered with ``Config/daily_out_template.docx``.

    Args:
        req: Request body; only ``req.day`` is read.
        db: Database session injected through ``get_db``.

    Returns:
        ``{"url": "/static_data/daily_out_temp/运行日报<YYYYMMDD>.docx"}``.
    """
    # NOTE(review): the requested day is shifted forward by one day before it
    # is used — presumably to compensate for a timezone offset in the
    # client-supplied date; confirm. A local is used so the request object is
    # not mutated.
    report_day = req.day + datetime.timedelta(days=1)

    query = db.query(Daily).filter(cast(Daily.daily_time, DATE) == cast(report_day, DATE))
    query = query.order_by(Daily.daily_time.desc())
    department_name_dic = {item.id: item.name for item in db.query(Department).all()}

    # Bucket the day's reports by type; other types are ignored.
    daily_items = {
        '董监高日报': [],
        '部门子公司日报': [],
        '监管和同业动态': [],
        '行业信息专题分析': [],
    }
    for item in query:
        type_name = item.type.name
        daily_item = {"日报类型": type_name,
                      "填报人": item.user_info.name,
                      "部门": department_name_dic.get(item.department),
                      "内容": item.content,
                      "填报时间": item.daily_time.strftime('%Y-%m-%d %H:%M:%S')}
        if type_name in daily_items:
            daily_items[type_name].append(daily_item)

    # Built with os.path.join (like the .docx template path below) so the
    # lookup also works on non-Windows hosts.
    json_path = os.path.join(os.getcwd(), 'Config', 'word_data_temp.json')
    with open(json_path, encoding='utf-8') as f:
        temp = json.load(f)

    # NOTE(review): an unused ``conf`` dict (leader names / department config)
    # and the database queries feeding it were dropped; nothing read it.

    # Company leaders: one slot per leader name; the last matching report wins.
    lead = temp.get('公司领导动态')
    lead_data = daily_items['董监高日报']
    for k1 in lead:
        for item in lead_data:
            if item.get('填报人') == k1:
                # NOTE(review): the appended literal is empty — possibly a
                # lost trailing character.
                lead[k1] = _format_conversion(item.get('内容')) + ""

    # Template keys whose spelling differs from the stored department name.
    part_dict = {
        "业务发展部上海一部": "业务发展部(上海)一部",
        "业务发展部上海二部": "业务发展部(上海)二部",
        "业务发展部上海三部": "业务发展部(上海)三部",
        "业务发展部西南部": "业务发展部(西南)部"
    }

    # Departments and subsidiaries.
    department = temp.get('各部门和分子公司动态')
    depart_data = daily_items['部门子公司日报']
    for k2 in department:
        wanted_name = part_dict.get(k2, k2)
        entries = [
            # NOTE(review): the middle literal is empty — possibly a lost
            # separator character.
            _format_conversion(item.get('内容')) + '' + '({}报送)'.format(item.get('填报人'))
            for item in depart_data
            if item.get('部门') == wanted_name
        ]
        department[k2] = _merge_entries(entries)

    # Regulatory and peer news: entries are matched on the report type name.
    supervise = temp.get('监管和同业动态')
    supervise_data = daily_items['监管和同业动态']
    for k3 in supervise:
        entries = [
            _content_with_reporter(item)
            for item in supervise_data
            if item.get('日报类型') == k3
        ]
        supervise[k3] = _merge_entries(entries, numbered=True)

    # Industry topic analysis: every key receives all reports of this type.
    subject = temp.get('行业信息专题分析')
    subject_data = daily_items['行业信息专题分析']
    for k4 in subject:
        entries = [_content_with_reporter(item) for item in subject_data]
        subject[k4] = _merge_entries(entries, numbered=True)

    # NOTE(review): the three date keys are empty strings here, so only the
    # last one (day) survives in the dict. The key names were presumably lost
    # — restore them to match the placeholders in the .docx template.
    merged_dict = {
        "": report_day.year,
        "": report_day.month,
        "": report_day.day
    }
    merged_dict.update(department)
    merged_dict.update(lead)
    merged_dict.update(supervise)
    merged_dict.update(subject)

    # Render the Word template.
    doc_path = os.path.join(os.getcwd(), 'Config', 'daily_out_template.docx')
    doc = DocxTemplate(doc_path)
    doc.render(merged_dict)

    # Create the output directory BEFORE saving, otherwise the first export
    # on a fresh deployment fails.
    temp_path = "static_data/daily_out_temp"
    out_dir = os.path.join(os.getcwd(), 'static_data', 'daily_out_temp')
    os.makedirs(out_dir, exist_ok=True)

    date_string = report_day.strftime('%Y%m%d')
    file_name = "运行日报{}.docx".format(date_string)
    doc.save(os.path.join(out_dir, file_name))

    file_path = f"{temp_path}/{file_name}"
    return {"url": "/" + file_path}