api-datamanager/Modules/Models/Score/ScoreImpl.py

644 lines
28 KiB
Python

import json
import re
import time
import requests
from DBHelper.MongoHelperInstance import DB_TEST
from Modules.CodeExecutor.PythonCodeExecutor import PythonCodeExecutor
from Modules.Indicators.Tags.TagsImpl import TagsImpl
from Modules.Models.Score.ScoreObj import ScoreModelObj
from Modules.Models.Score.ScoreUtils import ScoreModelUtils
class ScoreModelImpl(object):
@staticmethod
def search_score_model(**kwargs):
"""搜索打分模型"""
def make_search_body(param):
body = dict()
search_keys = list(param.keys())
if 'questionnaire_name' in search_keys:
body['模型名称'] = {"$regex": param['model_name']}
if 'author' in search_keys:
body['作者'] = param['author']
if 'tags' in search_keys:
tags_list = list()
for tags in param['tags']:
tags_dict = dict()
tags_dict['标签名称'] = tags['tag_name']
tags_dict['标签ID'] = tags['tag_id']
tags_list.append(tags_dict)
body['标签'] = {"$in": tags_list}
return body
def make_sort_body(param):
if param == 'asc':
body = {"日期": 1}
else:
body = {"日期": -1}
return body
data = kwargs['data']
sort = data['sort']
page_size = int(data['page_size'])
page_no = int(data['page_no'])
search_body = make_search_body(data)
sort_body = make_sort_body(sort)
page_size = 10 if page_size > 10 else page_size
total = DB_TEST.find_all_data_with_count(
'模型数据',
'打分模型',
search_body
)
records = DB_TEST.find_data_by_page_with_sort(
'模型数据',
'打分模型',
search_body,
['模型ID', '模型名称', '作者', '日期', '标签'],
sort_body,
page_size,
page_no
)
return_list = list()
if records:
for record in records:
return_dict = dict()
return_dict['model_id'] = record['模型ID']
return_dict['model_name'] = record['模型名称']
return_dict['author'] = record['作者']
return_dict['date'] = record['日期']
return_dict['tags'] = list()
for tag in record['标签']:
tag_dict = dict()
tag_dict['tag_name'] = tag['标签名称']
tag_dict['tag_id'] = tag['标签ID']
return_dict['tags'].append(tag_dict)
return_list.append(return_dict)
result = {
"total": total,
"records": return_list
}
return result
@staticmethod
def new_score_model(**kwargs):
"""构建打分模型"""
data = kwargs['data']
model = ScoreModelObj()
if 'model_id' in data:
model.model_id = data['model_id']
status = DB_TEST.find_single_column(
'模型数据',
'打分模型',
{'模型ID': data['model_id']},
'状态'
)
model.status = status
else:
model.model_id = ScoreModelUtils.make_new_cid()
model.status = 'design'
model.model_name = data['model_name']
model.author = data['author']
model.date = time.strftime('%Y-%m-%d', time.localtime())
# 标签
model.tags = list()
for tag in data['tags']:
tags = model.Tags()
tags.tag_name = tag['tag_name']
tags.tag_id = tag['tag_id']
model.tags.append(tags)
# 维度
model.dimension = list()
# 级别设置
model.level_setting = list()
# 级别设置
for level in data['level_setting']:
setting = model.LevelSetting()
setting.level = level['level']
if 'score' in level:
setting.score = level['score']
if 'position' in level:
setting.position = level['position']
model.level_setting.append(setting)
# 维度
for dim in data['dimension']:
# 维度
dimension = model.Dimension()
dimension.name = dim['dimension_name']
dimension.first_index = list()
for first in dim['first_index']:
# 一级指标
first_level = dimension.FirstIndex()
first_level.name = first['index_name']
first_level.secondary_index = list()
for second in first['second_index']:
# 二级指标
second_level = first_level.SecondaryIndex()
second_level.name = second['index_name']
# 标准分
second_level.standard_score = second['standard_score']
# 打分设置
score_set = second['score_setting']
score_setting = second_level.ScoreSetting()
# 打分模式
score_setting.pattern = score_set['pattern']
# 区间
score_setting.interval = score_set['interval']
# 打分位
score_setting.score_position = score_set['score_position']
second_level.score_setting = score_setting
# 构建指标
index_data = second['bind_index']
bind_index = second_level.BindIndex()
# 指标名称
bind_index.index_name = index_data['index_name']
# 绑定设置
bind_set = bind_index.BindSet()
# 绑定类型-函数、接口
bind_set.name = index_data['bind_set']['name']
bind_set.type = index_data['bind_set']['type']
# 传入参数
bind_set.params = list()
params_data = index_data['bind_set']['params']
for param in params_data:
params = bind_set.Params()
# 参数值
params.param = param['param']
# 说明
params.describe = param['describe']
# 数据绑定
data_bind = params.DataBind()
# 筛选范围
filter_range = data_bind.FilterRange()
# 筛选范围
filter_range.data_base = param['data_bind']['filter_range']['data_base']
filter_range.data_sheet = param['data_bind']['filter_range']['data_sheet']
filter_range.data_field = param['data_bind']['filter_range']['data_field']
if 'data_sub_field' in param['data_bind']['filter_range']:
filter_range.data_sub_field = param['data_bind']['filter_range']['data_sub_field']
data_bind.filter_range = filter_range
# 筛选条件
search_case = data_bind.SearchCase()
# 按企业ID筛选
if 'by_cid' in param['data_bind']['search_case']:
by_cid = search_case.ByCid()
by_cid.cid = param['data_bind']['search_case']['by_cid']['cid']
search_case.by_cid = by_cid
# 按报告期筛选
if 'by_report_date' in param['data_bind']['search_case']:
by_report_date = search_case.ByReportDate()
current_year = int(time.strftime('%Y', time.localtime()))
period = param['data_bind']['search_case']['by_report_date']['report_date']
if period == '当期':
by_report_date.report_date = str(current_year) + '-12-31'
elif period == '上一期':
by_report_date.report_date = str(current_year - 1) + '-12-31'
elif period == '上二期':
by_report_date.report_date = str(current_year - 2) + '-12-31'
elif period == '上三期':
by_report_date.report_date = str(current_year - 3) + '-12-31'
elif period == '下一期':
by_report_date.report_date = str(current_year + 1) + '-12-31'
elif period == '下二期':
by_report_date.report_date = str(current_year + 2) + '-12-31'
elif period == '下三期':
by_report_date.report_date = str(current_year + 3) + '-12-31'
search_case.by_report_date = by_report_date
# 自定义筛选条件
if 'by_diy_case' in param['data_bind']['search_case']:
search_case.by_diy_case = param['data_bind']['search_case']['by_diy_case']
data_bind.search_case = search_case
# 取值设置
set_value = data_bind.SetValue()
setting_data = param['data_bind']['set_value']
def dispose_setting(dispose_data):
"""处理异常值"""
dispose_obj = set_value.DisposeExcept()
if 'null_type' in dispose_data:
dispose_obj.null_type = None
if 'numeric_type' in dispose_data:
numeric_type = dispose_obj.NumericType()
numeric_type.value = dispose_data['numeric_type']['value']
numeric_type.bit = dispose_data['numeric_type']['bit']
dispose_obj.numeric_type = numeric_type
if 'text_type' in dispose_data:
dispose_obj.text_type = dispose_data['text_type']
return dispose_obj
# 字段为空字符串
if 'case_none_str' in setting_data:
set_value.case_none_str = dispose_setting(setting_data['case_none_str'])
# 字段为空值
if 'case_none' in setting_data:
set_value.case_none = dispose_setting(setting_data['case_none'])
# 字段不存在字段
if 'case_no_key' in setting_data:
set_value.case_no_key = dispose_setting(setting_data['case_no_key'])
data_bind.set_value = set_value
params.data_bind = data_bind
bind_set.params.append(params)
bind_index.bind_set = bind_set
second_level.bind_index = bind_index
first_level.secondary_index.append(second_level)
dimension.first_index.append(first_level)
model.dimension.append(dimension)
return model.fields_toggle()
@staticmethod
def save_score_model(**kwargs):
model = kwargs['model']
DB_TEST.upsert_single_data(
'模型数据',
'打分模型',
{'模型名称': model['模型名称']},
model
)
# 关联标签
impl = TagsImpl()
parameter = {
"associate_id": model['模型ID'],
"associate_name": model['模型名称'],
"associate_type": "model",
"tags_id": [item['标签ID'] for item in model['标签']]
}
impl.associate_tags(data=parameter)
return '构建成功'
@staticmethod
def eidt_score_model(**kwargs):
model = kwargs['model']
model_id = kwargs['model_id']
status = DB_TEST.find_single_column(
'模型数据',
'打分模型',
{'模型ID': model_id},
'状态'
)
if not status:
return '模型ID不存在'
if status == 'published':
return '模型已锁定,无法更改'
DB_TEST.update_single_data(
'模型数据',
'打分模型',
{'模型ID': model_id},
model
)
# 关联标签
impl = TagsImpl()
parameter = {
"associate_id": model['模型ID'],
"associate_name": model['模型名称'],
"associate_type": "model",
"tags_id": [item['标签ID'] for item in model['标签']]
}
impl.associate_tags(data=parameter)
return '编辑成功'
@staticmethod
def compute_second_score(**kwargs):
"""二级指标计算"""
cid = kwargs['cid']
second_index = kwargs['second_index']
# 传入参数
params_dict = dict()
for item in second_index:
"""
1.根据数据绑定的内容查询参数
2.查询函数
3.传入函数参数得到结果
"""
params = item['构建指标']['绑定设置']['传入参数']
for param in params:
"""
1.筛选范围
2.筛选条件
3.设置
"""
filter_range = param['数据绑定']['筛选范围']
filter_criteria = param['数据绑定']['筛选条件']
filter_setting = param['数据绑定']['设置']
# 构建筛选条件
condition = dict()
if '企业ID筛选' in filter_criteria:
condition['企业ID'] = cid
if '报告期筛选' in filter_criteria:
period = filter_criteria['报告期筛选']
current_year = int(time.strftime('%Y', time.localtime()))
if period == '当期':
condition['报告期'] = str(current_year) + '-12-31'
elif period == '上一期':
condition['报告期'] = str(current_year - 1) + '-12-31'
elif period == '上二期':
condition['报告期'] = str(current_year - 2) + '-12-31'
elif period == '上三期':
condition['报告期'] = str(current_year - 3) + '-12-31'
elif period == '下一期':
condition['报告期'] = str(current_year + 1) + '-12-31'
elif period == '下二期':
condition['报告期'] = str(current_year + 2) + '-12-31'
elif period == '下三期':
condition['报告期'] = str(current_year + 3) + '-12-31'
if '自定义筛选' in filter_criteria:
pass
# 查询数据
column = DB_TEST.find_single_column(
filter_range['数据库'],
filter_range['数据表'],
condition,
filter_range['字段']
)
# 处理异常值
def handle_except(dispose_conditon):
if '空值' in dispose_conditon:
return None
elif '数值型' in dispose_conditon:
return round(dispose_conditon['数值型'][''], dispose_conditon['数值型']['保留位数'])
elif '文本型' in dispose_conditon:
return dispose_conditon['文本型']
# 异常设置
if len(column) == 0 and '处理空字符串' in filter_setting:
column = handle_except(filter_setting['处理空字符串'])
if column is None and '处理空值' in filter_setting:
column = handle_except(filter_setting['处理空值'])
if column is False and '处理不存在' in filter_setting:
column = handle_except(filter_setting['处理不存在'])
params_dict[param['参数']] = column
# 调用函数or接口进行计算
if item['构建指标']['绑定设置']['类型'] == '函数':
func_name = item['构建指标']['绑定设置']['名称']
func_text = DB_TEST.find_single_column(
'模型数据',
'计算函数',
{'函数名称': func_name},
'方法'
)
my_exec = PythonCodeExecutor(func_name=func_name, func_text=func_text, params=params_dict).result
else:
api_name = item['构建指标']['绑定设置']['名称']
address = DB_TEST.find_single_column(
'模型数据',
'计算接口',
{'接口名称': api_name},
'地址'
)
res = requests.get(url=address, params=params_dict)
my_exec = json.loads(res.text)
# 打分设置
score_set = item['打分设置']
length = len(score_set['区间'])
for index in range(length):
interval = score_set['区间'][index].format(my_exec)
if interval:
if score_set['模式'] == '档位打分':
return score_set['打分位'][index]
elif score_set['模式'] == '百分比打分':
"""
百分比打分两种情况:
1.区间中只包含一个数字,如:{}>100 或者 50>{},通常出现在首尾
2.区间中包含两个数字,如 100>{}>80,通常出现在中间
"""
interval_ = score_set['区间'][index]
number_list = re.findall("\d+\.?\d*", interval_)
number_list = list(map(int, number_list))
if len(number_list) != 1:
number_gap = number_list[0] - number_list[1]
score_gap = score_set['打分位'][index + 1] - score_set['打分位'][index]
return number_gap / score_gap * my_exec
else:
return score_set['打分位'][index]
else:
return score_set['打分位'][index]
def compute_index(self, **kwargs):
"""计算指标"""
# 企业名称
name = kwargs['name']
# 企业ID
cid = kwargs['cid']
# 级别设置
model = kwargs['model']
result = dict()
result['企业名称'] = name
result['维度得分'] = list()
result['合计'] = 0
result['级别'] = None
"""维度得分"""
for dim in model['维度']:
dim_score = dict()
dim_score['维度名称'] = dim['维度名称']
dim_score['一级指标'] = list()
dim_score['合计'] = 0
for first_index in dim['一级指标']:
first_dict = dict()
first_dict['指标名称'] = first_index['指标名称']
first_dict['二级指标'] = list()
first_dict['合计'] = 0
for second_index in first_index['二级指标']:
second_score = dict()
second_score['指标名称'] = second_index['指标名称']
second_score['合计'] = self.compute_second_score(cid=cid, second_index=second_index)
first_dict['合计'] += second_score['合计']
first_dict['二级指标'].append(second_score)
dim_score['一级指标'].append(first_dict)
dim_score['合计'] += first_dict['合计']
result['维度得分'].append(dim_score)
result['合计'] += dim_score['合计']
return result
def check_score_model(self, **kwargs):
model = kwargs['model']
company = kwargs['company']
test_result = list()
for item in company:
name = item['name']
cid = item['cid']
result = self.compute_index(name=name, cid=cid, model=model)
test_result.append(result)
@staticmethod
def view_score_model(**kwargs):
"""查看模型"""
mid = kwargs['mid']
record = DB_TEST.find_single_data(
'模型数据',
'打分模型',
{'模型ID': mid},
['模型ID', '模型名称', '维度', '级别设置', '标签', '日期', '作者', '状态']
)
return_dict = dict()
if record:
return_dict['model_id'] = record['模型ID']
return_dict['model_name'] = record['模型名称']
return_dict['author'] = record['作者']
return_dict['date'] = record['日期']
return_dict['tags'] = list()
for tag in record['标签']:
tag_dict = dict()
tag_dict['tag_name'] = tag['标签名称']
tag_dict['tag_id'] = tag['标签ID']
return_dict['tags'].append(tag_dict)
for level in record['级别设置']:
level_dict = dict()
level_dict['level'] = level['级别']
if '得分' in level:
level_dict['score'] = level['得分']
if '分位' in level:
level_dict['position'] = level['分位']
return_dict['dimension'] = list()
for dim in record['维度']:
dim_dict = dict()
dim_dict['name'] = dim['维度名称']
dim_dict['first_index'] = list()
for first in dim['一级指标']:
first_dict = dict()
first_dict['name'] = first['指标名称']
first_dict['secondary_index'] = list()
for second in first['二级指标']:
second_dict = dict()
second_dict['name'] = second['指标名称']
second_dict['standard_score'] = second['标准分']
second_dict['score_setting'] = dict()
second_dict['score_setting']['pattern'] = second['打分设置']['模式']
second_dict['score_setting']['interval'] = second['打分设置']['区间']
second_dict['score_setting']['score_position'] = second['打分设置']['打分位']
second_dict['bind_index'] = dict()
second_dict['bind_index']['index_name'] = second['构建指标']['指标名称']
second_dict['bind_index']['bind_set'] = dict()
second_dict['bind_index']['bind_set']['name'] = second['构建指标']['绑定设置']['名称']
second_dict['bind_index']['bind_set']['type'] = second['构建指标']['绑定设置']['类型']
second_dict['bind_index']['bind_set']['params'] = list()
for param in second['构建指标']['绑定设置']['传入参数']:
param_dict = dict()
param_dict['param'] = param['参数']
param_dict['describe'] = param['说明']
param_dict['data_bind'] = dict()
param_dict['data_bind']['filter_range'] = dict()
param_dict['data_bind']['filter_range']['data_base'] = param['数据绑定']['筛选范围']['数据库']
param_dict['data_bind']['filter_range']['data_sheet'] = param['数据绑定']['筛选范围']['数据表']
param_dict['data_bind']['filter_range']['data_field'] = param['数据绑定']['筛选范围']['字段']
if '子字段' in param['数据绑定']['筛选范围']:
param_dict['data_bind']['filter_range']['data_sub_field'] = param['数据绑定']['筛选范围']['子字段']
param_dict['data_bind']['search_case'] = dict()
if '企业ID筛选' in param['数据绑定']['筛选条件']:
param_dict['data_bind']['search_case']['by_cid'] = dict()
param_dict['data_bind']['search_case']['by_cid']['cid'] = \
param['数据绑定']['筛选条件']['企业ID筛选']['企业ID']
if '报告期筛选' in param['数据绑定']['筛选条件']:
param_dict['data_bind']['search_case']['by_report_date'] = dict()
param_dict['data_bind']['search_case']['by_report_date']['report_date'] = \
param['数据绑定']['筛选条件']['报告期筛选']['报告期']
if '自定义筛选' in param['数据绑定']['筛选条件']:
param_dict['data_bind']['search_case']['by_diy_case'] = param['数据绑定']['筛选条件']['自定义筛选']
param_dict['data_bind']['set_value'] = dict()
def set_value(value_data):
value_dict = dict()
if '空值' in value_data:
value_dict['null_type'] = None
if '数值型' in value_data:
value_dict['numeric_type'] = dict()
value_dict['numeric_type']['value'] = value_data['数值型']['']
value_dict['numeric_type']['bit'] = value_data['数值型']['保留位数']
if '文本型' in value_data:
value_dict['text_type'] = value_data['文本型']
return value_dict
param_dict['data_bind']['set_value']['case_none_str'] = set_value(
param['数据绑定']['取值设置']['处理空字符串'])
param_dict['data_bind']['set_value']['case_none'] = set_value(param['数据绑定']['取值设置']['处理空值'])
param_dict['data_bind']['set_value']['case_no_key'] = set_value(
param['数据绑定']['取值设置']['处理不存在'])
second_dict['bind_index']['bind_set']['params'].append(param_dict)
first_dict['secondary_index'].append(second_dict)
dim_dict['first_index'].append(first_dict)
return_dict['dimension'].append(dim_dict)
return return_dict
@staticmethod
def lock_score_model(**kwargs):
"""锁定模型"""
mid = kwargs['mid']
model = ScoreModelObj()
model.status = 'published'
DB_TEST.update_single_data(
'模型数据',
'打分模型',
{'模型ID': mid},
model.fields_toggle()
)
return '锁定成功'
@staticmethod
def delete_score_model(**kwargs):
"""删除模型"""
mid = kwargs['mid']
tags = DB_TEST.find_single_column(
'模型数据',
'打分模型',
{'模型ID': mid},
'标签'
)
# 删除标签关联的函数
for tag in tags:
DB_TEST.update_single_data_by_pull(
'模型数据',
'标签数据',
{'标签ID': tag['标签ID']},
{"关联数据": {"模型ID": mid}}
)
# 删除函数
DB_TEST.delete_single_data(
'模型数据',
'打分模型',
{'模型ID': mid}
)
return '操作成功'