# ratingprocess/App/Router/RatingFlowRouter.py
#
# Listing metadata from the source viewer: 148 lines, 5.7 KiB, Python.

from fastapi import APIRouter, Depends, HTTPException, Header
from sqlalchemy.orm import Session
from App.Crud import RatingFlowCrud as Crud
from App.Schemas import RatingFlowSchema
from Utils.Authentication.TokenUtil import decode_token
from Utils.Common.RegisterUtils import request_to_get, request_to_post
from Utils.DataBase.MongoHelperUtils import get_mongodb, MongoHelper
from Utils.DataBase.SqlAlchemyUtils import get_db
# Router that groups every rating-flow endpoint under the /api/rating_flow prefix.
router = APIRouter(prefix="/api/rating_flow")
# Create a rating flow, seed its four default nodes, register the calling user
# as a participant, and return the node list of the new flow.
#
# Parameters:
#   schema  - request body describing the flow to create.
#   token   - authentication token taken from the `token` request header.
#   db      - SQLAlchemy session supplied by get_db.
#   mongodb - Mongo helper supplied by get_mongodb.
# Returns:
#   Whatever Crud.get_rating_flow_all_nodes yields for the new flow.
# Raises:
#   HTTPException(401) when the decoded token carries no user information.
#   HTTPException(400) when the nodes of the new flow cannot be loaded.
@router.post("/create", summary='新建评级流程', tags=["评级流程"])
def func(schema: RatingFlowSchema.CreateRatingFlowReqBody, token: str = Header(...),
         db: Session = Depends(get_db), mongodb: MongoHelper = Depends(get_mongodb)):
    # Resolve the caller BEFORE writing anything: a token without user info
    # used to fail with an AttributeError after the flow and its nodes had
    # already been created, leaving them without a participant.
    # NOTE(review): how decode_token reports an invalid token is not visible
    # here; an empty/None result is handled, an exception simply propagates.
    decoded_info = decode_token(token=token)
    user_info = decoded_info.get("user_info") if decoded_info else None
    if not user_info:
        raise HTTPException(status_code=401, detail="Invalid Token")

    # Persist the new flow.
    rating_flow_obj = Crud.create_rating_flow(db=db, schema=schema)

    # Create the default nodes; node numbers start at 1.
    node_names = ["填报数据", "基本信用状况", "个体信用状况", "主体信用等级"]
    for node_num, node_name in enumerate(node_names, start=1):
        Crud.create_rating_flow_node(
            db=db,
            node_num=node_num,
            node_name=node_name,
            rating_process_id=rating_flow_obj.id,
        )

    # Record the creating user as a participant of the flow.
    participant_schema = RatingFlowSchema.CreateParticipantReqBody()
    participant_schema.participant = user_info.get("name")
    participant_schema.user_id = user_info.get("uid")
    participant_schema.rating_process_id = rating_flow_obj.id
    Crud.create_rating_flow_participant(db=db, schema=participant_schema)

    # Return the flow information.
    flow_nodes = Crud.get_rating_flow_all_nodes(db=db, mongodb=mongodb, _id=rating_flow_obj.id)
    if not flow_nodes:
        raise HTTPException(status_code=400, detail="RatingFlow Not Found")
    return flow_nodes
# Return every node of the rating flow identified by `_id`.
# Responds with HTTP 400 ("RatingFlow Not Found") when the lookup yields nothing.
@router.post("/get", summary="所有流程节点", tags=["评级流程"])
def func(_id: str, db: Session = Depends(get_db), mongodb: MongoHelper = Depends(get_mongodb)):
    flow_nodes = Crud.get_rating_flow_all_nodes(db=db, mongodb=mongodb, _id=_id)
    if flow_nodes:
        return flow_nodes
    raise HTTPException(status_code=400, detail="RatingFlow Not Found")
# Save the data of one flow node, then return all nodes of the owning flow.
# Responds with HTTP 400 when the node cannot be saved ("RatingFlowNode Not
# Found") or when the owning flow has no nodes ("RatingFlow Not Found").
@router.post("/save", summary="保存节点数据", tags=["评级流程"])
def func(schema: RatingFlowSchema.SaveRatingFlowNodeReqBody, db: Session = Depends(get_db),
         mongodb: MongoHelper = Depends(get_mongodb)):
    # Store the node data; the CRUD call hands back the id of the owning flow.
    owning_flow_id = Crud.save_rating_flow_node(
        db=db,
        mongodb=mongodb,
        node_id=schema.node_id,
        data=schema.data,
    )
    if not owning_flow_id:
        raise HTTPException(status_code=400, detail="RatingFlowNode Not Found")

    # Return the refreshed flow information.
    flow_nodes = Crud.get_rating_flow_all_nodes(db=db, mongodb=mongodb, _id=owning_flow_id)
    if flow_nodes:
        return flow_nodes
    raise HTTPException(status_code=400, detail="RatingFlow Not Found")
