"""CRUD helpers for scorecard records.

Scorecard rows are read and written through a SQLAlchemy ``Session``
(``Model.ScoreCard``); the documents of their sub-tables are read and
deleted through a ``MongoHelper``.
"""
from typing import Any, List, Optional, Tuple

from sqlalchemy import or_  # noqa: F401  (retained; no longer used in this module)
from sqlalchemy.exc import IntegrityError
from sqlalchemy.orm import Session

from App.Model import RatingConfigModel as Model
from App.Schema import ScoreCardSchema as Schema
from Utils.DataBase.MongoHelperUtils import MongoHelper
from Utils.UniqueCoder.MysqlColumnIDUtils import set_next_id

# MongoDB database that holds the scorecard sub-table documents
# (the literal means "scorecard configuration").
_MONGO_DBNAME = "打分卡配置"

# (Mongo sheet name, ScoreCard attribute that references it).
# Sheet names mean: data table, index table, rating table, process table.
_SUB_TABLES = (
    ("数据表", "data_table"),
    ("指标表", "index_table"),
    ("评级表", "rank_table"),
    ("流程表", "process_table"),
)


def _commit_or_rollback(db: Session) -> None:
    """Commit the session; on any failure roll back and re-raise.

    Without the rollback a failed commit leaves the session in a failed
    transaction state, and every later statement on it raises as well.
    """
    try:
        db.commit()
    except Exception:
        db.rollback()
        raise


def _scorecard_search_conditions(schema: Schema.SearchScoreCardReqBody) -> List[Any]:
    """Build the filter conditions for the search criteria that are set.

    A criterion whose value is ``None`` contributes no condition, i.e. it
    does not restrict the result.
    """
    conditions = []
    if schema.name is not None:
        # Substring match; ``%`` / ``_`` inside the name are passed through
        # to LIKE unescaped, exactly as the caller supplied them.
        conditions.append(Model.ScoreCard.name.like("%{}%".format(schema.name)))
    if schema.major_category_id is not None:
        conditions.append(Model.ScoreCard.major_category_id == schema.major_category_id)
    if schema.subcategory_id is not None:
        conditions.append(Model.ScoreCard.subcategory_id == schema.subcategory_id)
    return conditions


def create_scorecard(db: Session, schema: Schema.CreateScoreCardReqBody):
    """Insert a new scorecard built from *schema* and return the stored row.

    The id is obtained from ``set_next_id`` (called with prefix
    ``"SCORECARD_"`` and ``num_len=6``). Any error raised by the commit is
    re-raised after the session has been rolled back.
    """
    item = Model.ScoreCard(**schema.dict())
    item.id = set_next_id(db=db, num_len=6, prefix="SCORECARD_", model=Model.ScoreCard)
    db.add(item)
    _commit_or_rollback(db)
    db.refresh(item)
    return item


def delete_scorecard(db: Session, mongodb: MongoHelper, _id: str) -> bool:
    """Delete a scorecard row and the Mongo documents of its sub-tables.

    Every sub-table is handled independently: a sub-table that is missing
    (or a scorecard that does not exist at all) is skipped instead of
    aborting the clean-up of the remaining ones.

    NOTE: the Mongo documents are removed before the SQL row, so they are
    already gone if the SQL delete is subsequently rejected.

    Returns:
        ``True`` once the SQL delete has been committed (also when no row
        matched *_id*); ``False`` when the delete is rejected with an
        ``IntegrityError``, in which case the session is rolled back.
    """
    item = db.query(Model.ScoreCard).filter_by(id=_id).first()

    for sheet, relation in _SUB_TABLES:
        # ``getattr`` with a default tolerates ``item is None`` as well as
        # an unset sub-table reference.
        obj_id = getattr(getattr(item, relation, None), "obj_id", None)
        if obj_id is None:
            continue
        mongodb.delete_data_by_id(dbname=_MONGO_DBNAME, sheet=sheet, _id=obj_id)

    try:
        db.query(Model.ScoreCard).filter_by(id=_id).delete()
        db.commit()
    except IntegrityError:
        # Leave the session usable for the caller.
        db.rollback()
        return False
    return True


def update_scorecard(db: Session, _id: str, schema: Schema.EditScoreCardReqBody) -> bool:
    """Apply the values returned by ``schema.del_null_value()`` to a scorecard.

    Returns:
        ``False`` when ``del_null_value()`` yields nothing to update,
        otherwise ``True`` after the update has been committed. Whether a
        row actually matched *_id* is not checked.
    """
    changes = schema.del_null_value()
    if not changes:
        return False
    db.query(Model.ScoreCard).filter_by(id=_id).update(changes)
    _commit_or_rollback(db)
    return True


def get_scorecard_by_id(db: Session, _id: str):
    """Return the scorecard whose id equals *_id*, or ``None`` if absent."""
    return db.query(Model.ScoreCard).filter_by(id=_id).first()


def get_scorecard_by_name(db: Session, name: str):
    """Return the first scorecard whose name equals *name*, or ``None``."""
    return db.query(Model.ScoreCard).filter_by(name=name).first()


def search_scorecard(db: Session, schema: Schema.SearchScoreCardReqBody) -> Tuple[list, int]:
    """Return one page of scorecards matching *schema* plus the total count.

    ``page`` is clamped to 1..100 and ``pagesize`` to 5..20. Criteria left
    as ``None`` do not restrict the result. The page is ordered by id so
    that consecutive pages neither overlap nor skip rows.

    Returns:
        ``(items, count)`` where *items* is the requested page and *count*
        is the number of all matching rows.
    """
    page = min(max(schema.page, 1), 100)
    pagesize = min(max(schema.pagesize, 5), 20)

    matching = db.query(Model.ScoreCard).filter(*_scorecard_search_conditions(schema))

    items = (
        matching.order_by(Model.ScoreCard.id)
        .offset((page - 1) * pagesize)
        .limit(pagesize)
        .all()
    )
    count = matching.count()
    return items, count


def get_sub_table_data(mongodb: MongoHelper, tablename: str, obj_id: str) -> Optional[Any]:
    """Return the ``data`` field of a sub-table document, or ``None``.

    ``None`` is returned when no document is found, and also when the
    document has no ``data`` key.
    """
    # Instantiated only for its side effect; the instance is discarded.
    # NOTE(review): presumably this validates the arguments - confirm.
    Schema.GetSubTableDataQueryBody(tablename=tablename, obj_id=obj_id)

    item = mongodb.find_data_by_id(dbname=_MONGO_DBNAME, sheet=tablename, _id=obj_id)
    if not item:
        return None
    return item.get("data")