241 lines
8.7 KiB
Python
241 lines
8.7 KiB
Python
|
from fastapi import APIRouter, Depends, HTTPException
|
||
|
from sqlalchemy.orm import Session
|
||
|
from sqlalchemy.exc import IntegrityError
|
||
|
|
||
|
from BM02_指标仓库.DataBase import get_mysqldb
|
||
|
from BM02_指标仓库.Schemas import IndexStoreSchemas as Schemas
|
||
|
from BM02_指标仓库.Crud import IndexStoreCrud as Crud
|
||
|
|
||
|
from BS01_通用工具.访问权限.AccessUtil import rbac
|
||
|
|
||
|
|
||
|
router = APIRouter(
|
||
|
dependencies=[Depends(rbac)]
|
||
|
)
|
||
|
|
||
|
|
||
|
# 新建指标
|
||
|
@router.post("/create", summary='新建指标', tags=["指标"], response_model=Schemas.CreateIndicatorResBody)
|
||
|
def func(schema: Schemas.CreateIndicatorReqBody, db: Session = Depends(get_mysqldb)):
|
||
|
index_obj = Crud.get_indicator_by_ename(db, ename=schema.ename)
|
||
|
if index_obj:
|
||
|
raise HTTPException(status_code=202, detail="Indicator Already Registered")
|
||
|
|
||
|
category_obj = Crud.get_category_by_id(db=db, category_id=schema.category_id)
|
||
|
if not category_obj:
|
||
|
raise HTTPException(status_code=404, detail="Category Not Found")
|
||
|
|
||
|
index_obj = Crud.create_indicator(db=db, schema=schema)
|
||
|
|
||
|
parameters_item = schema.parameters
|
||
|
for parameter in parameters_item:
|
||
|
Crud.create_parameter(db=db, schema=parameter, _iid=index_obj.iid)
|
||
|
|
||
|
return {"iid": index_obj.iid}
|
||
|
|
||
|
|
||
|
# 删除指标
|
||
|
@router.post("/delete/{iid}", summary='删除指标', tags=["指标"])
|
||
|
def func(iid: str, db: Session = Depends(get_mysqldb)):
|
||
|
index_obj = Crud.get_indicator_by_iid(db=db, iid=iid)
|
||
|
if not index_obj:
|
||
|
raise HTTPException(status_code=404, detail="Indicator Not Existed")
|
||
|
|
||
|
Crud.delete_indicator(db=db, iid=iid)
|
||
|
|
||
|
return {"info": "Success"}
|
||
|
|
||
|
|
||
|
# 编辑指标
|
||
|
@router.post("/edit/{iid}", summary='编辑指标', tags=["指标"])
|
||
|
def func(iid: str, schema: Schemas.EditIndicatorReqBody, db: Session = Depends(get_mysqldb)):
|
||
|
index_obj = Crud.get_indicator_by_iid(db=db, iid=iid)
|
||
|
if not index_obj:
|
||
|
raise HTTPException(status_code=404, detail="Indicator Not Found")
|
||
|
|
||
|
category_obj = Crud.get_category_by_id(db=db, category_id=schema.category_id)
|
||
|
if not category_obj:
|
||
|
raise HTTPException(status_code=404, detail="Category Not Found")
|
||
|
|
||
|
Crud.edit_indicator(db=db, schema=schema, iid=iid)
|
||
|
|
||
|
return {"info": "Success"}
|
||
|
|
||
|
|
||
|
# 查看指标
|
||
|
@router.get("/view/{iid}", summary='查看指标', tags=["指标"], response_model=Schemas.ViewIndicatorResBody)
|
||
|
def func(iid: str, db: Session = Depends(get_mysqldb)):
|
||
|
index_obj = Crud.get_indicator_by_iid(db, iid=iid)
|
||
|
if index_obj is None:
|
||
|
raise HTTPException(status_code=404, detail="Indicator Not Found")
|
||
|
|
||
|
return index_obj.to_dict()
|
||
|
|
||
|
|
||
|
@router.get("/view_by_ename/{ename}", summary='查看指标', tags=["指标"])
|
||
|
def func(ename: str, db: Session = Depends(get_mysqldb)):
|
||
|
index_obj = Crud.get_indicator_by_ename(db, ename=ename)
|
||
|
if index_obj is None:
|
||
|
raise HTTPException(status_code=404, detail="Indicator Not Found")
|
||
|
|
||
|
return {"id": index_obj.ename, "name": index_obj.cname}
|
||
|
|
||
|
|
||
|
# 查询指标描述
|
||
|
@router.post("/describe/{iid}", summary='查看指标描述', tags=["指标"], response_model=Schemas.IndicatorDescribeResBody)
|
||
|
def func(iid: str, db: Session = Depends(get_mysqldb)):
|
||
|
index_obj = Crud.get_indicator_by_iid(db, iid=iid)
|
||
|
if not index_obj:
|
||
|
raise HTTPException(status_code=404, detail="Indicator Not Found")
|
||
|
|
||
|
return index_obj.to_dict()
|
||
|
|
||
|
|
||
|
# 查询指标
|
||
|
@router.post("/search", summary='查询指标', tags=["指标"], response_model=Schemas.SearchIndicatorResBody)
|
||
|
def func(schema: Schemas.SearchIndicatorReqBody, db: Session = Depends(get_mysqldb)):
|
||
|
indicators, total = Crud.search_indicators(db, schema=schema)
|
||
|
items = [Schemas.ViewIndicatorResBody(**indicator.to_dict()).dict() for indicator in indicators]
|
||
|
return {"items": items, "total": total}
|
||
|
|
||
|
|
||
|
@router.post("/register", summary='注册指标', tags=["指标"])
|
||
|
def func(schema: Schemas.RegisterIndicatorReqBody, db: Session = Depends(get_mysqldb)):
|
||
|
index_obj = Crud.get_indicator_by_ename(db, ename=schema.ename)
|
||
|
if index_obj:
|
||
|
raise HTTPException(status_code=202, detail="Indicator Already Registered")
|
||
|
|
||
|
category_obj = Crud.get_category_by_name(db=db, title=schema.category)
|
||
|
if not category_obj:
|
||
|
raise HTTPException(status_code=404, detail="Category Not Found")
|
||
|
|
||
|
real_schema = schema.dict()
|
||
|
real_schema.pop("category")
|
||
|
real_schema.update({"category_id": category_obj.id})
|
||
|
real_schema = Schemas.CreateIndicatorReqBody(**real_schema)
|
||
|
|
||
|
index_obj = Crud.create_indicator(db=db, schema=real_schema)
|
||
|
|
||
|
parameters_item = schema.parameters
|
||
|
for parameter in parameters_item:
|
||
|
Crud.create_parameter(db=db, schema=parameter, _iid=index_obj.iid)
|
||
|
|
||
|
return {"info": "Success"}
|
||
|
|
||
|
|
||
|
# 到处指标配置
|
||
|
@router.post("/export", summary='指标配置导出', tags=["指标"])
|
||
|
def func(iid_list: list, db: Session = Depends(get_mysqldb)):
|
||
|
result = dict()
|
||
|
result['得分级别标准'] = dict()
|
||
|
result['档位得分标准'] = dict()
|
||
|
result['指标及权重'] = list()
|
||
|
result['评级调整'] = list()
|
||
|
result['数据填报'] = dict()
|
||
|
|
||
|
for iid in iid_list:
|
||
|
indicator = Crud.get_indicator_by_iid(db, iid)
|
||
|
if not indicator:
|
||
|
raise HTTPException(status_code=404, detail="Indicator Not Found")
|
||
|
index = dict()
|
||
|
index['指标'] = indicator.ename
|
||
|
index['指标结构'] = [indicator.cname]
|
||
|
index['指标类型'] = indicator.nature.value
|
||
|
index['档位'] = None
|
||
|
index['权重'] = None
|
||
|
index['计算请求'] = None
|
||
|
index['指标参数'] = list()
|
||
|
params = indicator.parameters
|
||
|
for param in params:
|
||
|
param_dict = dict()
|
||
|
param_dict['参数'] = param.ename
|
||
|
path = param.path
|
||
|
if path:
|
||
|
param_dict['加载路径'] = param.path.split("-")
|
||
|
else:
|
||
|
param_dict['加载路径'] = None
|
||
|
index['指标参数'].append(param_dict)
|
||
|
result['指标及权重'].append(index)
|
||
|
|
||
|
for items in result['指标及权重']:
|
||
|
item = items.get('指标参数')
|
||
|
for itm in item:
|
||
|
sheet = itm.get('加载路径')
|
||
|
if sheet:
|
||
|
result['数据填报'][sheet[0]] = list()
|
||
|
|
||
|
for items in result['指标及权重']:
|
||
|
item = items.get('指标参数')
|
||
|
for itm in item:
|
||
|
sheet = itm.get('加载路径')
|
||
|
if sheet:
|
||
|
result['数据填报'][sheet[0]].append(sheet[-1])
|
||
|
|
||
|
return result
|
||
|
|
||
|
|
||
|
# 新建参数
|
||
|
@router.post("/param/create/{iid}", summary='新建参数', tags=["指标参数"])
|
||
|
def func(iid: str, body: Schemas.EditParamReqBody, db: Session = Depends(get_mysqldb)):
|
||
|
index_obj = Crud.get_indicator_by_iid(db=db, iid=iid)
|
||
|
if not index_obj:
|
||
|
raise HTTPException(status_code=404, detail="Indicator Not Found")
|
||
|
|
||
|
Crud.create_parameter(db=db, schema=body, _iid=iid)
|
||
|
|
||
|
return {"info": "Success"}
|
||
|
|
||
|
|
||
|
@router.post("/param/edit/{pid}", summary='编辑参数', tags=["指标参数"])
|
||
|
def func(pid: str, schema: Schemas.EditParamReqBody, db: Session = Depends(get_mysqldb)):
|
||
|
data = Crud.get_parameter_by_pid(db=db, pid=pid)
|
||
|
if not data:
|
||
|
raise HTTPException(status_code=404, detail="Parameter Not Existed")
|
||
|
|
||
|
Crud.edit_parameter(db=db, pid=pid, schema=schema)
|
||
|
|
||
|
return {"info": "Success"}
|
||
|
|
||
|
|
||
|
# 删除参数
|
||
|
@router.post("/param/delete/{pid}", summary='删除参数', tags=["指标参数"])
|
||
|
def func(pid: str, db: Session = Depends(get_mysqldb)):
|
||
|
data = Crud.get_parameter_by_pid(db=db, pid=pid)
|
||
|
if not data:
|
||
|
raise HTTPException(status_code=404, detail="Parameter Not Existed")
|
||
|
|
||
|
Crud.delete_parameter(db=db, pid=pid)
|
||
|
|
||
|
return {"info": "Success"}
|
||
|
|
||
|
|
||
|
@router.post("/category/create", summary="新建分类", tags=["指标分类"])
|
||
|
def func(schema: Schemas.CreateCategoryReqBody, db: Session = Depends(get_mysqldb)):
|
||
|
data = Crud.get_category_by_name(db=db, title=schema.title)
|
||
|
if data:
|
||
|
raise HTTPException(status_code=202, detail="Category Already Existed")
|
||
|
|
||
|
Crud.create_category(db=db, schema=schema)
|
||
|
|
||
|
return {"info": "Success"}
|
||
|
|
||
|
|
||
|
@router.post("/category/delete", summary="删除分类", tags=["指标分类"])
|
||
|
def func(schema: Schemas.DeleteCategoryReqBody, db: Session = Depends(get_mysqldb)):
|
||
|
data = Crud.get_category_by_id(db=db, category_id=schema.category_id)
|
||
|
if not data:
|
||
|
raise HTTPException(status_code=404, detail="Category Not Found")
|
||
|
|
||
|
try:
|
||
|
Crud.delete_category(db=db, category_id=schema.category_id)
|
||
|
except IntegrityError:
|
||
|
raise HTTPException(status_code=202, detail="Category Is Being Used")
|
||
|
|
||
|
return {"info": "Success"}
|
||
|
|
||
|
|
||
|
@router.post("/category/list", summary="分类列表", tags=["指标分类"])
|
||
|
def func(db: Session = Depends(get_mysqldb)):
|
||
|
category_objs = Crud.list_category(db=db)
|
||
|
return [obj.to_kv() for obj in category_objs]
|