This commit is contained in:
P3ngSaM 2023-08-08 16:38:07 +08:00
parent cb7e348fd5
commit 8d095225fe
4 changed files with 129 additions and 31 deletions

View File

@ -4,15 +4,13 @@ import os
import numpy as np import numpy as np
import openpyxl import openpyxl
import pandas as pd import pandas as pd
import pythoncom import xlwings as xw
import xlrd
from openpyxl import load_workbook from openpyxl import load_workbook
from openpyxl.utils import get_column_letter from openpyxl.utils import get_column_letter
from fastapi import APIRouter, Depends, HTTPException, UploadFile, File, BackgroundTasks from fastapi import APIRouter, Depends, HTTPException, UploadFile, File, BackgroundTasks
from openpyxl.worksheet.datavalidation import DataValidation from openpyxl.worksheet.datavalidation import DataValidation
from starlette.background import BackgroundTask from starlette.background import BackgroundTask
from starlette.responses import FileResponse from starlette.responses import FileResponse
from win32com.client import Dispatch
from App.Schemas import ModelSchemas from App.Schemas import ModelSchemas
from Utils.DataBase.MongoHelperUtils import MongoHelper, get_mongodb from Utils.DataBase.MongoHelperUtils import MongoHelper, get_mongodb
@ -256,7 +254,7 @@ async def func(background_tasks: BackgroundTasks, schemas: ModelSchemas.ModelSco
raise HTTPException(status_code=404, detail="该模型不存在") raise HTTPException(status_code=404, detail="该模型不存在")
fid = result.get('模型ID') fid = result.get('模型ID')
file = mongo_db.get_file(db='中小商会企业信用评级', bucket='模型文件', fid=str(fid)) file = mongo_db.get_file(db='中小商会企业信用评级', bucket='模型文件', fid=str(fid))
file_name = "{}.xlsx".format('中小商业企业信用模型') file_name = "{}.xlsx".format('中小商业企业信用模型2')
path = os.path.join(os.getcwd(), 'Utils', 'File', 'generate', file_name) path = os.path.join(os.getcwd(), 'Utils', 'File', 'generate', file_name)
myfile = open(path, mode='wb') myfile = open(path, mode='wb')
myfile.write(file) myfile.write(file)
@ -315,22 +313,19 @@ async def func(background_tasks: BackgroundTasks, schemas: ModelSchemas.ModelSco
else: else:
value = backtrack_data.get(cell_name) value = backtrack_data.get(cell_name)
backtrack_sheet[val_cell] = value backtrack_sheet[val_cell] = value
wb.save(path) wb.save('1.xlsx')
def just_open(filename): app = xw.App(visible=False, add_book=False)
# 解决读取公式计算数据为None的问题 # 不显示Excel消息框
pythoncom.CoInitialize() app.display_alerts = False
xlApp = Dispatch("Excel.Application") # 关闭屏幕更新,可加快宏的执行速度
xlApp.Visible = False app.screen_updating = False
xlApp.DisplayAlerts = 0 xwb = app.books.open('1.xlsx')
xlBook = xlApp.Workbooks.Open(filename) xwb.save()
xlBook.SaveAs(filename) xwb.close()
xlBook.Close() app.quit()
just_open(path)
wb = openpyxl.load_workbook(path, data_only=True)
wb = load_workbook(path, data_only=True)
sheet_01 = wb['中小商业企业信用模型'] sheet_01 = wb['中小商业企业信用模型']
index_data = [ index_data = [
{"指标": row[2].value, "数值": row[3].value, "单位": row[4].value, {"指标": row[2].value, "数值": row[3].value, "单位": row[4].value,
@ -505,19 +500,8 @@ async def func(background_tasks: BackgroundTasks, pid: int = 123456,
f.write(stream) f.write(stream)
return path return path
def just_open(filename):
# 解决读取公式计算数据为None的问题
pythoncom.CoInitialize()
xlApp = Dispatch("Excel.Application")
xlApp.Visible = False
xlBook = xlApp.Workbooks.Open(filename)
xlBook.Save()
xlBook.Close()
file_path = create_file(file) file_path = create_file(file)
just_open(file_path)
wb = openpyxl.load_workbook(file_path, data_only=True) wb = openpyxl.load_workbook(file_path, data_only=True)
sheet_01 = wb['中小商业企业信用模型'] sheet_01 = wb['中小商业企业信用模型']

114
App/Router/ScoreRouter.py Normal file
View File

@ -0,0 +1,114 @@
# -*- coding: utf-8 -*-
import os
import shutil
import xlwings as xw
from openpyxl import load_workbook
from fastapi import APIRouter, Depends, HTTPException, BackgroundTasks
from App.Schemas import ModelSchemas
from Utils.DataBase.MongoHelperUtils import MongoHelper, get_mongodb
router = APIRouter(
prefix="/api/model_score"
)
def del_file(path):
os.remove(path)
@router.post("/model_score_reusult", summary="提交模型打分所需数据返回评级结果", tags=["评级结果"])
async def func(background_tasks: BackgroundTasks, schemas: ModelSchemas.ModelScoreData):
# 复制一份文件到指定文件夹
temp_file = os.path.join(os.getcwd(), 'Utils', 'File', 'template', '中小商业企业信用模型.xlsx')
dst_folder = os.path.join(os.getcwd(), 'Utils', 'File', 'generate')
shutil.copy(temp_file, dst_folder)
path = os.path.join(os.getcwd(), 'Utils', 'File', 'generate', '中小商业企业信用模型.xlsx')
# 打开模型excel
wb = load_workbook(path)
# 将经营问卷填入文件
business_sheet = wb['经营问卷']
business_data = schemas.经营问卷.dict()
business_sheet_rows = business_sheet.max_row
for i in range(2, business_sheet_rows + 1):
key_cell = 'A{}'.format(i)
val_cell = 'D{}'.format(i)
cell_name = business_sheet[key_cell].value
if cell_name == '对外服务总次数(包括电话咨询、客户支持、产品售后等)':
value = business_data.get('对外服务总次数')
else:
value = business_data.get(cell_name)
business_sheet[val_cell] = value
# 财务问卷填入文件
financial_sheet = wb['财务问卷']
financial_data = schemas.财务问卷.dict()
financial_sheet_rows = financial_sheet.max_row - 5
for j in range(2, financial_sheet_rows + 1):
key_cell = 'B{}'.format(j)
val1_cell = 'C{}'.format(j)
val2_cell = 'D{}'.format(j)
val3_cell = 'E{}'.format(j)
cell_name = financial_sheet[key_cell].value
if cell_name == '减:累计折旧':
value = financial_data.get('累计折旧')
else:
value = financial_data.get(cell_name)
financial_sheet[val1_cell] = value[0]
financial_sheet[val2_cell] = value[1]
financial_sheet[val3_cell] = value[2]
audit = financial_data.get('是否审计')
financial_sheet['B35'] = audit
firm = financial_data.get('会计事务所')
financial_sheet['B36'] = firm
# 背调数据填入文件
backtrack_sheet = wb['背调接口']
backtrack_data = schemas.背调接口.dict()
backtrack_sheet_rows = backtrack_sheet.max_row
for i in range(2, backtrack_sheet_rows + 1):
key_cell = 'A{}'.format(i)
val_cell = 'C{}'.format(i)
cell_name = backtrack_sheet[key_cell].value
if cell_name == '开庭公告(被告-合同纠纷、劳动争议)':
value = backtrack_data.get('开庭公告被告合同纠纷劳动争议')
elif cell_name == '行政处罚(警告、通报批评、罚款)':
value = backtrack_data.get('行政处罚警告通报批评罚款')
elif cell_name == '行政处罚(没收违法所得、没收非法财务...':
value = backtrack_data.get('行政处罚没收违法所得没收非法财务')
else:
value = backtrack_data.get(cell_name)
backtrack_sheet[val_cell] = value
wb.save('1.xlsx')
app = xw.App(visible=False, add_book=False)
# 不显示Excel消息框
app.display_alerts = False
# 关闭屏幕更新,可加快宏的执行速度
app.screen_updating = False
xwb = app.books.open('1.xlsx')
xwb.save()
xwb.close()
app.quit()
wb = load_workbook('1.xlsx', data_only=True)
sheet_01 = wb['中小商业企业信用模型']
index_data = [
{"指标": row[2].value, "数值": row[3].value, "单位": row[4].value,
"权重": row[7].value, "得分": row[8].value} for row in sheet_01.iter_rows()]
index_data = index_data[2:42]
rating_result = {
"级别": sheet_01.cell(43, 6).value,
"总分": round(sheet_01.cell(43, 9).value, 2)
}
background_tasks.add_task(del_file, path)
background_tasks.add_task(del_file, '1.xlsx')
return {
"message": "计算成功",
"data": {
"评级结果": rating_result,
"指标数值与得分": index_data
}
}

View File

@ -3,7 +3,7 @@ import uvicorn
from fastapi import FastAPI from fastapi import FastAPI
from fastapi.middleware.cors import CORSMiddleware from fastapi.middleware.cors import CORSMiddleware
from App.Router import ModelRouter, ReportRouter from App.Router import ReportRouter, ScoreRouter
app = FastAPI( app = FastAPI(
title="中小商业企业信用评级", title="中小商业企业信用评级",
@ -18,7 +18,7 @@ app.add_middleware(
allow_methods=["*"] allow_methods=["*"]
) )
app.include_router(ModelRouter.router) app.include_router(ScoreRouter.router)
app.include_router(ReportRouter.router) app.include_router(ReportRouter.router)
if __name__ == '__main__': if __name__ == '__main__':