import datetime import os.path import re import time from ahocorasick import Automaton from docx.enum.text import WD_PARAGRAPH_ALIGNMENT from docx.oxml.ns import qn from docx.shared import Pt, Length, Inches from docxtpl import DocxTemplate from fastapi import APIRouter, Depends, HTTPException from sqlalchemy import cast, DATE, func, or_, and_ from sqlalchemy.orm import Session from starlette.responses import FileResponse from docx import Document from Crud.UserCrud import get_department_config from Models.DepartmentModel import Department from Models.UserModel import User from Mods.Notice.Utils import DailyNotice, daily_notice from Schemas.DailySchemas import DailyTypeEnum from Schemas.UserSchemas import TokenData from Utils.AuthUtils import token_data_depend, check_auth, registered_depend from Models.DailyModel import Daily from Utils.CrudUtils import auto_create_crud import json from Schemas import DailySchemas from Crud import DailyCrud from Utils.SqlAlchemyUtils import get_db, QueryParams, query_common import pandas as pd router = APIRouter( tags=["日报"], prefix="/api/daily/daily", dependencies=[ Depends(registered_depend) ] ) # crud = auto_create_crud(Daily, 'daily', "日报", auto_create_keys=['create_time', 'update_time', 'id'], # array_keys=['department', 'post']) # crud.mount(router) @router.post("/daily_get", response_model=DailySchemas.DailyGetRes, summary="获取日报") def daily_get(req: DailySchemas.DailyGetReq, db: Session = Depends(get_db), # token_data: TokenData = Depends(registered_depend) ): item = DailyCrud.daily_get(db, req.id) comments = None if item.comments: comments = [DailySchemas.CommentInfo(**comment.to_with_user_dict()) for comment in item.comments] if not item: raise HTTPException(detail="未取到信息") return DailySchemas.DailyGetRes(**item.to_dict(), comments=comments) json_filepath = os.path.join(os.getcwd(), 'Config', 'sensitive_word.json') # 将列表保存为 json 文件 with open(json_filepath, 'r', encoding='utf-8') as json_file: words = json.load(json_file) class SensitiveWordChecker: def __init__(self, sensitive_words): self.automaton = Automaton() for idx, word in enumerate(sensitive_words): self.automaton.add_word(word, (idx, word)) self.automaton.make_automaton() def check(self, text): return {item for _, item in self.automaton.iter(text)} checker = SensitiveWordChecker(words) @router.post("/daily_add", response_model=DailySchemas.DailyAddRes, summary="添加日报") def daily_add(req: DailySchemas.DailyAddReq, db: Session = Depends(get_db), token_data: TokenData = Depends(token_data_depend)): # if db.query(Daily).filter( # Daily.type == DailyTypeEnum.部门子公司日报 and Daily.fill_user == token_data.email and cast( # Daily.daily_time, # DATE) == datetime.date.today()).first(): if req.type== DailyTypeEnum.部门子公司日报 and db.query(Daily).filter( and_(Daily.type == DailyTypeEnum.部门子公司日报, Daily.fill_user == token_data.email), cast( Daily.daily_time, DATE) == datetime.date.today() ).first(): raise HTTPException(detail="今日日报已提交,若需修改请在原日报上修改", status_code=305) if req.content: bad_words = [item[1] for item in checker.check(req.content)] if bad_words: raise HTTPException(detail=f"内容包含敏感词:{','.join(bad_words)}", status_code=305) if req.fill_user != token_data.email: raise HTTPException(detail="填报人与email不符", status_code=305) # 本部门填报权限 # print(token_data.department, token_data, "token_data.departmentxx") # if str(req.department) not in token_data.department and not check_auth(token_data.auth_data, ['7']): # raise HTTPException(detail="没有本部门填报权限", status_code=305) new_daily = DailyCrud.daily_add(db, req.dict()) if req.type == DailyTypeEnum.运行日报: daily_notice.send_on_daily_pdf_upload() return DailySchemas.DailyAddRes(**new_daily.to_dict()) @router.post("/daily_change", response_model=DailySchemas.DailyChangeRes, summary="修改日报") def daily_change(req: DailySchemas.DailyChangeReq, db: Session = Depends(get_db), token_data: TokenData = Depends(registered_depend)): # 本部门填报权限 # if str(req.department) not in token_data.department and not check_auth(token_data.auth_data, ['7']): # raise HTTPException(detail="没有本部门填报权限", status_code=305) new_daily = DailyCrud.daily_change(db, req.id, req.dict()) if not new_daily: raise HTTPException(detail="原日报不存在或已删除",status_code=404) return DailySchemas.DailyChangeRes(**new_daily.to_dict()) @router.post("/daily_query", response_model=DailySchemas.DailyQueryRes, summary="查询日报") def daily_query(req: DailySchemas.DailyQuery, db: Session = Depends(get_db), token_data: TokenData = Depends(registered_depend)): # print(req, "req") # 董监高日报动态查看 = check_auth(token_data.auth_data, [1]) # 所有部门动态查看 = check_auth(token_data.auth_data, [2]) # # if req.type == DailyTypeEnum.董监高日报: # if not 董监高日报动态查看: # raise HTTPException(detail="无董监高日报动态查看权限", status_code=305) # if req.type == DailyTypeEnum.部门子公司日报: # if not 所有部门动态查看: # if not req.department: # raise HTTPException(detail="无所有部门动态查看权限", status_code=305) # if req.department not in [int(item) for item in token_data.department.split(',')]: # raise HTTPException(detail="没有该部门动态查看权限", status_code=305) # same_department = set([req.department]) < set([int(item) for item in token_data.department.split(',')]) # if same_department and not check_auth(token_data.auth_data, [3]): # raise HTTPException(detail="没有本部门动态查看权限", status_code=305) count, items = DailyCrud.daily_query(db, req, token_data) # new_items=[] # for item in items: # try: # new_items.append(DailySchemas.DailyInfo(**item.to_dict())) # except Exception as e: # raise e # print(e) # print(item) # print('xxxxx',item.to_dict()) items = [DailySchemas.DailyInfo(**item.to_dict()) for item in items] return DailySchemas.DailyQueryRes(count=count, items=items) @router.post("/daily_delete", response_model=DailySchemas.DailyDeleteRes, summary="删除日报") def daily_delete(req: DailySchemas.DailyDeleteReq, db: Session = Depends(get_db), ): DailyCrud.daily_delete(db, req.id) return DailySchemas.DailyDeleteRes(msg="删除成功", state=1) @router.post("/daily_export", summary="日报导出") def daily_export(req: QueryParams, db: Session = Depends(get_db)): count, query = query_common(db, Daily, req) query = query.order_by(Daily.daily_time.desc()) department_name_dic = {item.id: item.name for item in db.query(Department).limit(1000).all()} daily_items = [] for item in query: daily_item = {"日报类型": item.type.name, "填报人": item.user_info.name, "部门": department_name_dic.get(item.department), "标题": item.title, "内容": item.content, "填报时间": item.daily_time} daily_items.append(daily_item) temp_path = "static_data/daily_out_temp" if not os.path.exists(temp_path): os.makedirs(temp_path) file_name = f"{time.strftime('%Y%m%d%H%M%S')}.xlsx" file_path = f"{temp_path}/{file_name}" pd.DataFrame(daily_items).to_excel(file_path, index=False) return {"url": "/" + file_path} # return FileResponse( # path=file_path, headers={'Content-Disposition': f'attachment; filename="{file_name}"'}, filename=file_name) # @router.post("/daily_export_to_pdf", summary="每日运行日报导出") # def daily_export_to_pdf(req: DailySchemas.DailyExportToPdfReq, db: Session = Depends(get_db)): # query = db.query(Daily).filter(cast(Daily.daily_time, DATE) == cast(req.day, DATE)) # query = query.order_by(Daily.daily_time.desc()) # department_name_dic = {item.id: item.name for item in db.query(Department).all()} # daily_items = [] # for item in query: # daily_item = {"日报类型": item.type.name, # "填报人": item.user_info.name, # "部门": department_name_dic.get(item.department), "标题": item.title, # "内容": item.content, "填报时间": item.daily_time} # daily_items.append(daily_item) # temp_path = "static_data/daily_out_temp" # if not os.path.exists(temp_path): # os.makedirs(temp_path) # file_name = f"{time.strftime('%Y%m%d%H%M%S')}.xlsx" # file_path = f"{temp_path}/{file_name}" # pd.DataFrame(daily_items).to_excel(file_path, index=False) # return {"url": "/" + file_path} # return FileResponse( # path=file_path, headers={'Content-Disposition': f'attachment; filename="{file_name}"'}, filename=file_name) @router.post("/daily_export_to_pdf", summary="每日运行日报导出") def daily_export_to_pdf(req: DailySchemas.DailyExportToPdfReq, db: Session = Depends(get_db)): req.day = req.day + datetime.timedelta(days=1) query = db.query(Daily).filter(cast(Daily.daily_time, DATE) == cast(req.day, DATE)) query = query.order_by(Daily.daily_time.desc()) department_name_dic = {item.id: item.name for item in db.query(Department).all()} daily_items = dict() daily_items['董监高日报'] = list() daily_items['部门子公司日报'] = list() daily_items['监管和同业动态'] = list() daily_items['行业信息专题分析'] = list() for item in query: daily_item = {"日报类型": item.type.name, "填报人": item.user_info.name, "部门": department_name_dic.get(item.department), "内容": item.content, "分类": item.sub_type, "填报时间": item.daily_time.strftime('%Y-%m-%d %H:%M:%S')} if item.type.name == '董监高日报': daily_items['董监高日报'].append(daily_item) elif item.type.name == '部门子公司日报': daily_items['部门子公司日报'].append(daily_item) elif item.type.name == '监管和同业动态': daily_items['监管和同业动态'].append(daily_item) elif item.type.name == '行业信息专题分析': daily_items['行业信息专题分析'].append(daily_item) with open(os.getcwd() + "\\Config\\word_data_temp.json", encoding='utf-8') as f: temp = json.load(f) department_id = [item.id for item in db.query(Department).filter(Department.type == '董监高')] leaders = db.query(User).filter(or_(*[func.find_in_set(str(d_id), User.department) for d_id in department_id])) config = { '领导动态': [item.name for item in leaders], '部门分类': get_department_config(db) } def generate_template(): """ 根据高管名称、部门数据生成日报模板 """ with open(os.getcwd() + "\\Config\\word_data_template.json", encoding='utf-8') as f: template = json.load(f) def list_write_docx(write_list, tep_doc): for inx in range(len(write_list)): for para in tep_doc.paragraphs: try: # 检查上一段是否包含特定文字 if write_list[inx] in para.text: """ 在给定的段落之后插入一个新的段落 """ new_paragraph = Document().add_paragraph() new_run = new_paragraph.add_run(write_list[inx + 1]) # 设置字体样式 new_run.font.name = '仿宋' # 字体 new_run._element.rPr.rFonts.set(qn('w:eastAsia'), '仿宋') new_run.font.size = Pt(16) # 字号:三号字体对应16磅 new_paragraph.alignment = WD_PARAGRAPH_ALIGNMENT.LEFT # 设置首行缩进:1英寸约等于28.35磅,2字符约等于1/2英寸 new_paragraph.paragraph_format.first_line_indent = Inches(2 * 0.56 / 2.54) # 设置段落前缩进:1英寸约等于2.54厘米 new_paragraph.paragraph_format.left_indent = Inches(1.09 / 2.54) # 设置行距 new_paragraph.paragraph_format.line_spacing = Pt(28) new_element = new_paragraph._element para._element.addnext(new_element) except IndexError: break # 生成领导模板 => 董事长杨秋岭:{{杨秋岭}} position = template.get('领导职位') leader = template.get('公司领导动态') leader_temp = ['公司领导动态'] for le in leader: leader_temp.append(position.get(le) + le + ':' + '{{{{{}}}}}'.format(le)) # 将数据写入docx中 dt_path = os.path.join(os.getcwd(), 'Config', 'daily_out_temp.docx') dt_doc = Document(dt_path) list_write_docx(write_list=leader_temp, tep_doc=dt_doc) # 生成各部门和分子公司动态模板 trends = template.get('各部门和分子公司动态') for key, val in trends.items(): val_list = [key] for index, item in enumerate(val): idx = str(index + 1) + '、' + item val_list.append(idx) val_list.append('{{{{{}}}}}'.format(item)) list_write_docx(write_list=val_list, tep_doc=dt_doc) dt_doc.save(os.path.join(os.getcwd(), 'Config', 'daily_out_temps.docx')) generate_template() def format_conversion(string): if not string: return "" string = string.replace(";", ";") parts = string.split(";") formatted_parts = [] for part in parts: part = part.replace("。", "") split_part = part.split(".", 1) if len(split_part) > 1: formatted_parts.append(split_part[1].strip()) # 拼接处理后的部分 formatted_string = ";".join(formatted_parts) return formatted_string # 处理公司领导动态 lead = temp.get('公司领导动态') for k1 in lead.keys(): lead_data = daily_items.get('董监高日报') for item in lead_data: if item.get('填报人') == k1: content = item.get('内容') res = format_conversion(content) lead[k1] = res + "。" part_dict = { "业务发展部上海一部": "业务发展部(上海)一部", "业务发展部上海二部": "业务发展部(上海)二部", "业务发展部上海三部": "业务发展部(上海)三部", "业务发展部西南部": "业务发展部(西南)部" } # 处理各部门和分子公司动态 department = temp.get('各部门和分子公司动态') for k2 in department.keys(): department[k2] = list() depart_data = daily_items.get('部门子公司日报') for item in depart_data: if k2 in part_dict: copy_k2 = part_dict.get(k2) if item.get('部门') == copy_k2: content = item.get('内容') res = format_conversion(content) department[k2].append(res + '。' + '({}报送)'.format(item.get('填报人'))) else: if item.get('部门') == k2: content = item.get('内容') res = format_conversion(content) department[k2].append(res + '。' + '({}报送)'.format(item.get('填报人'))) if department[k2]: if len(department[k2]) == 1: department[k2] = department[k2][0] else: first = department[k2].pop(0) department[k2] = "\n".join(department[k2]) department[k2] = "\n".join([" " + line for line in department[k2].split("\n")]) department[k2] = first + '\n' + department[k2] else: department[k2] = '未报送' # 监管和同业动态 supervise = temp.get('监管和同业动态') for k3 in supervise.keys(): supervise[k3] = list() supervise_data = daily_items.get('监管和同业动态') for item in supervise_data: if item.get('分类') == k3: content = item.get('内容').replace("\n", "") content = content.replace(' ', '') content = ' '.join(content.split()) content = content + '({}报送)'.format(item.get('填报人')) supervise[k3].append(content) if supervise[k3]: if len(supervise[k3]) == 1: supervise[k3] = supervise[k3][0] else: first = supervise[k3].pop(0) first = '1、' + first for i in range(len(supervise[k3])): num = i + 2 supervise[k3][i] = f'{num}、' + supervise[k3][i] supervise[k3] = "\n".join(supervise[k3]) supervise[k3] = "\n".join([" " + line for line in supervise[k3].split("\n")]) supervise[k3] = first + '\n' + supervise[k3] else: supervise[k3] = '未报送' # 行业信息专题分析 subject = temp.get('行业信息专题分析') for k4 in subject.keys(): subject[k4] = list() subject_data = daily_items.get('行业信息专题分析') for item in subject_data: content = item.get('内容').replace("/n", "") content = content.replace(' ', '') content = ' '.join(content.split()) content = content + '({}报送)'.format(item.get('填报人')) subject[k4].append(content) if subject[k4]: if len(subject[k4]) == 1: subject[k4] = subject[k4][0] else: first = subject[k4].pop(0) first = '1、' + first for i in range(len(subject[k4])): num = i + 2 subject[k4][i] = f'{num}、' + subject[k4][i] subject[k4] = "\n".join(subject[k4]) subject[k4] = "\n".join([" " + line for line in subject[k4].split("\n")]) subject[k4] = first + '\n' + subject[k4] else: subject[k4] = '未报送' merged_dict = { "年": req.day.year, "月": req.day.month, "日": req.day.day } merged_dict.update(department) merged_dict.update(lead) merged_dict.update(supervise) merged_dict.update(subject) # 文件模板 doc_path = os.path.join(os.getcwd(), 'Config', 'daily_out_temps.docx') doc = DocxTemplate(doc_path) doc.render(merged_dict) # today = datetime.date.today() date_string = req.day.strftime('%Y%m%d') file_path = os.path.join(os.getcwd(), 'static_data', 'daily_out_temp', '运行日报{}.docx'.format(date_string)) doc.save(file_path) temp_path = "static_data/daily_out_temp" if not os.path.exists(temp_path): os.makedirs(temp_path) file_name = "运行日报{}.docx".format(date_string) file_path = f"{temp_path}/{file_name}" return {"url": "/" + file_path} @router.post("/sensitive_word_verification", summary="敏感词校验") def daily_export_to_pdf(word_str: str): json_filepath = os.path.join(os.getcwd(), 'Config', 'sensitive_word.json') # 将列表保存为 json 文件 with open(json_filepath, 'r', encoding='utf-8') as json_file: words = json.load(json_file) class SensitiveWordChecker: def __init__(self, sensitive_words): self.automaton = Automaton() for idx, word in enumerate(sensitive_words): self.automaton.add_word(word, (idx, word)) self.automaton.make_automaton() def check(self, text): return {item for _, item in self.automaton.iter(text)} checker = SensitiveWordChecker(words) sensitive_words_in_text = checker.check(word_str) return sensitive_words_in_text