import time

from DBHelper.MongoHelperInstance import DB_TEST
from Modules.CodeExecutor.PythonCodeExecutor import PythonCodeExecutor
from Modules.Functions.FinIndex.FinIndexObj import FinIndexObj
from Utils.CommonUtil import CommonUtils
from Utils.ErrorUtil import JustThrowError, APIReturnError


class FinanceIndexImpl(object):
    """CRUD-style operations on "calculation function" records.

    Every method is a static method that takes its input through keyword
    arguments (``data`` or ``fid``) and talks to the ``DB_TEST`` helper using
    the database/collection names held in ``_DB_NAME`` / ``_COLLECTION``.
    """

    # Database and collection names passed to every DB_TEST call.
    _DB_NAME = '模型数据'
    _COLLECTION = '计算函数'

    # Format of the "edit time" stamp stored with each record.
    _TIME_FORMAT = '%Y-%m-%d %H:%M:%S'

    # Upper bound applied to the caller-supplied page size when searching.
    _MAX_PAGE_SIZE = 10

    # (API key, stored field name) pairs for the search result rows, in the
    # order they are projected and returned.
    _LIST_FIELDS = (
        ('func_id', '函数ID'),
        ('func_name', '函数名称'),
        ('describe', '说明'),
        ('tags', '标签'),
        ('edit_date', '编辑时间'),
        ('status', '状态'),
    )

    @staticmethod
    def _current_timestamp():
        """Return the current local time formatted with ``_TIME_FORMAT``."""
        return time.strftime(FinanceIndexImpl._TIME_FORMAT)

    @staticmethod
    def _build_params(func, raw_params):
        """Convert request parameter dicts into ``func.Param`` objects.

        Each item of ``raw_params`` must provide the keys ``name``, ``type``
        and ``mark``; a missing key raises ``KeyError``.
        """
        built = []
        for raw in raw_params:
            param = func.Param()
            param.name = raw['name']
            param.type = raw['type']
            param.mark = raw['mark']
            built.append(param)
        return built

    @staticmethod
    def _make_search_body(param):
        """Build the query filter from the optional ``name``/``tags`` keys."""
        body = {}
        if 'name' in param:
            body['函数名称'] = {"$regex": param['name']}
        if 'tags' in param:
            body['标签'] = {"$in": param['tags']}
        return body

    @staticmethod
    def _make_sort_body(order):
        """Sort by edit time: ascending for ``'asc'``, descending otherwise."""
        return {"编辑时间": 1 if order == 'asc' else -1}

    @staticmethod
    def new_calculate_func(**kwargs):
        """Create a new calculation function record.

        Reads ``func_id``, ``func_name``, ``describe``, ``method``,
        ``params`` and ``tags`` from ``kwargs['data']``.

        Returns:
            '函数ID已存在' when the lookup by ``func_id`` yields a truthy
            value (nothing is inserted in that case), otherwise '新建成功'
            after inserting the record with status ``'design'``.
        """
        data = kwargs['data']

        existing = DB_TEST.find_single_column(
            FinanceIndexImpl._DB_NAME,
            FinanceIndexImpl._COLLECTION,
            {"函数ID": data['func_id']},
            "函数ID"
        )
        if existing:
            return '函数ID已存在'

        func = FinIndexObj()
        func.func_id = data['func_id']
        func.func_name = data['func_name']
        func.describe = data['describe']
        func.method = data['method']
        func.param = FinanceIndexImpl._build_params(func, data['params'])
        func.tags = data['tags']
        func.edit_date = FinanceIndexImpl._current_timestamp()
        func.status = 'design'

        # fields_toggle() presumably maps the attributes to the stored field
        # names -- TODO confirm against FinIndexObj.
        DB_TEST.insert_single_data(
            FinanceIndexImpl._DB_NAME,
            FinanceIndexImpl._COLLECTION,
            func.fields_toggle()
        )
        return '新建成功'

    @staticmethod
    def check_calculate_func(**kwargs):
        """Validate a calculation function by executing it.

        Reads ``func_name``, ``func_method`` and ``params`` from
        ``kwargs['data']``; each parameter contributes ``name -> input`` to
        the dict handed to ``PythonCodeExecutor``.

        Returns:
            The executor's ``result`` attribute.

        Raises:
            APIReturnError: with status code 202 and the original message
                when the executor raises ``JustThrowError``.
        """
        data = kwargs['data']
        func_name = data['func_name']
        func_method = data['func_method']
        inputs = {param['name']: param['input'] for param in data['params']}

        try:
            executor = PythonCodeExecutor(
                func_name=func_name,
                func_text=func_method,
                params=inputs
            )
            return executor.result
        except JustThrowError as e:
            raise APIReturnError(error_info=str(e), status_code=202) from e

    @staticmethod
    def search_calculate_func(**kwargs):
        """Search calculation functions with paging and sorting.

        Reads ``sort``, ``page_size`` and ``page_no`` (required) plus the
        optional filters ``name`` and ``tags`` from ``kwargs['data']``.

        Returns:
            ``{"total": <count from the DB helper>, "records": [...]}`` where
            each record carries the keys listed in ``_LIST_FIELDS``.
        """
        data = kwargs['data']
        sort = data['sort']
        page_size = int(data['page_size'])
        page_no = int(data['page_no'])

        search_body = FinanceIndexImpl._make_search_body(data)
        sort_body = FinanceIndexImpl._make_sort_body(sort)

        # Only an upper bound is enforced.
        # NOTE(review): a page_size <= 0 is passed through unchanged; confirm
        # how the DB helper treats it (a zero limit may mean "no limit").
        page_size = min(page_size, FinanceIndexImpl._MAX_PAGE_SIZE)

        total = DB_TEST.find_all_data_with_count(
            FinanceIndexImpl._DB_NAME,
            FinanceIndexImpl._COLLECTION,
            search_body
        )
        records = DB_TEST.find_data_by_page_with_sort(
            FinanceIndexImpl._DB_NAME,
            FinanceIndexImpl._COLLECTION,
            search_body,
            [stored for _, stored in FinanceIndexImpl._LIST_FIELDS],
            sort_body,
            page_size,
            page_no
        )

        # A falsy result from the helper yields an empty record list.
        return_list = [
            {
                api_key: record[stored]
                for api_key, stored in FinanceIndexImpl._LIST_FIELDS
            }
            for record in (records or [])
        ]
        return {
            "total": total,
            "records": return_list
        }

    @staticmethod
    def lock_calculate_func(**kwargs):
        """Lock a function by setting its status to ``'published'``.

        Updates the record whose function ID equals ``kwargs['fid']`` and
        returns '锁定成功'. The current status is not checked first.
        """
        fid = kwargs['fid']

        func = FinIndexObj()
        func.status = 'published'

        DB_TEST.update_single_data(
            FinanceIndexImpl._DB_NAME,
            FinanceIndexImpl._COLLECTION,
            {'函数ID': fid},
            func.fields_toggle()
        )
        return '锁定成功'

    @staticmethod
    def view_calculate_func(**kwargs):
        """Return the details of the function identified by ``kwargs['fid']``.

        Returns:
            A dict with ``func_name``, ``describe``, ``method``, ``params``,
            ``tags`` and ``status``; an empty dict when the lookup result is
            falsy. Each entry of ``params`` has ``name`` and ``mark``, plus
            ``type`` only when the stored parameter has a type field.
        """
        fid = kwargs['fid']
        record = DB_TEST.find_single_data(
            FinanceIndexImpl._DB_NAME,
            FinanceIndexImpl._COLLECTION,
            {'函数ID': fid},
            ['函数名称', '说明', '方法', '参数', '标签', '状态']
        )
        if not record:
            return {}

        params = []
        for item in (record['参数'] or []):
            entry = {'name': item['参数名'], 'mark': item['备注']}
            # The type field is optional on stored parameters.
            if '类型' in item:
                entry['type'] = item['类型']
            params.append(entry)

        return {
            'func_name': record['函数名称'],
            'describe': record['说明'],
            'method': record['方法'],
            'params': params,
            'tags': record['标签'],
            'status': record['状态'],
        }

    @staticmethod
    def edit_calculate_func(**kwargs):
        """Update a function that is still in the ``'design'`` status.

        Reads ``func_id``, ``func_name``, ``describe``, ``method``,
        ``params`` and ``tags`` from ``kwargs['data']`` and refreshes the
        edit time.

        Returns:
            '修改成功' when the stored status is ``'design'`` and the update
            was issued; '函数已锁定' for any other status value.
        """
        data = kwargs['data']
        fid = data['func_id']

        func = FinIndexObj()
        func.func_name = data['func_name']
        func.describe = data['describe']
        func.method = data['method']
        func.param = FinanceIndexImpl._build_params(func, data['params'])
        func.tags = data['tags']
        func.edit_date = FinanceIndexImpl._current_timestamp()
        update_data = func.fields_toggle()

        status = DB_TEST.find_single_column(
            FinanceIndexImpl._DB_NAME,
            FinanceIndexImpl._COLLECTION,
            {'函数ID': fid},
            '状态'
        )
        # NOTE(review): any status other than 'design' is reported as locked
        # -- presumably including the case where no record matches the ID;
        # confirm what find_single_column returns for a missing record.
        if status != 'design':
            return '函数已锁定'

        DB_TEST.update_single_data(
            FinanceIndexImpl._DB_NAME,
            FinanceIndexImpl._COLLECTION,
            {'函数ID': fid},
            update_data
        )
        return '修改成功'