"""CRUD helpers for ``models.User`` built on a SQLAlchemy ``Session``."""

# NOTE(review): ``or_`` is no longer used by the functions below; the import is
# kept in case other modules import it from here -- remove once confirmed.
from sqlalchemy import or_
from sqlalchemy.orm import Session
from werkzeug.security import generate_password_hash

from . import models, schemas


def get_user_by_id(db: Session, uid: int):
    """Return the first ``User`` whose ``id`` equals *uid*, or ``None``."""
    return db.query(models.User).filter(models.User.id == uid).first()


def get_user_by_email(db: Session, email: str):
    """Return the first ``User`` whose ``email`` equals *email*, or ``None``."""
    return db.query(models.User).filter(models.User.email == email).first()


def get_users(db: Session, body: schemas.UserSearch, skip, limit):
    """Return a page of users matching the non-``None`` fields of *body*.

    ``id``, ``role``, ``depart`` and ``is_active`` are compared for equality;
    ``email`` and ``name`` are matched as substrings via ``LIKE '%value%'``.
    A field that is ``None`` adds no restriction. All active criteria are
    AND-ed together.

    Args:
        db: Session used to run the query.
        body: Search criteria; only the six attributes named above are read.
        skip: Passed to ``Query.offset``.
        limit: Passed to ``Query.limit``.

    Returns:
        The list produced by ``Query.all()``.
    """
    user = models.User

    exact_matches = (
        (user.id, body.id),
        (user.role, body.role),
        (user.depart, body.depart),
        (user.is_active, body.is_active),
    )
    substring_matches = (
        (user.email, body.email),
        (user.name, body.name),
    )

    criteria = [
        column == wanted
        for column, wanted in exact_matches
        if wanted is not None
    ]
    # The pattern is built in one piece on purpose: the previous
    # `"%" + "" if v is None else v + "%"` bound the conditional loosest and
    # therefore dropped the leading wildcard whenever a value was supplied.
    # NOTE(review): '%' and '_' inside the value still act as LIKE wildcards.
    criteria += [
        column.like(f"%{fragment}%")
        for column, fragment in substring_matches
        if fragment is not None
    ]

    return db.query(user).filter(*criteria).offset(skip).limit(limit).all()


def create_user(db: Session, body: schemas.UserCreate):
    """Insert a new ``User`` built from *body* and return the refreshed row.

    The password is hashed with ``generate_password_hash`` before it is
    stored. *body* itself is left untouched: the hash is written into a copy
    of its field dict rather than assigned back onto the schema object.
    """
    fields = body.dict()
    fields["passwd"] = generate_password_hash(body.passwd)

    item = models.User(**fields)
    db.add(item)
    db.commit()
    db.refresh(item)
    return item


def delete_user(db: Session, _id: int) -> bool:
    """Delete the user(s) with ``id == _id`` and commit.

    Always returns ``True``, including when no row matched.
    """
    db.query(models.User).filter_by(id=_id).delete()
    db.commit()
    return True


def edit_user(db: Session, uid: int, body: schemas.UserEdit) -> bool:
    """Write every field of ``body.dict()`` onto the user with ``id == uid``.

    Always returns ``True``, including when no row matched.
    """
    # NOTE(review): every key of body.dict() is written, so fields the client
    # omitted presumably overwrite stored values with their schema defaults --
    # confirm against UserEdit before relying on partial updates.
    db.query(models.User).filter_by(id=uid).update(body.dict())
    db.commit()
    return True


def reset_user_pwd(db: Session, uid: int, passwd: schemas.PasswdRegex) -> bool:
    """Store a fresh hash of *passwd* for the user with ``id == uid``.

    Always returns ``True``, including when no row matched.
    """
    hashed = generate_password_hash(passwd)
    db.query(models.User).filter_by(id=uid).update({"passwd": hashed})
    db.commit()
    return True