daily/Crud/UserCrud.py

113 lines
3.8 KiB
Python

from sqlalchemy.orm import Session
from sqlalchemy import func
from Models.DepartmentModel import Department
from Models.PostModel import Post
from Models.UserModel import User, UserInfo
from Schemas import UserSchemas
import json
from Utils.SqlAlchemyUtils import get_db_i
def get_user_info(db: Session, email: str):
user = db.query(User).filter_by(email=email).first()
return user
def get_user_info_by_phone(db: Session, phone: str):
user = db.query(User).filter_by(phone=phone).first()
return user
def get_full_user_info(db: Session, email: str):
user = db.query(User).filter_by(email=email).first()
if not user:
return None
departments = []
posts = []
if user.department:
department_id_list = [int(department) for department in user.department.split(",")]
departments = [item.to_dict() for item in
db.query(Department).filter(Department.id.in_(department_id_list))]
if user.post:
post_id_list = [int(post) for post in user.post.split(",")]
posts = [item.to_dict() for item in db.query(Post).filter(Post.id.in_(post_id_list))]
user_data = user.to_dict()
user_data['department_list'] = departments
user_data['post_list'] = posts
return user_data
def create_user(db: Session, email: str):
user = User(email=email)
db.add(user)
db.commit()
db.refresh(user)
return user
def change_user_info(db: Session, email: str, user_info_change: UserSchemas.UserInfoChange):
db.query(User).filter_by(email=email).update(user_info_change.dict())
db.commit()
return db.query(User).filter_by(email=email).first()
# 根据email从用户信息表内配置用户数据,相当于用户接入后的数据库操作
def update_user_info_from_email(db: Session, email: str):
user_info = db.query(UserInfo).filter_by(email=email).first()
if user_info:
db.query(User).filter_by(email=email).update(
{'email': email, "name": user_info.name, "post": user_info.post,
"department": user_info.department, 'registered': True})
else:
db.query(User).filter_by(email=email).update(
{'email': email, 'registered': True})
db.commit()
def load_company_config_to_db():
db = get_db_i()
try:
data = json.load(open('Config/company.json', 'r', encoding="utf-8"))
db.query(Department).delete()
db.query(Post).delete()
db.commit()
for department_type in data['部门'].keys():
for department_name in data['部门'][department_type]:
department = Department(name=department_name, type=department_type)
db.add(department)
db.commit()
department_name_dic = {item.name: item.id for item in db.query(Department).all()}
for department_name, posts in data['职务'].items():
for post_name in posts:
db.add(Post(name=post_name, belong=department_name_dic[department_name]))
db.commit()
except Exception as e:
print(e)
finally:
db.close()
def get_user_by_departments(db: Session, departments: []):
set_emails = set()
users = []
for department in departments:
res = db.query(User).filter(func.find_in_set(department, User.department))
for item in res:
if item.email not in set_emails:
set_emails.add(item.email)
users.append(item.to_dict())
return users
def get_user_by_department_type(db: Session, department_type: str):
departments = [item.id for item in db.query(Department).filter_by(type=department_type)]
users = get_user_by_departments(db, departments)
users = [get_full_user_info(db, user['email']) for user in users if user["name"] != '邢军']
return users
def get_department_list(db: Session):
return db.query(Department).all()