import os
from base64 import b64decode
from pathlib import Path

from fastapi import APIRouter, UploadFile, File, Depends, HTTPException, Header
from sqlalchemy.orm import Session

from Crud import FileCrud
from Schemas.UserSchemas import TokenData
from Utils.CommonUtils import file_md5
from Utils.SqlAlchemyUtils import get_db
from Schemas import FileSechemas
from Utils.AuthUtils import token_data_depend

router = APIRouter(prefix="/api/daily/file", tags=["文件"])

# Top-level MIME types (the part before the "/") that may be uploaded.
allow_file_type = {
    'image',
    'audio',
    'video',
    "application"
}

# Characters that must never appear in a single path component built from
# client-supplied data.
_UNSAFE_PATH_CHARS = ('/', '\\', '\0')


def _is_safe_path_component(value: str) -> bool:
    """Return True if *value* is usable as one directory or file name."""
    if value in ('', '.', '..'):
        return False
    return not any(ch in value for ch in _UNSAFE_PATH_CHARS)


def _file_extension(filename: str) -> str:
    """Return the lower-cased extension of *filename*, including the dot.

    Returns an empty string when the name has no dot, or when the text after
    the last dot contains a path separator (it is then not a real extension
    and would let the caller escape the upload directory).
    """
    if '.' not in filename:
        return ''
    suffix = filename.rsplit('.', 1)[-1].lower()
    if any(ch in suffix for ch in _UNSAFE_PATH_CHARS):
        return ''
    return '.' + suffix


@router.post('/file_upload', response_model=FileSechemas.UserFileInfo, summary='文件上次')
def file_upload(name: str = Header(None), file: UploadFile = File(...), db: Session = Depends(get_db),
                token_data: TokenData = Depends(token_data_depend)):
    """Store an uploaded file on disk and register it in the database.

    Args:
        name: Optional ``name`` request header holding the base64-encoded
            UTF-8 original file name; when present it overrides
            ``file.filename``.
        file: The uploaded file.
        db: Database session.
        token_data: Data of the authenticated user; its ``email`` is stored
            as the owner (``user_id``) of a newly registered file.

    Returns:
        The ``to_dict()`` representation of the stored (or already existing)
        file record.

    Raises:
        HTTPException: 400 if the ``name`` header cannot be decoded or the
            content type is malformed; 415 if the top-level MIME type is not
            in ``allow_file_type``.
    """
    file_org_name = file.filename or ''
    if name:
        try:
            file_org_name = b64decode(name.encode('utf-8')).decode('utf-8')
        except ValueError as err:
            # binascii.Error and UnicodeDecodeError are both ValueError subclasses.
            raise HTTPException(detail='文件名解码失败', status_code=400) from err

    mime_type = file.content_type
    type_parts = (mime_type or '').split('/')
    if len(type_parts) != 2:
        raise HTTPException(detail='无法识别的文件类型', status_code=400)
    common_type, file_type = type_parts
    if common_type not in allow_file_type:
        raise HTTPException(detail='不允许上传该类型文件', status_code=415)
    # The subtype is client-controlled and becomes a directory name below.
    if not _is_safe_path_component(file_type):
        raise HTTPException(detail='无法识别的文件类型', status_code=400)

    file_type_end = _file_extension(file_org_name)

    # NOTE(review): the digest is computed from the first 1024 bytes only, so
    # two different files sharing that prefix map to the same record/path.
    # Left unchanged because existing records and clients may rely on this
    # identifier - confirm before hashing the full content.
    content_start = file.file.read(1024)
    file_md = file_md5(content_start)

    stored_name = file_md + file_type_end
    save_path = Path(f"static_data/upload_files/{common_type}/{file_type}")
    file_save_path = save_path / stored_name
    file_url = f'/static_data/upload_files/{common_type}/{file_type}/{stored_name}'

    # exist_ok avoids a race between the existence check and the creation.
    os.makedirs(save_path, exist_ok=True)

    user_file = FileCrud.get_file_by_md(db, file_md)
    if not user_file:
        if not os.path.exists(file_save_path):
            with open(file_save_path, 'wb') as out:
                out.write(content_start)
                # Stream the remainder in chunks until read() returns b''.
                for chunk in iter(lambda: file.file.read(10240), b''):
                    out.write(chunk)
        user_file = FileCrud.add_file(
            db,
            FileSechemas.UserFileCreate(
                md=file_md,
                file_name=file_org_name,
                mime_type=mime_type,
                user_id=token_data.email,
                common_type=common_type,
                file_url=file_url,
            ),
        )

    file_info = user_file.to_dict()
    print(file_info, '文件上传成功')
    return file_info


@router.post("/query", response_model=FileSechemas.UserFileQueryRes, summary='文件查询')
def query(req: FileSechemas.UserFileQuery, token_data: TokenData = Depends(token_data_depend),
          db: Session = Depends(get_db)):
    """Run a file query and return the total count plus the matching records.

    Args:
        req: Query parameters forwarded unchanged to ``FileCrud.file_query``.
        token_data: Data of the authenticated user. Only declared so the
            route requires the token dependency; it is not read here.
        db: Database session.

    Returns:
        A ``UserFileQueryRes`` with ``count`` and ``item_list`` (each item
        converted with ``to_dict()``).
    """
    # NOTE(review): a check rejecting requests whose req.user_id differs from
    # token_data.email (HTTP 403) used to live here but is disabled, so the
    # query is not restricted to the caller's own files.
    count, items = FileCrud.file_query(db, req)
    item_list = [item.to_dict() for item in items]
    return FileSechemas.UserFileQueryRes(count=count, item_list=item_list)


# NOTE(review): this handler reuses the name `query` from the handler above.
# Both routes work because the decorator registers each function when it is
# defined, but the module attribute `query` ends up bound to this one.
@router.post("/get", response_model=FileSechemas.UserFileInfo, summary='文件获取')
def query(req: FileSechemas.UserFileGet, db: Session = Depends(get_db)):
    """Return the record of the file identified by ``req.md``.

    No token dependency is declared on this route.

    Args:
        req: Request body carrying the file identifier ``md``.
        db: Database session.

    Returns:
        The ``to_dict()`` representation of the file record.

    Raises:
        HTTPException: 404 if ``FileCrud.file_get`` finds no record.
    """
    user_file = FileCrud.file_get(db, req.md)
    if not user_file:
        raise HTTPException(detail="未找到文件", status_code=404)
    return user_file.to_dict()