115 lines
5.1 KiB
Python
115 lines
5.1 KiB
Python
import os
|
|
from pathlib import Path
|
|
from fastapi import APIRouter, UploadFile, File, Depends, HTTPException, Header
|
|
from sqlalchemy.orm import Session
|
|
from . import crud
|
|
from utils.common_utils import file_md5
|
|
from context.common import common_db, company_user_auth_util, CompanyUserTokenDataModel
|
|
from . import schemas
|
|
from base64 import b64decode
|
|
|
|
router = APIRouter(prefix="/form_file", tags=["文件"])
|
|
allow_file_type = {
|
|
"application",
|
|
'image'
|
|
}
|
|
|
|
|
|
@router.post('/license_file_upload', response_model=schemas.FormFileInfo, summary='营业执照文件上传')
|
|
def license_file_upload(name: str = Header(None), file: UploadFile = File(...),
|
|
db: Session = Depends(common_db.get_db)
|
|
# token_data: CompanyUserTokenDataModel = Depends(company_user_auth_util.token_data_depend)
|
|
):
|
|
if name:
|
|
name = b64decode(name.encode('utf-8')).decode('utf-8')
|
|
mime_type = file.content_type
|
|
common_type, file_type = file.content_type.split('/')
|
|
if common_type not in {'image'}:
|
|
raise Exception('请上传图片类型文件')
|
|
content_start = file.file.read(10240)
|
|
file_md = file_md5(content_start)
|
|
save_path = Path(f"static/upload_files/{common_type}/{file_type}")
|
|
file_save_path = save_path / file_md
|
|
file_url = f'/static/upload_files/{common_type}/{file_type}/{file_md}'
|
|
if not os.path.exists(save_path):
|
|
os.makedirs(save_path)
|
|
form_file = crud.get_file_by_md(db, file_md)
|
|
if not form_file:
|
|
if not os.path.exists(file_save_path):
|
|
with open(file_save_path, 'wb') as f:
|
|
f.write(content_start)
|
|
while True:
|
|
content = file.file.read(10240)
|
|
if content:
|
|
f.write(content)
|
|
else:
|
|
break
|
|
# user_id=token_data.id
|
|
user_id = ""
|
|
form_file = crud.add_file(db,
|
|
schemas.FormFileCreate(md=file_md, file_name=name or file.filename,
|
|
mime_type=mime_type,
|
|
user_id=user_id, common_type=common_type,
|
|
file_url=file_url))
|
|
return form_file.to_dict()
|
|
|
|
|
|
@router.post('/form_file_upload', response_model=schemas.FormFileInfo, summary='表单文件上传')
|
|
def form_file_upload(name: str = Header(None), file: UploadFile = File(...),
|
|
db: Session = Depends(common_db.get_db),
|
|
token_data: CompanyUserTokenDataModel = Depends(company_user_auth_util.token_data_depend)
|
|
):
|
|
if name:
|
|
name = b64decode(name.encode('utf-8')).decode('utf-8')
|
|
mime_type = file.content_type
|
|
if mime_type != 'application/vnd.openxmlformats-officedocument.spreadsheetml.sheet':
|
|
raise Exception('不允许上传该类型文件')
|
|
common_type, file_type = file.content_type.split('/')
|
|
# if common_type not in allow_file_type:
|
|
# raise Exception('不允许上传该类型文件')
|
|
content_start = file.file.read(10240)
|
|
file_md = file_md5(content_start)
|
|
save_path = Path(f"static/upload_files/{common_type}/{file_type}")
|
|
file_save_path = save_path / file_md
|
|
file_url = f'/static/upload_files/{common_type}/{file_type}/{file_md}'
|
|
if not os.path.exists(save_path):
|
|
os.makedirs(save_path)
|
|
form_file = crud.get_file_by_md(db, file_md)
|
|
if not form_file:
|
|
if not os.path.exists(file_save_path):
|
|
with open(file_save_path, 'wb') as f:
|
|
f.write(content_start)
|
|
while True:
|
|
content = file.file.read(10240)
|
|
if content:
|
|
f.write(content)
|
|
else:
|
|
break
|
|
user_id = token_data.id
|
|
# user_id = ""
|
|
form_file = crud.add_file(db,
|
|
schemas.FormFileCreate(md=file_md, file_name=name or file.filename,
|
|
mime_type=mime_type,
|
|
user_id=user_id, common_type=common_type,
|
|
file_url=file_url))
|
|
return form_file.to_dict()
|
|
|
|
|
|
@router.post("/query", response_model=schemas.FormFileQueryRes, summary='文件查询')
|
|
def query(req: schemas.FormFileQuery,
|
|
db: Session = Depends(common_db.get_db)):
|
|
# if req.user_id and req.user_id != token_data.email:
|
|
# raise HTTPException(detail='无权查询其他用户文件', status_code=403)
|
|
count, items = crud.file_query(db, req)
|
|
item_list = [item.to_dict() for item in items]
|
|
return schemas.FormFileQueryRes(count=count, item_list=item_list)
|
|
|
|
|
|
@router.post("/get", response_model=schemas.FormFileInfo, summary='文件获取')
|
|
def query(req: schemas.FormFileGet,
|
|
db: Session = Depends(common_db.get_db)):
|
|
file = crud.file_get(db, req.md)
|
|
if not file:
|
|
raise HTTPException(detail="未找到文件", status_code=404)
|
|
return file.to_dict()
|