from sqlalchemy.orm import Session from Models.DepartmentModel import Department from Models.PostModel import Post from Models.UserModel import User, UserInfo from Schemas import UserSchemas import json from Utils.SqlAlchemyUtils import get_db_i def get_user_info(db: Session, openid: str): user = db.query(User).filter(openid=openid).first() return user def create_user(db: Session, openid: str): user = User(openid=openid) db.commit(user) db.refresh(user) return user def change_user_info(db: Session, openid: str, user_info_change: UserSchemas.UserInfoChange): db.query(User).filter(openid=openid).update(user_info_change.dict()) db.commit() return db.query(User).filter(openid=openid).first() # 根据email从用户信息表内配置用户数据,相当于用户接入后的数据库操作 def update_user_info_from_email(db: Session, openid: str, email: str): user_info = db.query(UserInfo).filter_by(email=email).first() if user_info: db.query(User).filter_by(openid=openid).update( {'email': email, "name": user_info.name, "post": user_info.post, "department": user_info.department, 'registered': True}) else: db.query(User).filter_by(openid=openid).update( {'email': email, 'registered': True}) db.commit() def load_company_config_to_db(): db = get_db_i() try: data = json.load(open('Config/company.json', 'r', encoding="utf-8")) db.query(Department).delete() db.query(Post).delete() db.commit() for department_type in data['部门'].keys(): for department_name in data['部门'][department_type]: department = Department(name=department_name, type=department_type) db.add(department) db.commit() department_name_dic = {item.name: item.id for item in db.query(Department).all()} for department_name, posts in data['职务'].items(): for post_name in posts: db.add(Post(name=post_name, belong=department_name_dic[department_name])) db.commit() except Exception as e: print(e) finally: db.close()