import json import re import time import requests from DBHelper.MongoHelperInstance import DB_TEST from Modules.CodeExecutor.PythonCodeExecutor import PythonCodeExecutor from Modules.Models.Score.ScoreObj import ScoreModelObj class ScoreModelImpl(object): @staticmethod def search_score_model(**kwargs): """搜索打分模型""" total = DB_TEST.find_all_data_with_count( '模型数据', '打分模型', {'模型名称': {"$regex": kwargs['model_name']}} ) records = DB_TEST.find_all_data_with_single_sort( '模型数据', '打分模型', {'模型名称': {"$regex": kwargs['model_name']}}, ['模型名称', '作者', '日期', '标签'], {'日期': -1}, kwargs['page_size'], kwargs['page_no'] ) result = { "total": total, "records": records } return result @staticmethod def new_score_model(**kwargs): """构建打分模型""" data = kwargs['data'] case = DB_TEST.find_single_column( '模型数据', '打分模型', {'模型ID': data['model_id']}, '模型ID' ) if case: return '模型ID已存在' model = ScoreModelObj() model.model_id = data['model_id'] model.model_name = data['model_name'] model.author = data['author'] model.tags = data['tags'] model.status = 'design' model.date = time.strftime('%Y-%m-%d', time.localtime()) model.dimension = list() model.level_setting = list() # 级别设置 for level in data['level_setting']: setting = model.LevelSetting() setting.level = level['level'] setting.score = level['score'] setting.position = level['position'] model.level_setting.append(setting) # 维度 for dim in data['dimention']: dimension = model.Dimension() dimension.name = dim['dimention_name'] dimension.first_level_index = list() for first in dim['first_index']: first_level = dimension.FirstIndex() first_level.name = first['index_name'] first_level.secondary_index = list() for second in first['second_index']: second_level = first_level.SecondaryIndex() second_level.name = second['index_name'] second_level.standard_score = second['standard_score'] # 打分设置 for score_set in second['score_setting']: score_setting = second_level.ScoreSetting() score_setting.pattern = score_set['pattern'] score_setting.interval = score_set['interval'] score_setting.score_position = score_set['score_position'] second_level.score_setting = score_setting # 构建指标 for index in second['bind_index']: bind_index = second_level.BindIndex() bind_index.index_name = index['index_name'] bind_set = bind_index.BindSet() bind_set.name = index['bind_set']['name'] bind_set.type = index['bind_set']['type'] bind_set.params = list() params_data = index['bind_set']['params'] for param in params_data: params = bind_set.Params params.param = param['param'] params.describe = param['describe'] data_bind = params.DataBind() filter_range = data_bind.FilterRange() filter_range.data_base = param['data_bind']['filter_range']['data_base'] filter_range.data_sheet = param['data_bind']['filter_range']['data_sheet'] filter_range.data_field = param['data_bind']['filter_range']['data_field'] data_bind.filter_range = filter_range filter_criteria = data_bind.FilterCriteria() filter_criteria.company_id_filter = param['data_bind']['filter_criteria']['company_id_filter'] filter_criteria.report_period_filter = param['data_bind']['filter_criteria']['report_period_filter'] filter_criteria.customization_filter = param['data_bind']['filter_criteria']['customization_filter'] data_bind.filter_criteria = filter_criteria setting = data_bind.Setting() setting_data = param['data_bind']['setting'] def dispose_setting(dispose_data): """处理异常值""" dispose_obj = setting.DisposeExcept() dispose_obj.null_type = dispose_data['numeric_type']['value'] numeric_type = dispose_obj.NumericType() numeric_type.value = dispose_data['null_type']['value'] numeric_type.bit = dispose_data['null_type']['bit'] dispose_obj.numeric_type = numeric_type dispose_obj.text_type = dispose_data['text_type'] return dispose_obj if 'dispose_none_string' in setting_data: setting.dispose_none_string = dispose_setting(setting_data['dispose_none_string']) if 'dispose_none_value' in setting_data: setting.dispose_none_value = dispose_setting(setting_data['dispose_none_value']) if 'dispose_not_exist' in setting_data: setting.dispose_not_exist = dispose_setting(setting_data['dispose_not_exist']) data_bind.setting = setting params.data_bind = data_bind bind_set.params.append(params) bind_index.bind_set = bind_set second_level.bind_index = bind_index first_level.secondary_index.append(second_level) dimension.first_level_index.append(first_level) model.dimension.append(dimension) return model.fields_toggle() @staticmethod def save_score_model(**kwargs): model = kwargs['model'] DB_TEST.upsert_single_data( '模型数据', '打分模型', {'模型名称': model['模型名称']}, model ) return '构建成功' @staticmethod def compute_second_score(**kwargs): """二级指标计算""" cid = kwargs['cid'] second_index = kwargs['second_index'] # 传入参数 params_dict = dict() for item in second_index: """ 1.根据数据绑定的内容查询参数 2.查询函数 3.传入函数参数得到结果 """ params = item['构建指标']['绑定设置']['传入参数'] for param in params: """ 1.筛选范围 2.筛选条件 3.设置 """ filter_range = param['数据绑定']['筛选范围'] filter_criteria = param['数据绑定']['筛选条件'] filter_setting = param['数据绑定']['设置'] # 构建筛选条件 condition = dict() if '企业ID筛选' in filter_criteria: condition['企业ID'] = cid if '报告期筛选' in filter_criteria: period = filter_criteria['报告期筛选'] current_year = int(time.strftime('%Y', time.localtime())) if period == '当期': condition['报告期'] = str(current_year) + '-12-31' elif period == '上一期': condition['报告期'] = str(current_year-1) + '-12-31' elif period == '上二期': condition['报告期'] = str(current_year - 2) + '-12-31' elif period == '上三期': condition['报告期'] = str(current_year - 3) + '-12-31' elif period == '下一期': condition['报告期'] = str(current_year + 1) + '-12-31' elif period == '下二期': condition['报告期'] = str(current_year + 2) + '-12-31' elif period == '下三期': condition['报告期'] = str(current_year + 3) + '-12-31' if '自定义筛选' in filter_criteria: pass # 查询数据 column = DB_TEST.find_single_column( filter_range['数据库'], filter_range['数据表'], condition, filter_range['字段'] ) # 处理异常值 def handle_except(dispose_conditon): if '空值' in dispose_conditon: return None elif '数值型' in dispose_conditon: return round(dispose_conditon['数值型']['值'], dispose_conditon['数值型']['保留位数']) elif '文本型' in dispose_conditon: return dispose_conditon['文本型'] # 异常设置 if len(column) == 0 and '处理空字符串' in filter_setting: column = handle_except(filter_setting['处理空字符串']) if column is None and '处理空值' in filter_setting: column = handle_except(filter_setting['处理空值']) if column is False and '处理不存在' in filter_setting: column = handle_except(filter_setting['处理不存在']) params_dict[param['参数']] = column # 调用函数or接口进行计算 if item['构建指标']['绑定设置']['类型'] == '函数': func_name = item['构建指标']['绑定设置']['名称'] func_text = DB_TEST.find_single_column( '模型数据', '计算函数', {'函数名称': func_name}, '方法' ) my_exec = PythonCodeExecutor(func_name=func_name, func_text=func_text, params=params_dict).result else: api_name = item['构建指标']['绑定设置']['名称'] address = DB_TEST.find_single_column( '模型数据', '计算接口', {'接口名称': api_name}, '地址' ) res = requests.get(url=address, params=params_dict) my_exec = json.loads(res.text) # 打分设置 score_set = item['打分设置'] length = len(score_set['区间']) for index in range(length): interval = score_set['区间'][index].format(my_exec) if interval: if score_set['模式'] == '档位打分': return score_set['打分位'][index] elif score_set['模式'] == '百分比打分': """ 百分比打分两种情况: 1.区间中只包含一个数字,如:{}>100 或者 50>{},通常出现在首尾 2.区间中包含两个数字,如 100>{}>80,通常出现在中间 """ interval_ = score_set['区间'][index] number_list = re.findall("\d+\.?\d*", interval_) number_list = list(map(int, number_list)) if len(number_list) != 1: number_gap = number_list[0] - number_list[1] score_gap = score_set['打分位'][index+1]-score_set['打分位'][index] return number_gap / score_gap * my_exec else: return score_set['打分位'][index] else: return score_set['打分位'][index] def compute_index(self, **kwargs): """计算指标""" # 企业名称 name = kwargs['name'] # 企业ID cid = kwargs['cid'] # 级别设置 model = kwargs['model'] result = dict() result['企业名称'] = name result['维度得分'] = list() result['合计'] = 0 result['级别'] = None """维度得分""" for dim in model['维度']: dim_score = dict() dim_score['维度名称'] = dim['维度名称'] dim_score['一级指标'] = list() dim_score['合计'] = 0 for first_index in dim['一级指标']: first_dict = dict() first_dict['指标名称'] = first_index['指标名称'] first_dict['二级指标'] = list() first_dict['合计'] = 0 for second_index in first_index['二级指标']: second_score = dict() second_score['指标名称'] = second_index['指标名称'] second_score['合计'] = self.compute_second_score(cid=cid, second_index=second_index) first_dict['合计'] += second_score['合计'] first_dict['二级指标'].append(second_score) dim_score['一级指标'].append(first_dict) dim_score['合计'] += first_dict['合计'] result['维度得分'].append(dim_score) result['合计'] += dim_score['合计'] return result def check_score_model(self, **kwargs): model = kwargs['model'] company = kwargs['company'] test_result = list() for item in company: name = item['name'] cid = item['cid'] result = self.compute_index(name=name, cid=cid, model=model) test_result.append(result)