140 lines
4.2 KiB
Python
140 lines
4.2 KiB
Python
from bson import ObjectId
|
|
from .Types import RateRes
|
|
from Utils.DataUtils import JsDict
|
|
from Utils.MongoUtils import MongoConnect
|
|
|
|
|
|
class Rate:
    """Base class for a rating workflow backed by a single MongoDB document.

    The ``*_func`` hooks are placeholders that return empty results; they are
    meant to be overridden. The remaining public methods call those hooks and
    store the results on ``self.rate_data``, which writes every assignment
    through to the backing document.
    """

    def __init__(self, mongo_db: "MongoConnect", doc_id: str = None, db_name: str = "评级数据", create_new=False):
        """Bind to an existing rating document, or create a new one.

        :param mongo_db: connection wrapper; ``mongo_db.db[db_name]`` is used
            as the collection.
        :param doc_id: id of an existing document, converted with ``ObjectId``.
        :param db_name: name of the collection holding rating documents.
        :param create_new: when ``doc_id`` is empty, insert a fresh document
            instead of raising.
        :raises Exception: if ``doc_id`` is empty and ``create_new`` is false,
            or if the document cannot be found when loading.
        """
        # The connection settings must be assigned first: create_rate_doc()
        # reads them, and previously ran before they existed.
        self.mongo_db = mongo_db
        self.db_name = db_name

        if doc_id:
            self.doc_id = ObjectId(doc_id)
        elif create_new:
            self.doc_id = self.create_rate_doc()
        else:
            raise Exception("doc_id为空")

        # Captured by the RateData closure below so that it can reach the
        # connection and document id of this Rate instance.
        that = self

        class RateData:
            """Write-through view of the rating document's data fields."""

            # Document fields mirrored as attributes; load_data() iterates
            # this tuple because the instance __dict__ starts out empty.
            _fields = (
                "org_excel_data",
                "index_excel_data",
                "org_form_data",
                "index_form_data",
                "api_data",
                "man_data",
                "rate_res",
            )

            # Empty defaults, used until a value is loaded or assigned.
            org_excel_data = JsDict({})
            index_excel_data = JsDict({})
            org_form_data = JsDict({})
            index_form_data = JsDict({})
            api_data = JsDict({})
            man_data = JsDict({})
            rate_res: RateRes = JsDict({})

            def load_data(self):
                """Populate the fields from the stored document.

                Fields that are missing or empty in the document keep their
                empty defaults.

                :raises Exception: if no document matches the bound id.
                """
                data = that.mongo_db.db[that.db_name].find_one({'_id': that.doc_id})
                if not data:
                    raise Exception(f"{that.doc_id}文档未创建")
                for key in self._fields:
                    stored = data.get(key)
                    if stored:
                        # Bypass the write-through __setattr__: the value was
                        # just read from the database.
                        super().__setattr__(key, JsDict(stored))

            def __setattr__(self, key, value):
                """Persist the assignment to the document, then set it locally."""
                if value:
                    value = JsDict(value)
                that.mongo_db.db[that.db_name].find_one_and_update({'_id': that.doc_id}, {"$set": {key: value}})
                super().__setattr__(key, value)

        self.rate_data = RateData()
        self.rate_data.load_data()

        # Configuration section
        self.index_model = None

    # Hooks that are expected to be implemented

    @staticmethod
    def parse_excel_file_to_data(file_path):
        """Parse the data of an Excel file.

        Placeholder implementation: ignores ``file_path``.

        :param file_path: location of the Excel file.
        :return: an empty dict.
        """
        org_excel_data = {}
        return org_excel_data

    @staticmethod
    def parse_org_excel_data_func(org_excel_data):
        """Convert raw Excel data into the data required by the indicators.

        Placeholder implementation: ignores its input.

        :param org_excel_data: raw Excel data.
        :return: an empty dict.
        """
        index_excel_data = {}
        return index_excel_data

    @staticmethod
    def parse_org_form_data_func(org_from_data):
        """Convert raw submitted form data into the data required by the indicators.

        Placeholder implementation: ignores its input.

        :param org_from_data: raw form data.
        :return: an empty dict.
        """
        index_form_data = {}
        return index_form_data

    @staticmethod
    def load_api_data_func():
        """Load third-party API data.

        Placeholder implementation.

        :return: an empty dict.
        """
        api_data = {}
        return api_data

    def create_rate_doc(self):
        """Insert an empty document into the collection and return its id."""
        new_doc = self.mongo_db.db[self.db_name].insert_one({})
        return new_doc.inserted_id

    def cal_index_func(self, data):
        """Rating hook.

        Placeholder implementation: ignores ``data``.

        :param data: indicator data to rate.
        :return: an empty dict.
        """
        rate_res = {}
        return rate_res

    # Business interface below; no custom implementation is required

    def parse_org_excel_data(self, org_excel_data):
        """Run the Excel hook and store both the raw and the derived data.

        :param org_excel_data: raw Excel data.
        :return: the stored ``index_excel_data``.
        """
        index_excel_data = self.parse_org_excel_data_func(org_excel_data)
        self.rate_data.index_excel_data = index_excel_data
        self.rate_data.org_excel_data = org_excel_data
        return self.rate_data.index_excel_data

    def parse_org_form_data(self, org_from_data):
        """Run the form hook and store both the raw and the derived data.

        :param org_from_data: raw form data.
        :return: the stored ``index_form_data``.
        """
        # Must call the *_func hook; calling this method itself recursed
        # without end.
        index_form_data = self.parse_org_form_data_func(org_from_data)
        self.rate_data.index_form_data = index_form_data
        self.rate_data.org_form_data = org_from_data
        return self.rate_data.index_form_data

    def load_api_data(self):
        """Run the API hook and store its result.

        :return: the stored ``api_data``.
        """
        api_data = self.load_api_data_func()
        self.rate_data.api_data = api_data
        return self.rate_data.api_data

    def merge_index_data(self):
        """Merge the Excel, form, API and ``man_data`` sources into one dict.

        Entries whose value is ``None`` are dropped. On duplicate keys the
        later source wins, in the order Excel, form, API, ``man_data``.

        :return: a new plain dict with the merged entries.
        """
        def filter_not_none(data):
            # An empty or missing source contributes nothing.
            if not data:
                return {}
            return {k: v for k, v in data.items() if v is not None}

        mgr_data = {
            **filter_not_none(self.rate_data.index_excel_data),
            **filter_not_none(self.rate_data.index_form_data),
            **filter_not_none(self.rate_data.api_data),
            **filter_not_none(self.rate_data.man_data),
        }
        return mgr_data

    # Perform the scoring
    def cal_index(self):
        """Rate the merged indicator data and store the result.

        :return: the stored ``rate_res``.
        """
        # NOTE(review): this previously passed only index_form_data; the
        # merged data is assumed to be the intended input — confirm.
        mgr_data = self.merge_index_data()
        rate_res = self.cal_index_func(mgr_data)
        self.rate_data.rate_res = rate_res
        return self.rate_data.rate_res
|