urban-investment-research/Mods/User/Crud.py

131 lines
4.3 KiB
Python

import os
from hashlib import md5
from pathlib import Path
from fastapi import HTTPException, UploadFile
from sqlalchemy.orm import Session
from .Models import User, Post, Department, AuthRule, UserFile
from . import Schemas
def get_user_info(db: Session, email: str):
    """Look up a single user by e-mail address.

    :param db: database session the query is issued on
    :param email: value matched against ``User.email``
    :return: the first matching ``User`` row as returned by ``.first()``
    """
    return db.query(User).filter_by(email=email).first()
def _rows_as_dicts(db, model, raw_ids):
    """Load the ``model`` rows listed in a comma-separated id string as dicts.

    :param db: database session the query is issued on
    :param model: mapped class exposing ``id`` and ``to_dict()`` (``Department`` or ``Post``)
    :param raw_ids: string such as ``"1,2,3"``; may be empty or ``None``
    :return: list of ``to_dict()`` results, empty when there is nothing to load
    :raises ValueError: if a non-blank fragment is not an integer
    """
    if not raw_ids:
        return []
    # Blank fragments (e.g. from a trailing comma) are skipped so that they do
    # not make int() raise; genuinely non-numeric fragments still raise.
    id_list = [int(piece) for piece in raw_ids.split(",") if piece.strip()]
    if not id_list:
        # Nothing to look up; avoid issuing an empty IN () query.
        return []
    return [row.to_dict() for row in db.query(model).filter(model.id.in_(id_list))]


def _user_with_relations(db, user):
    """Build the dict form of ``user`` with its departments and posts expanded.

    :param db: database session used to load the related rows
    :param user: ``User`` row, or ``None``
    :return: ``user.to_dict()`` extended with ``department_list`` and
        ``post_list``, or ``None`` when ``user`` is falsy
    """
    if not user:
        return None
    departments = _rows_as_dicts(db, Department, user.department)
    posts = _rows_as_dicts(db, Post, user.post)
    user_data = user.to_dict()
    user_data['department_list'] = departments
    user_data['post_list'] = posts
    return user_data


def get_user_info_by_phone(db: Session, phone: str):
    """Return the expanded user dict for the user with the given phone number.

    :param db: database session the query is issued on
    :param phone: value matched against ``User.phone``
    :return: dict with the user's fields plus ``department_list`` and
        ``post_list``, or ``None`` when no user matches
    """
    user = db.query(User).filter_by(phone=phone).first()
    return _user_with_relations(db, user)


def get_full_user_info(db: Session, email: str):
    """Return the expanded user dict for the user with the given e-mail.

    :param db: database session the query is issued on
    :param email: value matched against ``User.email``
    :return: dict with the user's fields plus ``department_list`` and
        ``post_list``, or ``None`` when no user matches
    """
    user = db.query(User).filter_by(email=email).first()
    return _user_with_relations(db, user)
def create_user(db: Session, email: str):
    """Insert a new user that carries only an e-mail address.

    :param db: database session the insert is committed on
    :param email: e-mail address passed to the ``User`` constructor
    :return: the newly created ``User`` instance
    """
    new_user = User(email=email)
    db.add(new_user)
    db.commit()
    # Re-read the row from the database after the commit.
    db.refresh(new_user)
    return new_user
def marge_auth_data(auth_data_list):
    """Merge several comma-separated permission strings into one.

    :param auth_data_list: iterable of comma-separated strings; falsy entries
        (``None``, ``""``) are ignored
    :return: the distinct items joined with ``","``, ordered by plain string
        comparison (so ``"10"`` sorts before ``"2"``); ``""`` when there is
        nothing to merge
    """
    merged = set()
    for item in auth_data_list:
        if item:
            merged.update(piece.strip() for piece in item.split(','))
    # A trailing or doubled comma yields an empty fragment; it is not a
    # permission and would otherwise surface as a leading "," in the result.
    merged.discard('')
    return ",".join(sorted(merged))
def get_user_auth(db: Session, email):
    """
    Get the user's permission information.

    The user's own ``auth_data`` is returned when it is set. Otherwise, if the
    user belongs to departments, the ``auth_data`` of those departments is
    merged with ``marge_auth_data``.

    :param db: database session the queries are issued on
    :param email: value matched against ``User.email``
    :return: comma-separated permission string; the user's (empty)
        ``auth_data`` when neither the user nor any department provides one;
        ``None`` when no user matches ``email``
    """
    user = db.query(User).filter_by(email=email).first()
    if not user:
        # Unknown e-mail: report "no data" instead of raising AttributeError,
        # in line with the other lookups in this module.
        return None
    if user.auth_data or not user.department:
        return user.auth_data
    department_id_list = [int(d_id) for d_id in user.department.split(',')]
    department_auth_data_list = [
        item.auth_data
        for item in db.query(Department).filter(Department.id.in_(department_id_list))
    ]
    return marge_auth_data(department_auth_data_list)
def file_md5(head_body):
    """
    Compute the MD5 hex digest of a chunk of bytes.

    :param head_body: bytes to hash (the leading bytes of a file)
    :return: str, the hexadecimal digest
    """
    return md5(head_body).hexdigest()
def create_file(db: Session, file: UploadFile, user_id):
    """Store an uploaded file on disk and record it for the user.

    The file is saved under ``static/upload_files/<common_type>/<md5>``, where
    the MD5 is taken over the first 1024 bytes of the upload. The bytes are
    written only when no file exists at that path yet, and a ``UserFile`` row
    is created only when this user has no row with the same hash.

    :param db: database session used for the lookup and the insert
    :param file: the uploaded file; its ``content_type`` must start with
        ``image/`` or ``application/``
    :param user_id: id stored on the ``UserFile`` row
    :return: the existing or newly created ``UserFile`` row
    :raises HTTPException: 403 when the content type is missing or not allowed
    """
    # partition() copes with a missing content type or one without a "/",
    # so such uploads are rejected with the 403 below instead of crashing.
    common_type = (file.content_type or '').partition('/')[0]
    if common_type not in {'image', 'application'}:
        raise HTTPException(detail='不允许上传该类型文件', status_code=403)

    content_start = file.file.read(1024)
    # NOTE(review): only the first 1024 bytes are hashed, so two different
    # files sharing that prefix map to the same path and the second one is
    # never written. Hashing the full content would fix this but changes the
    # storage key of already-stored files; confirm before changing.
    file_md = file_md5(content_start)
    save_path = Path(f"static/upload_files/{common_type}")
    file_save_path = save_path / file_md
    file_url = f'/static/upload_files/{common_type}/{file_md}'
    # exist_ok avoids the check-then-create race between concurrent uploads.
    save_path.mkdir(parents=True, exist_ok=True)

    has_uploaded = db.query(UserFile).filter(UserFile.md == file_md, UserFile.user_id == user_id).first()
    if not file_save_path.exists():
        try:
            with open(file_save_path, 'wb') as f:
                f.write(content_start)
                while True:
                    content = file.file.read(10240)
                    if not content:
                        break
                    f.write(content)
        except Exception:
            # Do not leave a truncated file behind: later uploads would see
            # the path as existing and never rewrite it.
            if file_save_path.exists():
                file_save_path.unlink()
            raise

    if has_uploaded:
        return has_uploaded

    user_file = UserFile(md=file_md, mime_type=file.content_type, file_name=file.filename, user_id=user_id,
                         common_type=common_type, file_url=file_url)
    db.add(user_file)
    db.commit()
    db.refresh(user_file)
    return user_file