69 lines
2.2 KiB
Python
69 lines
2.2 KiB
Python
from typing import Union
|
|
|
|
from sqlalchemy import or_
|
|
from sqlalchemy.orm import Session
|
|
|
|
from Model.UserModel import User
|
|
from Schemas.UserManageSchemas import UserListQueryParams
|
|
from Utils.EncyptUtil import get_encrypt_password
|
|
|
|
|
|
def get_user_by_email(db: Session, email: str) -> Union[User, None]:
    """Look up a single user by e-mail address.

    :param db: Session used to run the query.
    :param email: Value matched exactly against ``User.email``.
    :return: The first matching ``User``, or ``None`` when no row matches.
    """
    by_email = db.query(User).filter_by(email=email)
    return by_email.first()
|
|
|
|
|
|
def get_user_by_id(db: Session, user_id: str) -> Union[User, None]:
    """Look up a single user by primary identifier.

    :param db: Session used to run the query.
    :param user_id: Value matched exactly against ``User.id``.
    :return: The first matching ``User``, or ``None`` when no row matches.
    """
    by_id = db.query(User).filter_by(id=user_id)
    return by_id.first()
|
|
|
|
|
|
def create_user(db: Session, email: str, password: str) -> User:
    """Insert a new user row and return the persisted object.

    The password is passed through ``get_encrypt_password`` before it is
    stored on the model.

    :param db: Session used to add and commit the new row.
    :param email: E-mail address for the new user.
    :param password: Password as supplied by the caller.
    :return: The ``User`` instance after commit and refresh.
    """
    stored_password = get_encrypt_password(password)
    new_user = User(email=email, password=stored_password)

    db.add(new_user)
    db.commit()
    # Reload the instance so database-populated attributes are available.
    db.refresh(new_user)
    return new_user
|
|
|
|
|
|
def rest_user_password(db: Session, email: str, password: str):
    """Overwrite the stored password of the user(s) with the given e-mail.

    The new password is passed through ``get_encrypt_password`` before being
    written, and the change is committed immediately.

    :param db: Session used to run the update.
    :param email: E-mail address selecting the row(s) to update.
    :param password: New password as supplied by the caller.
    """
    new_values = {"password": get_encrypt_password(password)}
    matching_users = db.query(User).filter_by(email=email)
    matching_users.update(new_values)
    db.commit()
|
|
|
|
|
|
def change_user_info(db: Session, user_id, user_data):
    """Apply a mapping of column values to the user with the given id.

    :param db: Session used to run the update.
    :param user_id: Value matched exactly against ``User.id``.
    :param user_data: Mapping handed directly to ``Query.update``.
    """
    target = db.query(User).filter_by(id=user_id)
    target.update(user_data)
    db.commit()
|
|
|
|
|
|
def change_user_info_by_key(db: Session, user_id: str, key: str, value):
    """Set a single column on the user with the given id and commit.

    :param db: Session used to run the update.
    :param user_id: Value matched exactly against ``User.id``.
    :param key: Name of the ``User`` column to change.
    :param value: New value to store in that column.
    """
    # Write ``value`` into the column; the previous code stored the column
    # name itself (``{key: key}``) and ignored ``value`` entirely.
    db.query(User).filter_by(id=user_id).update({key: value})
    db.commit()
|
|
|
|
|
|
def query_user(db: Session, query_params: UserListQueryParams):
    """Return one page of users matching the given filters, plus the total.

    Every filter is optional and only applied when its value is truthy:

    * ``id``, ``email``, ``name`` -- substring match (``LIKE '%value%'``).
    * ``role`` -- exact match.
    * ``post``, ``department`` -- the value must be one whole entry of the
      comma-separated list stored in the column.

    :param db: Session used to run the query.
    :param query_params: Filter values together with ``page`` and ``page_size``.
    :return: ``(items, count)`` where ``items`` holds ``as_info()`` of each
        row on the requested page and ``count`` is the number of rows
        matching the filters before pagination.
    """
    query = db.query(User)

    if query_params.id:
        query = query.filter(User.id.like(f'%{query_params.id}%'))
    if query_params.email:
        query = query.filter(User.email.like(f'%{query_params.email}%'))
    if query_params.name:
        query = query.filter(User.name.like(f'%{query_params.name}%'))
    if query_params.role:
        # Compare against the role column; the previous code filtered
        # ``User.name`` by the role value.
        query = query.filter(User.role == query_params.role)

    # NOTE(review): the value is interpolated into the pattern unescaped, so
    # regex metacharacters in it are interpreted by the database -- confirm
    # that callers validate these fields.
    if query_params.post:
        v = query_params.post
        # ``(^|,)v(,|$)`` matches v as the only, first, middle or last entry;
        # the previous pattern missed the last entry of a multi-entry list.
        query = query.filter(User.post.op('regexp')(rf'(^|,){v}(,|$)'))
    if query_params.department:
        v = query_params.department
        query = query.filter(User.department.op('regexp')(rf'(^|,){v}(,|$)'))

    # Count before applying offset/limit so it reflects every matching row.
    count = query.count()

    skip = (query_params.page - 1) * query_params.page_size
    page_rows = query.offset(skip).limit(query_params.page_size).all()
    items = [row.as_info() for row in page_rows]
    return items, count
|
|
|
|
|
|
def delete_user(db: Session, user_id: str):
    """Delete the user row(s) with the given id and commit.

    :param db: Session used to run the delete.
    :param user_id: Value matched exactly against ``User.id``.
    """
    doomed = db.query(User).filter_by(id=user_id)
    doomed.delete()
    db.commit()
|