user-wsc/User/crud.py

75 lines
2.4 KiB
Python

from sqlalchemy import or_
from werkzeug.security import generate_password_hash
from sqlalchemy.orm import Session
from Utils.EncryptProcess import EncyptUtil
from Utils.UniqueCoder.TimeSerialNumUtils import create_time_serial_num
from . import models, schemas
def get_user_by_name(db: Session, name: str):
    """Look up a single user by exact name.

    Only the id, name, email, role, depart and is_active columns are
    selected, so the result is a row of those six values rather than a
    full ``models.User`` instance (the password column is not read).

    Args:
        db: SQLAlchemy session used to run the query.
        name: Name to match exactly.

    Returns:
        The first matching row, or ``None`` when no user has that name.
    """
    user = models.User
    columns = (
        user.id,
        user.name,
        user.email,
        user.role,
        user.depart,
        user.is_active,
    )
    query = db.query(user).with_entities(*columns)
    return query.filter(user.name == name).first()
def get_user_by_id(db: Session, uid: str):
    """Look up a single user by id.

    Only the id, name, email, role, depart and is_active columns are
    selected, so the result is a row of those six values rather than a
    full ``models.User`` instance (the password column is not read).

    Args:
        db: SQLAlchemy session used to run the query.
        uid: User id to match exactly.

    Returns:
        The first matching row, or ``None`` when no user has that id.
    """
    user = models.User
    columns = (
        user.id,
        user.name,
        user.email,
        user.role,
        user.depart,
        user.is_active,
    )
    query = db.query(user).with_entities(*columns)
    return query.filter(user.id == uid).first()
def get_user_by_email(db: Session, email: str):
    """Fetch the full user record for an exact email address.

    Args:
        db: SQLAlchemy session used to run the query.
        email: Email address to match exactly.

    Returns:
        The first matching ``models.User`` instance, or ``None`` when no
        user has that email.
    """
    matches = db.query(models.User).filter(models.User.email == email)
    return matches.first()
def get_users(db: Session, body: schemas.UserSearch, skip, limit):
    """Search users by the optional criteria in ``body``, with paging.

    Every criterion whose value is ``None`` is ignored. ``id``, ``role``,
    ``depart`` and ``is_active`` are matched exactly; ``email`` and ``name``
    are matched as substrings (SQL ``LIKE '%value%'``).

    Args:
        db: SQLAlchemy session used to run the query.
        body: Search criteria; the attributes ``id``, ``email``, ``name``,
            ``role``, ``depart`` and ``is_active`` are read.
        skip: Number of rows to skip (SQL OFFSET).
        limit: Maximum number of rows to return (SQL LIMIT).

    Returns:
        A list of ``models.User`` instances satisfying all supplied criteria.
    """
    user = models.User

    exact_criteria = (
        (user.id, body.id),
        (user.role, body.role),
        (user.depart, body.depart),
        (user.is_active, body.is_active),
    )
    substring_criteria = (
        (user.email, body.email),
        (user.name, body.name),
    )

    # Compare against None explicitly so that a falsy-but-meaningful value
    # such as is_active=False still produces a filter.
    conditions = [
        column == value
        for column, value in exact_criteria
        if value is not None
    ]

    # The previous expression `"%" + "" if v is None else v + "%"` parsed as
    # `("%" + "") if v is None else (v + "%")`, silently dropping the leading
    # wildcard. Build the pattern explicitly so both wildcards are applied.
    conditions.extend(
        column.like(f"%{value}%")
        for column, value in substring_criteria
        if value is not None
    )

    # NOTE(review): there is no ORDER BY, so which rows land on which page
    # depends on the database's default row order.
    return db.query(user).filter(*conditions).offset(skip).limit(limit).all()
def create_user(db: Session, body: schemas.UserCreate):
    """Create and persist a new user from ``body``.

    The incoming ``body.passwd`` is decrypted with
    ``EncyptUtil.decrypt_data`` and then hashed with
    ``generate_password_hash``; the hash is written back onto ``body``
    before its fields are used to build the ``models.User`` row. The id is
    generated by ``create_time_serial_num`` with the ``"UID"`` prefix.

    Args:
        db: SQLAlchemy session used to insert and commit the row.
        body: New-user payload; its ``passwd`` attribute is overwritten
            in place with the password hash.

    Returns:
        The newly inserted ``models.User`` instance, refreshed from the
        database after commit.
    """
    plain_passwd = EncyptUtil.decrypt_data(encrypt_msg=body.passwd)
    body.passwd = generate_password_hash(plain_passwd)

    fields = body.dict()
    new_user = models.User(**fields)
    new_user.id = create_time_serial_num(prefix="UID", suffix="")

    db.add(new_user)
    db.commit()
    db.refresh(new_user)
    return new_user
def delete_user(db: Session, uid: str):
    """Delete the user with the given id and commit.

    Args:
        db: SQLAlchemy session used to run the delete.
        uid: Id of the user to delete.

    Returns:
        Always ``True``; the number of deleted rows is not inspected.
    """
    target = db.query(models.User).filter(models.User.id == uid)
    target.delete()
    db.commit()
    return True
def edit_user(db: Session, uid: str, body: schemas.UserEdit):
    """Overwrite the columns of user ``uid`` with the fields of ``body``.

    Every key produced by ``body.dict()`` is written, whatever its value.

    Args:
        db: SQLAlchemy session used to run the update.
        uid: Id of the user to modify.
        body: Payload whose ``dict()`` gives the column values to write.

    Returns:
        Always ``True``; the number of updated rows is not inspected.
    """
    changes = body.dict()
    target = db.query(models.User).filter(models.User.id == uid)
    target.update(changes)
    db.commit()
    return True
def reset_user_pwd(db: Session, uid: str, passwd: schemas.PasswdRegex):
    """Replace the stored password hash of user ``uid``.

    ``passwd`` is hashed with ``generate_password_hash`` as given; no
    decryption step is applied here.
    NOTE(review): ``create_user`` decrypts before hashing — confirm that
    callers pass an already-decrypted password to this function.

    Args:
        db: SQLAlchemy session used to run the update.
        uid: Id of the user whose password is reset.
        passwd: New password to hash and store.

    Returns:
        Always ``True``; the number of updated rows is not inspected.
    """
    hashed = generate_password_hash(passwd)
    target = db.query(models.User).filter(models.User.id == uid)
    target.update({"passwd": hashed})
    db.commit()
    return True