commercialcompany/App/Router/ModelRouter.py

437 lines
19 KiB
Python
Raw Normal View History

2023-08-04 10:55:05 +08:00
# -*- coding: utf-8 -*-
import json
import os
import numpy as np
import openpyxl
import pandas as pd
import pythoncom
from openpyxl import load_workbook
from openpyxl.utils import get_column_letter
from fastapi import APIRouter, Depends, HTTPException, UploadFile, File, BackgroundTasks
from openpyxl.worksheet.datavalidation import DataValidation
from starlette.background import BackgroundTask
from starlette.responses import FileResponse
from win32com.client import Dispatch
from App.Schemas import ModelSchemas
from Utils.DataBase.MongoHelperUtils import MongoHelper, get_mongodb
router = APIRouter(
2023-08-04 11:02:37 +08:00
prefix="/api/model_management/"
2023-08-04 10:55:05 +08:00
)
def del_file(path):
os.remove(path)
def del_files(paths):
for path in paths:
os.remove(path)
# 模型管理流程 -> 上传模型excel文件 -> 保存文件至gridfs -> 下载excel文件至管理端查看 -> 使用excel插件对模型进行修改
# 模型使用流程 -> 读取数据库excel文件 -> pandas清洗excel数据提取问卷数据 -> 前端获取问卷 -> 用户填报 ->
# pandas将模型excel清洗成打分流程需要的数据格式 -> 带入填报数据计算指标数值 -> 带入计算好的指标值传入打分模型得出各指标的得分 ->
# 传入指标总分得出对应级别 -> 接口返回打分结果
@router.post("upload_model_excel", summary="新模型Excel上传", tags=["模型管理"])
async def func(model_name: str = '中小商业企业信用模型', file: UploadFile = File(...),
mongo_db: MongoHelper = Depends(get_mongodb)):
contents = await file.read()
file_id = mongo_db.insert_file(db='中小商会企业信用评级', bucket='模型文件', file=contents, filename=model_name)
mongo_db.insert_data(db='中小商会企业信用评级', col="模型数据", data={"模型名称": model_name, "模型ID": file_id})
return {"message": "文件上传成功", "file_id": str(file_id)}
@router.post("download_model_excel", summary="新模型Excel下载", tags=["模型管理"])
async def func(model_name: str = '中小商业企业信用模型', mongo_db: MongoHelper = Depends(get_mongodb)):
result = mongo_db.search_data(db='中小商会企业信用评级', col='模型数据', cd={"模型名称": model_name})
if not result:
raise HTTPException(status_code=404, detail="模型不存在")
fid = result.get('模型ID')
file = mongo_db.get_file(db='中小商会企业信用评级', bucket='模型文件', fid=str(fid))
file_name = "{}.xlsx".format(model_name)
path = os.path.join(os.getcwd(), 'Utils', 'File', 'generate', file_name)
myfile = open(path, mode='wb')
myfile.write(file)
task = BackgroundTask(del_file, path)
return FileResponse(path, filename=file_name, media_type='application/octet-stream', background=task)
@router.post("save_model_excel", summary="保存修改后的模型excel", tags=["模型管理"])
async def func(model_name: str = '中小商业企业信用模型', file: UploadFile = File(...),
mongo_db: MongoHelper = Depends(get_mongodb)):
# 根据模型名称查询该模型是否存在
result = mongo_db.search_data(db='中小商会企业信用评级', col='模型数据', cd={"模型名称": model_name})
if not result:
raise HTTPException(status_code=404, detail="模型不存在")
fid = result.get('模型ID')
# 删除Gridfs数据
mongo_db.delete_file(db='中小商会企业信用评级', bucket='模型文件', fid=str(fid))
# 存储新文件返回fid再根据name进行更新
contents = await file.read()
file_id = mongo_db.insert_file(db='中小商会企业信用评级', bucket='模型文件', file=contents, filename=model_name)
result = mongo_db.update_data(db='中小商会企业信用评级', col='模型数据', cd={"模型名称": model_name},
ud={"模型ID": file_id})
return {"message": "文件更新成功", "file_id": str(result.get('模型ID'))}
@router.post("new_process_file", summary="新建流程文件", tags=["流程文件管理"])
def func(company: str = '远东资信企业执信有限公司', model_name: str = '中小商业企业信用模型',
mongo_db: MongoHelper = Depends(get_mongodb)):
# 根据模型名称查询该模型是否存在
result = mongo_db.search_data(db='中小商会企业信用评级', col='模型数据', cd={"模型名称": model_name})
if not result:
raise HTTPException(status_code=404, detail="模型不存在")
fid = result.get('模型ID')
file = mongo_db.get_file(db='中小商会企业信用评级', bucket='模型文件', fid=str(fid))
# 保存模型文件流到数据库
file_id = mongo_db.insert_file(db='中小商会企业信用评级', bucket='流程文件', file=file, filename=company)
mongo_db.insert_data(db='中小商会企业信用评级', col='流程数据', data={"企业名称": company, "模型ID": file_id})
return {"message": "流程文件新建成功", "file_id": str(file_id)}
@router.post("delete_process_file", summary="删除流程文件", tags=["流程文件管理"])
def func(company: str = '远东资信企业执信有限公司', mongo_db: MongoHelper = Depends(get_mongodb)):
# 根据processid查询对应fid
result = mongo_db.search_data(db='中小商会企业信用评级', col='流程数据', cd={"企业名称": company})
if not result:
raise HTTPException(status_code=404, detail="模型不存在")
fid = result.get('模型ID')
# 删除Gridfs数据
mongo_db.delete_file(db='中小商会企业信用评级', bucket='流程文件', fid=str(fid))
# 删除数据表对应条目
mongo_db.delete_data(db='中小商会企业信用评级', col='流程数据', cd={"企业名称": company})
return {"message": "删除成功"}
@router.post("download_questionnaire", summary="下载模型问卷", tags=["问卷管理"])
async def func(model_name: str = '中小商业企业信用模型', mongo_db: MongoHelper = Depends(get_mongodb)):
result = mongo_db.search_data(db='中小商会企业信用评级', col='模型数据', cd={"模型名称": model_name})
if not result:
raise HTTPException(status_code=404, detail="模型不存在")
fid = result.get('模型ID')
file = mongo_db.get_file(db='中小商会企业信用评级', bucket='模型文件', fid=str(fid))
file_name = "{}.xlsx".format('填报问卷')
path = os.path.join(os.getcwd(), 'Utils', 'File', 'generate', file_name)
myfile = open(path, mode='wb')
myfile.write(file)
task = BackgroundTask(del_file, path)
wb = load_workbook(path)
# 检查要删除的工作表是否存在
for name in wb.sheetnames:
if '问卷' not in name:
# 获取要删除的工作表
sheet = wb[name]
# 删除工作表
wb.remove(sheet)
# 保存修改后的Excel文件
wb.save(path)
return FileResponse(path, filename=file_name, media_type='application/octet-stream', background=task)
@router.post("upload_questionnaire", summary="模型问卷上传", tags=["问卷管理"])
async def func(background_tasks: BackgroundTasks, company: str = '远东资信企业执信有限公司',
file: UploadFile = File(...),
mongo_db: MongoHelper = Depends(get_mongodb)):
# 将上传的问卷文件保存到本地
temp_file = os.path.join(os.getcwd(), 'Utils', 'File', 'generate', '{}填报问卷.xlsx'.format(company))
with open(temp_file, "wb") as buffer:
buffer.write(await file.read())
# 数据库excel
result = mongo_db.search_data(db='中小商会企业信用评级', col='流程数据', cd={"企业名称": company})
if not result:
raise HTTPException(status_code=404, detail="流程不存在")
fid = result.get('模型ID')
file = mongo_db.get_file(db='中小商会企业信用评级', bucket='流程文件', fid=str(fid))
file_name = "{}.xlsx".format(company)
path = os.path.join(os.getcwd(), 'Utils', 'File', 'generate', file_name)
myfile = open(path, mode='wb')
myfile.write(file)
# 用上传的excel中的sheet去替换对应数据库excel 的sheet
db_workbook = load_workbook(path)
uploaded_workbook = load_workbook(temp_file)
for sheet in uploaded_workbook.sheetnames:
if sheet in db_workbook.sheetnames:
db_workbook.remove(db_workbook[sheet])
new_sheet = db_workbook.create_sheet(sheet)
source_sheet = uploaded_workbook[sheet]
for row in source_sheet:
for cell in row:
new_sheet[cell.coordinate].value = cell.value
db_workbook.save(path)
# 删除Gridfs数据
mongo_db.delete_file(db='中小商会企业信用评级', bucket='流程文件', fid=str(fid))
# 存储新文件返回fid再根据name进行更新
with open(path, 'rb') as f:
file_id = mongo_db.insert_file(db='中小商会企业信用评级', bucket='流程文件', file=f.read(), filename=company)
mongo_db.update_data(db='中小商会企业信用评级', col="流程数据", cd={"企业名称": company}, ud={"模型ID": file_id})
background_tasks.add_task(del_files, [path, temp_file])
return {"message": "上传成功"}
@router.post("download_reference_data", summary="获取背调数据表", tags=["问卷管理"])
async def func(background_tasks: BackgroundTasks, model_name: str = '中小商业企业信用模型',
mongo_db: MongoHelper = Depends(get_mongodb)):
result = mongo_db.search_data(db='中小商会企业信用评级', col='模型数据', cd={"模型名称": model_name})
if not result:
raise HTTPException(status_code=404, detail="模型不存在")
fid = result.get('模型ID')
file = mongo_db.get_file(db='中小商会企业信用评级', bucket='模型文件', fid=str(fid))
file_name = "{}.xlsx".format(model_name)
path = os.path.join(os.getcwd(), 'Utils', 'File', 'generate', file_name)
myfile = open(path, mode='wb')
myfile.write(file)
wb = load_workbook(path)
sheet = wb['背调接口']
# 查找所有下拉选项的单元格
dropdown_cells = {}
for dv in sheet.data_validations.dataValidation:
if isinstance(dv, DataValidation):
cell_range = dv.cells.__str__()
dropdown_cells[cell_range] = dv.formula1.replace('"', "").split(',')
# 单元格位置
cells_position = dict()
df = pd.read_excel(path, sheet_name='背调接口')
df = df.replace([np.inf, -np.inf], np.nan)
df = df.fillna(value="")
for i in range(len(df)):
cell_address = df.iloc[i, 0]
cells_position[cell_address] = get_column_letter(3) + str(i + 2)
# 转换为字典列表
data = df.to_dict(orient='records')
# 转换为json字符串设置ensure_ascii=False来保持非ASCII字符的原始形式
json_data = json.dumps(data, ensure_ascii=False)
# 使用 json.loads() 解析 JSON 字符串
dict_data = json.loads(json_data)
# 判断类型,并添加下拉属性
for index in dict_data:
address = cells_position[index['指标']]
dropdown = dropdown_cells.get(address)
if dropdown:
index['类型'] = '单选'
index['选项'] = dropdown
else:
index['类型'] = '填空'
background_tasks.add_task(del_file, path)
return {
"message": "查询成功",
"data": dict_data
}
@router.post("upload_reference_data", summary="提交背调数据", tags=["问卷管理"])
async def func(background_tasks: BackgroundTasks, schemas: ModelSchemas.ReferencDataBody,
mongo_db: MongoHelper = Depends(get_mongodb)):
company = schemas.company
refrerence_data = schemas.referenc_data
result = mongo_db.search_data(db='中小商会企业信用评级', col='流程数据', cd={"企业名称": company})
if not result:
raise HTTPException(status_code=404, detail="流程不存在")
fid = result.get('模型ID')
file = mongo_db.get_file(db='中小商会企业信用评级', bucket='流程文件', fid=str(fid))
file_name = "{}.xlsx".format(company)
path = os.path.join(os.getcwd(), 'Utils', 'File', 'generate', file_name)
myfile = open(path, mode='wb')
myfile.write(file)
wb = load_workbook(path)
sheet = wb['背调接口']
# 单元格位置
cells_position = dict()
df = pd.read_excel(path, sheet_name='背调接口')
df = df.replace([np.inf, -np.inf], np.nan)
df = df.fillna(value="")
for i in range(len(df)):
cell_address = df.iloc[i, 0]
cells_position[cell_address] = get_column_letter(3) + str(i + 2)
# 处理背调数据格式
refrerence_dict = dict()
for item in refrerence_data:
refrerence_dict[item['指标']] = item.get('数值')
for key, val in cells_position.items():
sheet[val] = refrerence_dict.get(key)
wb.save(path)
# 删除Gridfs数据
mongo_db.delete_file(db='中小商会企业信用评级', bucket='流程文件', fid=str(fid))
# 存储新文件返回fid再根据name进行更新
with open(path, 'rb') as f:
file_id = mongo_db.insert_file(db='中小商会企业信用评级', bucket='流程文件', file=f.read(), filename=company)
mongo_db.update_data(db='中小商会企业信用评级', col="流程数据", cd={"企业名称": company}, ud={"模型ID": file_id})
background_tasks.add_task(del_file, path)
return {"message": "上传成功"}
@router.post("questionnaire_data", summary="查看问卷数据", tags=["问卷管理"])
async def func(company: str = '远东资信企业执信有限公司', mongo_db: MongoHelper = Depends(get_mongodb)):
result = mongo_db.search_data(db='中小商会企业信用评级', col='流程数据', cd={"企业名称": company})
if not result:
raise HTTPException(status_code=404, detail="流程不存在")
fid = result.get('模型ID')
file = mongo_db.get_file(db='中小商会企业信用评级', bucket='流程文件', fid=str(fid))
# 财务问卷
finance_df1 = pd.read_excel(file, sheet_name='财务问卷')
finance_df = finance_df1.dropna(subset=['科目', '2022年', '2021年'])
# 初始化结果字典
finance_data = {
"财务数据": {
"2022年": {},
"2021年": {}
}
}
# 初始化当前报表名
current_report = None
# 遍历DataFrame的行
for _, row in finance_df.iterrows():
report = row['报表']
subject = row['科目']
year_2022 = row['2022年']
year_2021 = row['2021年']
# 如果报表名不是NaN那么就更新当前报表名
if not pd.isna(report):
current_report = report
# 如果当前报表名是None那么跳过这一行
if current_report is None:
continue
# 如果当前报表名还没有添加到结果字典中,那么就添加它
if current_report not in finance_data["财务数据"]["2022年"]:
finance_data["财务数据"]["2022年"][current_report] = {}
finance_data["财务数据"]["2021年"][current_report] = {}
# 添加科目和对应的值到结果字典中
finance_data["财务数据"]["2022年"][current_report][subject] = year_2022
finance_data["财务数据"]["2021年"][current_report][subject] = year_2021
# 添加单位、是否审计和会计事务所到结果字典中
finance_data["单位"] = finance_df1.loc[12, '科目']
finance_data["是否审计"] = finance_df1.loc[13, '科目']
finance_data["会计事务所"] = finance_df1.loc[14, '科目']
business_df = pd.read_excel(file, sheet_name='经营问卷')
business_data = business_df.set_index('问题')['填写(文本类无则不填写)'].to_dict()
for k, v in business_data.items():
if isinstance(v, (float, int, np.number)):
if np.isnan(v):
business_data[k] = None
return {
"message": "查询成功",
"data": {
"财务问卷": finance_data,
"经营问卷": business_data
}
}
@router.post("reference_data", summary="查看背调数据", tags=["问卷管理"])
async def func(company: str = '远东资信企业执信有限公司', mongo_db: MongoHelper = Depends(get_mongodb)):
result = mongo_db.search_data(db='中小商会企业信用评级', col='流程数据', cd={"企业名称": company})
if not result:
raise HTTPException(status_code=404, detail="流程不存在")
fid = result.get('模型ID')
file = mongo_db.get_file(db='中小商会企业信用评级', bucket='流程文件', fid=str(fid))
df = pd.read_excel(file, sheet_name='背调接口')
data = df.set_index('指标')['数值'].to_dict()
for k, v in data.items():
if isinstance(v, (float, int, np.number)):
if np.isnan(v):
data[k] = None
return {
"message": "查询成功",
"data": data
}
@router.post("score_result", summary="评级结果", tags=["评级结果"])
async def func(background_tasks: BackgroundTasks, company: str = '远东资信企业执信有限公司',
mongo_db: MongoHelper = Depends(get_mongodb)):
result = mongo_db.search_data(db='中小商会企业信用评级', col='流程数据', cd={"企业名称": company})
if not result:
raise HTTPException(status_code=404, detail="流程不存在")
fid = result.get('模型ID')
file = mongo_db.get_file(db='中小商会企业信用评级', bucket='流程文件', fid=str(fid))
def create_file(stream):
# 文件流写入文件
file_name = "{}.xlsx".format('填报问卷')
path = os.path.join(os.getcwd(), 'Utils', 'File', 'generate', file_name)
with open(path, mode='wb') as f:
f.write(stream)
return path
def just_open(filename):
# 解决读取公式计算数据为None的问题
pythoncom.CoInitialize()
xlApp = Dispatch("Excel.Application")
xlApp.Visible = False
xlBook = xlApp.Workbooks.Open(filename)
xlBook.Save()
xlBook.Close()
file_path = create_file(file)
just_open(file_path)
wb = openpyxl.load_workbook(file_path, data_only=True)
sheet_01 = wb['中小商业企业信用模型']
index_data = [
{"指标": row[2].value, "数值": row[3].value, "单位": row[4].value,
"权重": row[7].value, "得分": row[8].value} for row in sheet_01.iter_rows()]
index_data = index_data[2:42]
rating_result = {
"级别": sheet_01.cell(43, 6).value,
"总分": sheet_01.cell(43, 9).value
}
background_tasks.add_task(del_file, file_path)
return {
"message": "查询成功",
"data": {
"评级结果": rating_result,
"指标数值与得分": index_data
}
}