ForeignKey('data_collection.id', ondelete='CASCADE', onupdate='CASCADE'))
name: Mapped[str] = mapped_column(String(255), comment="收集名称")
uploaded: Mapped[bool] = mapped_column(Boolean, default=False, comment="上传状态")
upload_time: Mapped[datetime] = mapped_column(DateTime, nullable=True,comment='上传时间')
org_file_path: Mapped[str] = mapped_column(String(255), comment="原始文件路径", nullable=True)
org_file_name: Mapped[str] = mapped_column(String(255), comment="原始文件名称", nullable=True)
upload_file_path: Mapped[str] = mapped_column(String(255), comment="上传文件路径", nullable=True)
__tablename__ = "data_collection"
id: Mapped[str] = mapped_column(String(128), primary_key=True, default=lambda: uuid.uuid4().hex, comment='ID')
url: Mapped[str] = mapped_column(String(255), unique=True, comment='收集链接')
code: Mapped[str] = mapped_column(String(255), unique=True, comment='收集链接的编码')
passwd: Mapped[str] = mapped_column(String(255), comment='访问密码')
update_time: Mapped[datetime] = mapped_column(DateTime, server_default=func.now(), onupdate=func.now(),

from fastapi import APIRouter
import os
from datetime import datetime
from pathlib import Path
from fastapi import APIRouter, Depends, HTTPException, Form, File
from sqlalchemy.orm import Session
from fastapi import UploadFile as FastApiUploadFile
from utils.common_utils import file_md5
from utils.random_utils import get_random_letter_and_num_code
from ..upload_file.crud import types_dic_rev
from ...common import get_db
from .models import DataCollection, UploadFileItem
from . import schemas
base_url = ''
router = APIRouter(tags=["数据收集接口"], prefix='/data_collection')
need_files = [
{"name": "资产负债表", "path": ""},
{"name": "", "path": ""},
{"name": "", "path": ""},
{"name": "", "path": ""},
{"name": "", "path": ""},
{"name": "资产负债表", "file_name": "资产负债表.xlsx", "path": ""},
{"name": "利润表", "file_name": "利润表.xlsx", "path": ""},
def data_collection_create(req: schemas.DataCollectionCreateReq):
def data_collection_create(req: schemas.DataCollectionCreateReq, db: Session = Depends(get_db)):
code = get_random_letter_and_num_code(8)
pwd = get_random_letter_and_num_code(4)
item = DataCollection(code=code, passwd=pwd)
item.files = [UploadFileItem(org_file_path=file['path'], name=file['name'], org_file_name=file['file_name']) for
file in need_files]
return schemas.DataCollectionCreateRes(**item.to_full_dict())
def data_collection_get(req: schemas.DataCollectionGetReq, db: Session = Depends(get_db)):
item = db.query(DataCollection).filter(DataCollection.id == req.id).first()
if not item:
raise HTTPException(status_code=303, detail='未找到文件')
return schemas.DataCollectionGetRes(**item.to_full_dict())
def data_collection_upload(collection_id: str = Form(...), name: str = Form(...),
file: FastApiUploadFile = File(...),
db: Session = Depends(get_db)):
content = b""
while True:
byte = file.file.read(10240)
if byte:
content += byte
file_type = types_dic_rev.get(file.content_type)
common_type, _ = file.content_type.split('/')
content_start = content[0:10240]
file_md = file_md5(content_start)
file_save_name = file_md + f".{file_type}"
save_path = Path(f"static/upload_files/{common_type}/{file_type}")
file_save_path = save_path / file_save_name
file_url = f'/static/upload_files/{common_type}/{file_type}/{file_save_name}'
if not os.path.exists(save_path):
if not os.path.exists(file_save_path):
with open(file_save_path, 'wb') as f:
item = db.query(UploadFileItem).filter(UploadFileItem.collection_id == collection_id,
UploadFileItem.name == name).first()
if not item:
raise HTTPException(status_code=303, detail="不存在该名称")
item.uploaded = True
item.upload_file_name = file.filename
item.upload_file_path = file_url
item.upload_time = datetime.now()
return schemas.DataCollectionUploadRes(**item.to_dict())
def data_collection_file_remove(req: schemas.DataCollectionFileRemoveReq, db: Session = Depends(get_db)):
item: UploadFileItem = db.query(UploadFileItem).filter(UploadFileItem.id == req.id).first()
if not item:
raise HTTPException(status_code=303, detail='未找到文件')
item.upload_file_path = None
item.upload_file_name = None
item.uploaded = False
item.upload_time = None
return "成功"

from enum import Enum
from typing import List, Optional
from datetime import datetime
from pydantic import BaseModel
# class FileNeedInfoItem(BaseModel):
# id: str
# name: str
class FileNeedInfoItem(BaseModel):
name: str
org_file_path: Optional[str] = None
org_file_name: Optional[str] = None
uploaded: Optional[bool] = None
upload_time: Optional[datetime] = None
upload_file_path: Optional[str] = None
upload_file_name: Optional[str] = None
class DataCollectionCreateReq(BaseModel):
rate_id: str
serve_id: str
class DataCollectionCreateRes(BaseModel):
id: str
passwd: str
code: str
files: List[FileNeedInfoItem]
class DataCollectionGetReq(BaseModel):
id: str
class DataCollectionGetRes(BaseModel):
id: str
passwd: str
code: str
files: List[FileNeedInfoItem]
class DataCollectionUploadRes(FileNeedInfoItem):
id: str
class DataCollectionFileRemoveReq(BaseModel):
id: str

from .mods.rate_data_preparation_node.router import router as rate_data_preparation_node_router
from .mods.upload_file.router import router as upload_file_router
from .client.router import router as client_router
from .mods.data_collection.router import router as data_collection_router
router = APIRouter(prefix="/rate")
config = {
"url": "",
"passwd": "",
"files": [
{"org_path": "", "name": "资产负债表", "upload_path": ""},
{"org_path": "", "name": "利润表", "upload_path": ""},
{"org_path": "", "name": "现金流量表", "upload_path": ""},
{"org_path": "", "name": "经营情况表", "upload_path": ""},

def get_random_letter_and_num_code(length=4):
return ''.join(random.sample(string.ascii_letters + string.digits, 4))
return ''.join(random.sample(string.ascii_letters + string.digits, length))