526 lines
22 KiB
Python
526 lines
22 KiB
Python
# -*- coding: utf-8 -*-
|
||
import json
|
||
import os
|
||
import numpy as np
|
||
import openpyxl
|
||
import pandas as pd
|
||
import xlwings as xw
|
||
from openpyxl import load_workbook
|
||
from openpyxl.utils import get_column_letter
|
||
from fastapi import APIRouter, Depends, HTTPException, UploadFile, File, BackgroundTasks
|
||
from openpyxl.worksheet.datavalidation import DataValidation
|
||
from starlette.background import BackgroundTask
|
||
from starlette.responses import FileResponse
|
||
|
||
from App.Schemas import ModelSchemas
|
||
from Utils.DataBase.MongoHelperUtils import MongoHelper, get_mongodb
|
||
|
||
router = APIRouter(
|
||
prefix="/api/model_management"
|
||
)
|
||
|
||
|
||
def del_file(path):
|
||
os.remove(path)
|
||
|
||
|
||
def del_files(paths):
|
||
for path in paths:
|
||
os.remove(path)
|
||
|
||
|
||
# 模型管理流程 -> 上传模型excel文件 -> 保存文件至gridfs -> 下载excel文件至管理端查看 -> 使用excel插件对模型进行修改
|
||
|
||
# 模型使用流程 -> 读取数据库excel文件 -> pandas清洗excel数据提取问卷数据 -> 前端获取问卷 -> 用户填报 ->
|
||
# pandas将模型excel清洗成打分流程需要的数据格式 -> 带入填报数据计算指标数值 -> 带入计算好的指标值传入打分模型得出各指标的得分 ->
|
||
# 传入指标总分得出对应级别 -> 接口返回打分结果
|
||
|
||
|
||
@router.post("/upload_model_excel", summary="新模型Excel上传", tags=["模型管理"])
|
||
async def func(model_name: str = '中小商业企业信用模型', file: UploadFile = File(...),
|
||
mongo_db: MongoHelper = Depends(get_mongodb)):
|
||
contents = await file.read()
|
||
file_id = mongo_db.insert_file(db='中小商会企业信用评级', bucket='模型文件', file=contents, filename=model_name)
|
||
mongo_db.insert_data(db='中小商会企业信用评级', col="模型数据", data={"模型名称": model_name, "模型ID": file_id})
|
||
return {"message": "文件上传成功", "file_id": str(file_id)}
|
||
|
||
|
||
@router.post("/download_model_excel", summary="新模型Excel下载", tags=["模型管理"])
|
||
async def func(model_name: str = '中小商业企业信用模型', mongo_db: MongoHelper = Depends(get_mongodb)):
|
||
result = mongo_db.search_data(db='中小商会企业信用评级', col='模型数据', cd={"模型名称": model_name})
|
||
if not result:
|
||
raise HTTPException(status_code=404, detail="模型不存在")
|
||
fid = result.get('模型ID')
|
||
file = mongo_db.get_file(db='中小商会企业信用评级', bucket='模型文件', fid=str(fid))
|
||
file_name = "{}.xlsx".format(model_name)
|
||
path = os.path.join(os.getcwd(), 'Utils', 'File', 'generate', file_name)
|
||
myfile = open(path, mode='wb')
|
||
myfile.write(file)
|
||
task = BackgroundTask(del_file, path)
|
||
return FileResponse(path, filename=file_name, media_type='application/octet-stream', background=task)
|
||
|
||
|
||
@router.post("/save_model_excel", summary="保存修改后的模型excel", tags=["模型管理"])
|
||
async def func(model_name: str = '中小商业企业信用模型', file: UploadFile = File(...),
|
||
mongo_db: MongoHelper = Depends(get_mongodb)):
|
||
# 根据模型名称查询该模型是否存在
|
||
result = mongo_db.search_data(db='中小商会企业信用评级', col='模型数据', cd={"模型名称": model_name})
|
||
if not result:
|
||
raise HTTPException(status_code=404, detail="模型不存在")
|
||
fid = result.get('模型ID')
|
||
|
||
# 删除Gridfs数据
|
||
mongo_db.delete_file(db='中小商会企业信用评级', bucket='模型文件', fid=str(fid))
|
||
|
||
# 存储新文件返回fid再根据name进行更新
|
||
contents = await file.read()
|
||
file_id = mongo_db.insert_file(db='中小商会企业信用评级', bucket='模型文件', file=contents, filename=model_name)
|
||
result = mongo_db.update_data(db='中小商会企业信用评级', col='模型数据', cd={"模型名称": model_name},
|
||
ud={"模型ID": file_id})
|
||
return {"message": "文件更新成功", "file_id": str(result.get('模型ID'))}
|
||
|
||
|
||
@router.post("/new_process_file", summary="新建流程文件", tags=["流程文件管理"], include_in_schema=False)
|
||
def func(pid: int = 123456, model_name: str = '中小商业企业信用模型',
|
||
mongo_db: MongoHelper = Depends(get_mongodb)):
|
||
# 根据模型名称查询该模型是否存在
|
||
result = mongo_db.search_data(db='中小商会企业信用评级', col='模型数据', cd={"模型名称": model_name})
|
||
if not result:
|
||
raise HTTPException(status_code=404, detail="模型不存在")
|
||
fid = result.get('模型ID')
|
||
file = mongo_db.get_file(db='中小商会企业信用评级', bucket='模型文件', fid=str(fid))
|
||
|
||
# 保存模型文件流到数据库
|
||
file_id = mongo_db.insert_file(db='中小商会企业信用评级', bucket='流程文件', file=file, filename=pid)
|
||
mongo_db.insert_data(db='中小商会企业信用评级', col='流程数据', data={"流程ID": pid, "模型ID": file_id})
|
||
return {"message": "流程文件新建成功", "file_id": str(file_id)}
|
||
|
||
|
||
@router.post("/delete_process_file", summary="删除流程文件", tags=["流程文件管理"], include_in_schema=False)
|
||
def func(pid: int = 123456, mongo_db: MongoHelper = Depends(get_mongodb)):
|
||
# 根据processid查询对应fid
|
||
result = mongo_db.search_data(db='中小商会企业信用评级', col='流程数据', cd={"流程ID": pid})
|
||
if not result:
|
||
raise HTTPException(status_code=404, detail="模型不存在")
|
||
fid = result.get('模型ID')
|
||
|
||
# 删除Gridfs数据
|
||
mongo_db.delete_file(db='中小商会企业信用评级', bucket='流程文件', fid=str(fid))
|
||
# 删除数据表对应条目
|
||
mongo_db.delete_data(db='中小商会企业信用评级', col='流程数据', cd={"流程ID": pid})
|
||
return {"message": "删除成功"}
|
||
|
||
|
||
@router.post("/download_questionnaire", summary="下载模型问卷", tags=["问卷管理"])
|
||
async def func(model_name: str = '中小商业企业信用模型', mongo_db: MongoHelper = Depends(get_mongodb)):
|
||
result = mongo_db.search_data(db='中小商会企业信用评级', col='模型数据', cd={"模型名称": model_name})
|
||
if not result:
|
||
raise HTTPException(status_code=404, detail="模型不存在")
|
||
fid = result.get('模型ID')
|
||
file = mongo_db.get_file(db='中小商会企业信用评级', bucket='模型文件', fid=str(fid))
|
||
file_name = "{}.xlsx".format('填报问卷')
|
||
path = os.path.join(os.getcwd(), 'Utils', 'File', 'generate', file_name)
|
||
myfile = open(path, mode='wb')
|
||
myfile.write(file)
|
||
task = BackgroundTask(del_file, path)
|
||
|
||
wb = load_workbook(path)
|
||
|
||
# 检查要删除的工作表是否存在
|
||
for name in wb.sheetnames:
|
||
if '问卷' not in name:
|
||
# 获取要删除的工作表
|
||
sheet = wb[name]
|
||
# 删除工作表
|
||
wb.remove(sheet)
|
||
# 保存修改后的Excel文件
|
||
wb.save(path)
|
||
|
||
return FileResponse(path, filename=file_name, media_type='application/octet-stream', background=task)
|
||
|
||
|
||
@router.post("/upload_questionnaire", summary="模型问卷上传", tags=["问卷管理"], include_in_schema=False)
|
||
async def func(background_tasks: BackgroundTasks, pid: int = 123456, file: UploadFile = File(...),
|
||
mongo_db: MongoHelper = Depends(get_mongodb)):
|
||
# 将上传的问卷文件保存到本地
|
||
temp_file = os.path.join(os.getcwd(), 'Utils', 'File', 'generate', '{}填报问卷.xlsx'.format(pid))
|
||
with open(temp_file, "wb") as buffer:
|
||
buffer.write(await file.read())
|
||
|
||
# 数据库excel
|
||
result = mongo_db.search_data(db='中小商会企业信用评级', col='流程数据', cd={"流程ID": pid})
|
||
if not result:
|
||
raise HTTPException(status_code=404, detail="流程不存在")
|
||
fid = result.get('模型ID')
|
||
file = mongo_db.get_file(db='中小商会企业信用评级', bucket='流程文件', fid=str(fid))
|
||
file_name = "{}.xlsx".format(pid)
|
||
path = os.path.join(os.getcwd(), 'Utils', 'File', 'generate', file_name)
|
||
myfile = open(path, mode='wb')
|
||
myfile.write(file)
|
||
|
||
# 用上传的excel中的sheet去替换对应数据库excel 的sheet
|
||
db_workbook = load_workbook(path)
|
||
uploaded_workbook = load_workbook(temp_file)
|
||
|
||
for sheet in uploaded_workbook.sheetnames:
|
||
if sheet in db_workbook.sheetnames:
|
||
db_workbook.remove(db_workbook[sheet])
|
||
new_sheet = db_workbook.create_sheet(sheet)
|
||
source_sheet = uploaded_workbook[sheet]
|
||
for row in source_sheet:
|
||
for cell in row:
|
||
new_sheet[cell.coordinate].value = cell.value
|
||
|
||
db_workbook.save(path)
|
||
|
||
# 删除Gridfs数据
|
||
mongo_db.delete_file(db='中小商会企业信用评级', bucket='流程文件', fid=str(fid))
|
||
|
||
# 存储新文件返回fid再根据name进行更新
|
||
with open(path, 'rb') as f:
|
||
file_id = mongo_db.insert_file(db='中小商会企业信用评级', bucket='流程文件', file=f.read(), filename=pid)
|
||
mongo_db.update_data(db='中小商会企业信用评级', col="流程数据", cd={"流程ID": pid}, ud={"模型ID": file_id})
|
||
|
||
background_tasks.add_task(del_files, [path, temp_file])
|
||
|
||
return {"message": "上传成功"}
|
||
|
||
|
||
@router.post("/download_reference_data", summary="获取背调数据表", tags=["问卷管理"])
|
||
async def func(background_tasks: BackgroundTasks, model_name: str = '中小商业企业信用模型',
|
||
mongo_db: MongoHelper = Depends(get_mongodb)):
|
||
result = mongo_db.search_data(db='中小商会企业信用评级', col='模型数据', cd={"模型名称": model_name})
|
||
if not result:
|
||
raise HTTPException(status_code=404, detail="模型不存在")
|
||
fid = result.get('模型ID')
|
||
file = mongo_db.get_file(db='中小商会企业信用评级', bucket='模型文件', fid=str(fid))
|
||
file_name = "{}.xlsx".format(model_name)
|
||
path = os.path.join(os.getcwd(), 'Utils', 'File', 'generate', file_name)
|
||
myfile = open(path, mode='wb')
|
||
myfile.write(file)
|
||
|
||
wb = load_workbook(path)
|
||
sheet = wb['背调接口']
|
||
|
||
# 查找所有下拉选项的单元格
|
||
dropdown_cells = {}
|
||
for dv in sheet.data_validations.dataValidation:
|
||
if isinstance(dv, DataValidation):
|
||
cell_range = dv.cells.__str__()
|
||
dropdown_cells[cell_range] = dv.formula1.replace('"', "").split(',')
|
||
|
||
# 单元格位置
|
||
cells_position = dict()
|
||
df = pd.read_excel(path, sheet_name='背调接口')
|
||
df = df.replace([np.inf, -np.inf], np.nan)
|
||
df = df.fillna(value="")
|
||
for i in range(len(df)):
|
||
cell_address = df.iloc[i, 0]
|
||
cells_position[cell_address] = get_column_letter(3) + str(i + 2)
|
||
|
||
# 转换为字典列表
|
||
data = df.to_dict(orient='records')
|
||
|
||
# 转换为json字符串,设置ensure_ascii=False来保持非ASCII字符的原始形式
|
||
json_data = json.dumps(data, ensure_ascii=False)
|
||
|
||
# 使用 json.loads() 解析 JSON 字符串
|
||
dict_data = json.loads(json_data)
|
||
|
||
# 判断类型,并添加下拉属性
|
||
for index in dict_data:
|
||
address = cells_position[index['指标']]
|
||
dropdown = dropdown_cells.get(address)
|
||
if dropdown:
|
||
index['类型'] = '单选'
|
||
index['选项'] = dropdown
|
||
else:
|
||
index['类型'] = '填空'
|
||
|
||
background_tasks.add_task(del_file, path)
|
||
|
||
return {
|
||
"message": "查询成功",
|
||
"data": dict_data
|
||
}
|
||
|
||
|
||
@router.post("/upload_model_data", summary="提交模型打分所需数据返回评级结果", tags=["评级结果"])
|
||
async def func(background_tasks: BackgroundTasks, schemas: ModelSchemas.ModelScoreData,
|
||
mongo_db: MongoHelper = Depends(get_mongodb)):
|
||
# 查询模型文件并保存在项目本地
|
||
result = mongo_db.search_data(db='中小商会企业信用评级', col='模型数据', cd={"模型名称": '中小商业企业信用模型'})
|
||
if not result:
|
||
raise HTTPException(status_code=404, detail="该模型不存在")
|
||
fid = result.get('模型ID')
|
||
file = mongo_db.get_file(db='中小商会企业信用评级', bucket='模型文件', fid=str(fid))
|
||
file_name = "{}.xlsx".format('中小商业企业信用模型2')
|
||
path = os.path.join(os.getcwd(), 'Utils', 'File', 'generate', file_name)
|
||
myfile = open(path, mode='wb')
|
||
myfile.write(file)
|
||
wb = load_workbook(path)
|
||
|
||
# 将经营问卷填入文件
|
||
business_sheet = wb['经营问卷']
|
||
business_data = schemas.经营问卷.dict()
|
||
business_sheet_rows = business_sheet.max_row
|
||
for i in range(2, business_sheet_rows + 1):
|
||
key_cell = 'A{}'.format(i)
|
||
val_cell = 'D{}'.format(i)
|
||
cell_name = business_sheet[key_cell].value
|
||
if cell_name == '对外服务总次数(包括电话咨询、客户支持、产品售后等)':
|
||
value = business_data.get('对外服务总次数')
|
||
else:
|
||
value = business_data.get(cell_name)
|
||
business_sheet[val_cell] = value
|
||
|
||
# 财务问卷填入文件
|
||
financial_sheet = wb['财务问卷']
|
||
financial_data = schemas.财务问卷.dict()
|
||
financial_sheet_rows = financial_sheet.max_row - 5
|
||
for j in range(2, financial_sheet_rows + 1):
|
||
key_cell = 'B{}'.format(j)
|
||
val1_cell = 'C{}'.format(j)
|
||
val2_cell = 'D{}'.format(j)
|
||
val3_cell = 'E{}'.format(j)
|
||
cell_name = financial_sheet[key_cell].value
|
||
if cell_name == '减:累计折旧':
|
||
value = financial_data.get('累计折旧')
|
||
else:
|
||
value = financial_data.get(cell_name)
|
||
financial_sheet[val1_cell] = value[0]
|
||
financial_sheet[val2_cell] = value[1]
|
||
financial_sheet[val3_cell] = value[2]
|
||
audit = financial_data.get('是否审计')
|
||
financial_sheet['B35'] = audit
|
||
firm = financial_data.get('会计事务所')
|
||
financial_sheet['B36'] = firm
|
||
|
||
# 背调数据填入文件
|
||
backtrack_sheet = wb['背调接口']
|
||
backtrack_data = schemas.背调接口.dict()
|
||
backtrack_sheet_rows = backtrack_sheet.max_row
|
||
for i in range(2, backtrack_sheet_rows + 1):
|
||
key_cell = 'A{}'.format(i)
|
||
val_cell = 'C{}'.format(i)
|
||
cell_name = backtrack_sheet[key_cell].value
|
||
if cell_name == '开庭公告(被告-合同纠纷、劳动争议)':
|
||
value = backtrack_data.get('开庭公告被告合同纠纷劳动争议')
|
||
elif cell_name == '行政处罚(警告、通报批评、罚款)':
|
||
value = backtrack_data.get('行政处罚警告通报批评罚款')
|
||
elif cell_name == '行政处罚(没收违法所得、没收非法财务...)':
|
||
value = backtrack_data.get('行政处罚没收违法所得没收非法财务')
|
||
else:
|
||
value = backtrack_data.get(cell_name)
|
||
backtrack_sheet[val_cell] = value
|
||
wb.save('1.xlsx')
|
||
|
||
app = xw.App(visible=False, add_book=False)
|
||
# 不显示Excel消息框
|
||
app.display_alerts = False
|
||
# 关闭屏幕更新,可加快宏的执行速度
|
||
app.screen_updating = False
|
||
xwb = app.books.open('1.xlsx')
|
||
xwb.save()
|
||
xwb.close()
|
||
app.quit()
|
||
|
||
wb = load_workbook(path, data_only=True)
|
||
sheet_01 = wb['中小商业企业信用模型']
|
||
index_data = [
|
||
{"指标": row[2].value, "数值": row[3].value, "单位": row[4].value,
|
||
"权重": row[7].value, "得分": row[8].value} for row in sheet_01.iter_rows()]
|
||
index_data = index_data[2:42]
|
||
|
||
rating_result = {
|
||
"级别": sheet_01.cell(43, 6).value,
|
||
"总分": round(sheet_01.cell(43, 9).value, 2)
|
||
}
|
||
|
||
background_tasks.add_task(del_file, path)
|
||
return {
|
||
"message": "查询成功",
|
||
"data": {
|
||
"评级结果": rating_result,
|
||
"指标数值与得分": index_data
|
||
}
|
||
}
|
||
|
||
|
||
@router.post("/upload_reference_data", summary="提交背调数据", tags=["问卷管理"], include_in_schema=False)
|
||
async def func(background_tasks: BackgroundTasks, schemas: ModelSchemas.ReferencDataBody,
|
||
mongo_db: MongoHelper = Depends(get_mongodb)):
|
||
pid = schemas.pid
|
||
refrerence_data = schemas.referenc_data
|
||
result = mongo_db.search_data(db='中小商会企业信用评级', col='流程数据', cd={"流程ID": pid})
|
||
if not result:
|
||
raise HTTPException(status_code=404, detail="流程不存在")
|
||
fid = result.get('模型ID')
|
||
file = mongo_db.get_file(db='中小商会企业信用评级', bucket='流程文件', fid=str(fid))
|
||
file_name = "{}.xlsx".format(pid)
|
||
path = os.path.join(os.getcwd(), 'Utils', 'File', 'generate', file_name)
|
||
myfile = open(path, mode='wb')
|
||
myfile.write(file)
|
||
|
||
wb = load_workbook(path)
|
||
sheet = wb['背调接口']
|
||
|
||
# 单元格位置
|
||
cells_position = dict()
|
||
df = pd.read_excel(path, sheet_name='背调接口')
|
||
df = df.replace([np.inf, -np.inf], np.nan)
|
||
df = df.fillna(value="")
|
||
for i in range(len(df)):
|
||
cell_address = df.iloc[i, 0]
|
||
cells_position[cell_address] = get_column_letter(3) + str(i + 2)
|
||
|
||
# 处理背调数据格式
|
||
refrerence_dict = dict()
|
||
for item in refrerence_data:
|
||
refrerence_dict[item['指标']] = item.get('数值')
|
||
|
||
for key, val in cells_position.items():
|
||
sheet[val] = refrerence_dict.get(key)
|
||
|
||
wb.save(path)
|
||
|
||
# 删除Gridfs数据
|
||
mongo_db.delete_file(db='中小商会企业信用评级', bucket='流程文件', fid=str(fid))
|
||
|
||
# 存储新文件返回fid再根据name进行更新
|
||
with open(path, 'rb') as f:
|
||
file_id = mongo_db.insert_file(db='中小商会企业信用评级', bucket='流程文件', file=f.read(), filename=pid)
|
||
mongo_db.update_data(db='中小商会企业信用评级', col="流程数据", cd={"流程ID": pid}, ud={"模型ID": file_id})
|
||
|
||
background_tasks.add_task(del_file, path)
|
||
|
||
return {"message": "上传成功"}
|
||
|
||
|
||
@router.post("/questionnaire_data", summary="查看问卷数据", tags=["问卷管理"], include_in_schema=False)
|
||
async def func(pid: int = 123456, mongo_db: MongoHelper = Depends(get_mongodb)):
|
||
result = mongo_db.search_data(db='中小商会企业信用评级', col='流程数据', cd={"流程ID": pid})
|
||
if not result:
|
||
raise HTTPException(status_code=404, detail="流程不存在")
|
||
fid = result.get('模型ID')
|
||
file = mongo_db.get_file(db='中小商会企业信用评级', bucket='流程文件', fid=str(fid))
|
||
|
||
# 财务问卷
|
||
finance_df1 = pd.read_excel(file, sheet_name='财务问卷')
|
||
finance_df = finance_df1.dropna(subset=['科目', '2022年', '2021年'])
|
||
# 初始化结果字典
|
||
finance_data = {
|
||
"财务数据": {
|
||
"2022年": {},
|
||
"2021年": {}
|
||
}
|
||
}
|
||
|
||
# 初始化当前报表名
|
||
current_report = None
|
||
|
||
# 遍历DataFrame的行
|
||
for _, row in finance_df.iterrows():
|
||
report = row['报表']
|
||
subject = row['科目']
|
||
year_2022 = row['2022年']
|
||
year_2021 = row['2021年']
|
||
|
||
# 如果报表名不是NaN,那么就更新当前报表名
|
||
if not pd.isna(report):
|
||
current_report = report
|
||
|
||
# 如果当前报表名是None,那么跳过这一行
|
||
if current_report is None:
|
||
continue
|
||
|
||
# 如果当前报表名还没有添加到结果字典中,那么就添加它
|
||
if current_report not in finance_data["财务数据"]["2022年"]:
|
||
finance_data["财务数据"]["2022年"][current_report] = {}
|
||
finance_data["财务数据"]["2021年"][current_report] = {}
|
||
|
||
# 添加科目和对应的值到结果字典中
|
||
finance_data["财务数据"]["2022年"][current_report][subject] = year_2022
|
||
finance_data["财务数据"]["2021年"][current_report][subject] = year_2021
|
||
|
||
# 添加单位、是否审计和会计事务所到结果字典中
|
||
finance_data["单位"] = finance_df1.loc[12, '科目']
|
||
finance_data["是否审计"] = finance_df1.loc[13, '科目']
|
||
finance_data["会计事务所"] = finance_df1.loc[14, '科目']
|
||
|
||
business_df = pd.read_excel(file, sheet_name='经营问卷')
|
||
business_data = business_df.set_index('问题')['填写(文本类无则不填写)'].to_dict()
|
||
for k, v in business_data.items():
|
||
if isinstance(v, (float, int, np.number)):
|
||
if np.isnan(v):
|
||
business_data[k] = None
|
||
return {
|
||
"message": "查询成功",
|
||
"data": {
|
||
"财务问卷": finance_data,
|
||
"经营问卷": business_data
|
||
}
|
||
}
|
||
|
||
|
||
@router.post("/reference_data", summary="查看背调数据", tags=["问卷管理"], include_in_schema=False)
|
||
async def func(pid: int = 123456, mongo_db: MongoHelper = Depends(get_mongodb)):
|
||
result = mongo_db.search_data(db='中小商会企业信用评级', col='流程数据', cd={"流程ID": pid})
|
||
if not result:
|
||
raise HTTPException(status_code=404, detail="流程不存在")
|
||
fid = result.get('模型ID')
|
||
file = mongo_db.get_file(db='中小商会企业信用评级', bucket='流程文件', fid=str(fid))
|
||
|
||
df = pd.read_excel(file, sheet_name='背调接口')
|
||
data = df.set_index('指标')['数值'].to_dict()
|
||
for k, v in data.items():
|
||
if isinstance(v, (float, int, np.number)):
|
||
if np.isnan(v):
|
||
data[k] = None
|
||
return {
|
||
"message": "查询成功",
|
||
"data": data
|
||
}
|
||
|
||
|
||
@router.post("/score_result", summary="评级结果", tags=["评级结果"])
|
||
async def func(background_tasks: BackgroundTasks, pid: int = 123456,
|
||
mongo_db: MongoHelper = Depends(get_mongodb)):
|
||
result = mongo_db.search_data(db='中小商会企业信用评级', col='流程数据', cd={"流程ID": pid})
|
||
if not result:
|
||
raise HTTPException(status_code=404, detail="流程不存在")
|
||
fid = result.get('模型ID')
|
||
file = mongo_db.get_file(db='中小商会企业信用评级', bucket='流程文件', fid=str(fid))
|
||
|
||
def create_file(stream):
|
||
# 文件流写入文件
|
||
file_name = "{}.xlsx".format('填报问卷')
|
||
path = os.path.join(os.getcwd(), 'Utils', 'File', 'generate', file_name)
|
||
with open(path, mode='wb') as f:
|
||
f.write(stream)
|
||
return path
|
||
|
||
file_path = create_file(file)
|
||
|
||
wb = openpyxl.load_workbook(file_path, data_only=True)
|
||
|
||
sheet_01 = wb['中小商业企业信用模型']
|
||
index_data = [
|
||
{"指标": row[2].value, "数值": row[3].value, "单位": row[4].value,
|
||
"权重": row[7].value, "得分": row[8].value} for row in sheet_01.iter_rows()]
|
||
index_data = index_data[2:42]
|
||
|
||
rating_result = {
|
||
"级别": sheet_01.cell(43, 6).value,
|
||
"总分": round(sheet_01.cell(43, 9).value, 2)
|
||
}
|
||
|
||
background_tasks.add_task(del_file, file_path)
|
||
return {
|
||
"message": "查询成功",
|
||
"data": {
|
||
"评级结果": rating_result,
|
||
"指标数值与得分": index_data
|
||
}
|
||
}
|