import datetime import math import pandas as pd from fastapi import APIRouter, Depends, UploadFile, File, HTTPException from sqlalchemy.orm import Session from APPData.Crud import CompanyCrud, BalanceCrud from Utils.DataBase.MongoHelperUtils import get_mongodb from Utils.DataBase.SqlAlchemyUtils import get_db from Utils.TycApi.CommonUtil import CommonUtils router = APIRouter( tags=["资产负债表"], prefix="/api/balance_sheet" ) @router.post("/upload", summary="上传数据") async def func01(name: str, excel: UploadFile = File(...), db: Session = Depends(get_db)): # 根据name查询企业是否存在 data = CompanyCrud.get_company_by_name(db=db, name=name) if not data: # 不能存在新建企业再返回cid data = CompanyCrud.create_company_by_name(db=db, name=name) cid = data.id contents = await excel.read() data = pd.read_excel(contents, skiprows=3) # 行数 row = data.shape[0] # 列数 col = data.shape[1] # 财报数据 for num in range(1, col): finance_dict = dict() finance_dict['企业ID'] = cid finance_dict['更新时间'] = CommonUtils.get_current_time() for row_num in range(row): try: if isinstance(data.iloc[row_num, 0], float): finance_dict['截止日期'] = data.iloc[row_num, num].strftime("%Y-%m-%d") continue if isinstance(data.iloc[row_num, num], datetime.datetime): finance_dict[data.iloc[row_num, 0].strip()] = data.iloc[row_num, num].strftime("%Y-%m-%d") else: finance_dict[data.iloc[row_num, 0].strip()] = data.iloc[row_num, num] except AttributeError: if isinstance(data.iloc[row_num, num], datetime.datetime): try: finance_dict[data.iloc[row_num, 0].strip()] = data.iloc[row_num, num].strftime("%Y-%m-%d") except AttributeError: finance_dict[data.iloc[row_num, 0]] = data.iloc[row_num, num].strftime("%Y-%m-%d") else: finance_dict[data.iloc[row_num, 0]] = data.iloc[row_num, num] for key, value in finance_dict.items(): try: if math.isnan(value): finance_dict[key] = None except TypeError: continue mongodb = get_mongodb() BalanceCrud.create_balance_by_id(db=db, mongodb=mongodb, _id=cid, reportdate=finance_dict['截止日期'], condition={"企业ID": cid, "截止日期": finance_dict['截止日期']}, content=finance_dict) mongodb.client.close() return {"info": "Success"} @router.post("/view", summary="查看数据") def func02(cid: str, db: Session = Depends(get_db)): mongodb = get_mongodb() # 查询mysql数据库中的_id db_item = CompanyCrud.get_company_by_id(db, _id=cid) if not db_item: raise HTTPException(status_code=204, detail="Company Not Found") # 根据_id查询mongodb数据库 data = BalanceCrud.find_balance_by_id(db=db, mongodb=mongodb, _id=cid) return data