# Balance-sheet endpoints: upload an Excel report for a company and view the
# records stored for it.
import datetime
import io
import math

import pandas as pd
from fastapi import APIRouter, Depends, UploadFile, File, HTTPException
from sqlalchemy.orm import Session

from APPData.Crud import CompanyCrud, BalanceCrud
from Utils.AccessControl.AccessUtil import rbac
from Utils.DataBase.MongoHelperUtils import get_mongodb
from Utils.DataBase.SqlAlchemyUtils import get_db
from Utils.TycApi.CommonUtil import CommonUtils

router = APIRouter(
    tags=["资产负债表"],
    prefix="/api/wide_data/balance_sheet",
    dependencies=[Depends(rbac)]
)

# Output format for every date-like cell taken from the sheet.
_DATE_FORMAT = "%Y-%m-%d"
# Record key holding the report (cut-off) date of one column.
_REPORT_DATE_KEY = '截止日期'


def _nan_to_none(value):
    """Return None when *value* is a float NaN, otherwise *value* unchanged."""
    try:
        return None if math.isnan(value) else value
    except TypeError:
        # Strings, None and other non-numeric values are kept as they are.
        return value


def _build_period_record(data, column, cid):
    """Build the record for one value column of the parsed sheet.

    Column 0 of *data* holds the row labels and *column* holds the values.
    A row whose label is a float (an empty label cell is read as NaN) and
    whose value can be formatted as a date supplies the report date.
    Every other row is stored under its label, stripped when the label
    supports ``strip``.

    Args:
        data: DataFrame returned by ``pd.read_excel``.
        column: Positional index (>= 1) of the value column to convert.
        cid: Company id stored under the '企业ID' key.

    Returns:
        A dict mapping labels to values, with NaN values replaced by None.
    """
    record = {'企业ID': cid, '更新时间': CommonUtils.get_current_time()}
    for row_num in range(data.shape[0]):
        label = data.iloc[row_num, 0]
        value = data.iloc[row_num, column]

        # Duck-typed on purpose: anything exposing ``strftime`` in an
        # unlabeled row is taken as the report date.
        if isinstance(label, float) and hasattr(value, "strftime"):
            record[_REPORT_DATE_KEY] = value.strftime(_DATE_FORMAT)
            continue

        if isinstance(value, datetime.datetime):
            value = value.strftime(_DATE_FORMAT)
        # Labels without ``strip`` (numbers, dates, NaN) are used verbatim.
        key = label.strip() if hasattr(label, "strip") else label
        record[key] = value

    return {key: _nan_to_none(value) for key, value in record.items()}


@router.post("/upload", summary="上传数据")
async def func01(name: str, excel: UploadFile = File(...), db: Session = Depends(get_db)):
    """Import a balance-sheet Excel file for the company called *name*.

    The company is looked up by name and created when it does not exist.
    The first three rows of the sheet are skipped. Column 0 supplies the
    row labels, and every further column becomes one record that is passed
    to ``BalanceCrud.create_balance_by_id`` with its report date.

    Args:
        name: Company name.
        excel: Uploaded Excel file.
        db: SQLAlchemy session.

    Returns:
        ``{"info": "Success"}`` once every column has been stored.

    Raises:
        HTTPException: 400 when a value column has no report date; nothing
            is written to MongoDB in that case.
    """
    # Look the company up by name; create it when it does not exist yet.
    company = CompanyCrud.get_company_by_name(db=db, name=name)
    if not company:
        company = CompanyCrud.create_company_by_name(db=db, name=name)
    cid = company.id

    contents = await excel.read()
    # read_excel expects a path or file-like object; passing raw bytes is
    # deprecated, so wrap the upload in an in-memory buffer.
    data = pd.read_excel(io.BytesIO(contents), skiprows=3)

    # Build and validate every record before writing, so that a malformed
    # column cannot leave a partially imported report behind.
    records = []
    for column in range(1, data.shape[1]):
        record = _build_period_record(data, column, cid)
        if _REPORT_DATE_KEY not in record:
            raise HTTPException(
                status_code=400,
                detail=f"No report date found in column {column} of the uploaded file",
            )
        records.append(record)

    if records:
        mongodb = get_mongodb()
        try:
            for record in records:
                report_date = record[_REPORT_DATE_KEY]
                BalanceCrud.create_balance_by_id(
                    db=db,
                    mongodb=mongodb,
                    _id=cid,
                    reportdate=report_date,
                    condition={"企业ID": cid, "截止日期": report_date},
                    content=record,
                )
        finally:
            # Release the connection even when a write fails.
            mongodb.client.close()

    return {"info": "Success"}


@router.post("/view", summary="查看数据")
def func02(cid: str, db: Session = Depends(get_db)):
    """Return the stored balance-sheet data of the company with id *cid*.

    Args:
        cid: Company id.
        db: SQLAlchemy session.

    Returns:
        Whatever ``BalanceCrud.find_balance_by_id`` returns for *cid*.

    Raises:
        HTTPException: status 204 when the company does not exist.
    """
    # Check that the company exists in the relational database first.
    db_item = CompanyCrud.get_company_by_id(db, _id=cid)
    if not db_item:
        # NOTE(review): 204 responses carry no body, so this detail text is
        # unlikely to reach the client; 404 looks like the intended status.
        # Kept as-is because clients may depend on the current code.
        raise HTTPException(status_code=204, detail="Company Not Found")

    # Fetch the balance-sheet documents for this company id from MongoDB.
    # NOTE(review): the client is not closed here, unlike in the upload
    # handler; confirm whether find_balance_by_id returns materialized data
    # before adding a close.
    mongodb = get_mongodb()
    data = BalanceCrud.find_balance_by_id(db=db, mongodb=mongodb, _id=cid)
    return data