# Remote services queried by the input-data import endpoint.
_SCORECARD_VIEW_URL = 'http://test.fecribd.com:5004/api/model_store/scorecard/view'
# NOTE: "fiancial" is reproduced exactly as the remote path is spelled.
_FINANCIAL_DATA_URL = 'http://test.fecribd.com:5001/api/query/fiancial_data'

# Output key that labels each row, and the sheet field it is copied from.
_REPORT_PERIOD_KEY = '报告期'
_CLOSING_DATE_KEY = '截止日期'

# Scorecard table name -> key of the financial-data response holding its sheets.
_SHEET_SOURCES = {
    '资产负债表': 'balance_sheet_data',
    '利润表': 'income_sheet_data',
    '现金流量表': 'cashflow_sheet_data',
}


def _build_sheet_rows(sheets, columns):
    """Project each sheet onto *columns*, labelled with its report period.

    Every row starts with the report-period key, filled from the sheet's
    closing date, followed by one entry per requested column. A column the
    sheet does not contain is reported as ``None``. The report period is set
    once per sheet, so it is present even when *columns* is empty and is never
    overwritten by a column of the same name.

    Raises:
        KeyError: if a sheet has no closing-date field.
    """
    rows = []
    for sheet in sheets:
        row = {_REPORT_PERIOD_KEY: sheet[_CLOSING_DATE_KEY]}
        for column in columns:
            if column != _REPORT_PERIOD_KEY:
                row[column] = sheet.get(column)
        rows.append(row)
    return rows


# Import the reported financial data required by a scorecard.
#
# Fetches the scorecard configuration, reads its `data_table` mapping
# (table name -> list of column names), fetches the company's financial data
# for the requested report date, and returns, per supported table, a list of
# rows restricted to the configured columns. Table names other than the three
# in _SHEET_SOURCES are ignored. A supported table whose sheet group is absent
# from the financial-data response yields an empty list.
#
# Parameters:
#   schema  - request body providing scorecard_id, company_id and report_date.
#   db      - SQLAlchemy session supplied by get_db (not used by this handler).
#   mongodb - Mongo helper supplied by get_mongodb (not used by this handler).
# Raises:
#   HTTPException(400) when the scorecard, its data table, or the financial
#   data cannot be obtained.
@router.post("/import/input_data", summary="导入填报数据", tags=["评级节点"])
def func(schema: RatingFlowSchema.GetRatingDataReqBody, db: Session = Depends(get_db),
         mongodb: MongoHelper = Depends(get_mongodb)):
    # Function-scope import keeps this block self-contained.
    from urllib.parse import urlencode

    # Fetch the scorecard configuration. The id comes from the request body,
    # so it is URL-encoded rather than pasted into the query string.
    score_card_url = '{}?{}'.format(_SCORECARD_VIEW_URL, urlencode({'_id': schema.scorecard_id}))
    res = request_to_get(score_card_url)
    if not res:
        raise HTTPException(status_code=400, detail="ScoreCard Not Found")
    data_table = res.get('data_table')
    if not data_table:
        raise HTTPException(status_code=400, detail="DataTable Not Found")

    # Fetch the reported financial data.
    query = {"cid": schema.company_id, "report_date": schema.report_date}
    result = request_to_post(_FINANCIAL_DATA_URL, query)
    if not result:
        raise HTTPException(status_code=400, detail="Data Not Found")

    # Assemble the result, one entry per supported table, in data_table order.
    data_result = {}
    for name, columns in data_table.items():
        source_key = _SHEET_SOURCES.get(name)
        if source_key is None:
            continue
        data_result[name] = _build_sheet_rows(result.get(source_key) or [], columns)
    return data_result
# Placeholder endpoint for importing qualitative and quantitative indicators.
# Not implemented yet: it accepts `schema` and returns nothing.
@router.post("/import/index_data", summary="导入定性定量指标", tags=["评级节点"])
def func(schema):
    return None
# Placeholder endpoint for importing adjustment factors.
# Not implemented yet: it accepts `schema` and returns nothing.
@router.post("/import/adjust_factors", summary="导入调整因素", tags=["评级节点"])
def func(schema):
    return None
# Placeholder endpoint for importing external support.
# Not implemented yet: it accepts `schema` and returns nothing.
@router.post("/import/external_support", summary="导入外部支持", tags=["评级节点"])
def func(schema):
    return None