import time

from DBHelper.MongoHelperInstance import DB_TEST
from Modules.CodeExecutor.PythonCodeExecutor import PythonCodeExecutor
from Modules.Functions.FinIndex.FinIndexObj import FinIndexObj
from Modules.Functions.FinIndex.FinIndexUtils import FinanceIndexUtils
from Modules.Indicators.Tags.TagsImpl import TagsImpl
from Utils.ErrorUtil import JustThrowError, APIReturnError

# Database / collection names used by the operations in this module.
_DB_NAME = '模型数据'
_FUNC_COLLECTION = '计算函数'
_TAG_COLLECTION = '标签数据'

# Largest page size a search request is allowed to ask for.
_MAX_PAGE_SIZE = 10


def _current_timestamp():
    """Return the current local time as ``YYYY-MM-DD HH:MM:SS``."""
    return time.strftime('%Y-%m-%d %H:%M:%S', time.localtime())


def _fill_definition(func, data):
    """Copy the editable fields of a request payload onto ``func``.

    Shared by the create and edit paths so both populate the object the
    same way: name, description, method text, parameter list, tag list and
    a fresh edit timestamp.

    Args:
        func: The ``FinIndexObj`` instance to populate (mutated in place).
        data: Request payload with the keys ``func_name``, ``describe``,
            ``method``, ``params`` (items with ``name``/``type``/``mark``)
            and ``tags`` (items with ``tag_name``/``tag_id``).

    Raises:
        KeyError: If any of the keys listed above is missing.
    """
    func.func_name = data['func_name']
    func.describe = data['describe']
    func.method = data['method']

    func.param = list()
    for item in data['params']:
        param = func.Param()
        param.name = item['name']
        param.type = item['type']
        param.mark = item['mark']
        func.param.append(param)

    func.tags = list()
    for item in data['tags']:
        tag = func.Tags()
        tag.tag_name = item['tag_name']
        tag.tag_id = item['tag_id']
        func.tags.append(tag)

    func.edit_date = _current_timestamp()


def _tags_to_api(tag_records):
    """Convert stored tag sub-documents into the API's tag dictionaries.

    A missing or empty tag list (``None`` / ``[]``) yields an empty list
    instead of raising.
    """
    return [
        {'tag_name': tag['标签名称'], 'tag_id': tag['标签ID']}
        for tag in tag_records or ()
    ]


def _build_search_filter(param):
    """Build the query filter for a function search request.

    Each of ``name``, ``tags`` and ``status`` is only added to the filter
    when it is non-empty; ``None`` is treated the same as empty.
    """
    body = dict()
    if param['name']:
        # NOTE(review): the name is passed to $regex unescaped, so regex
        # metacharacters typed by the caller are interpreted as a pattern.
        # Confirm whether that is intended before escaping it.
        body['函数名称'] = {"$regex": param['name']}
    if param['tags']:
        body['标签'] = {
            "$in": [
                {'标签名称': tag['tag_name'], '标签ID': tag['tag_id']}
                for tag in param['tags']
            ]
        }
    if param['status']:
        # Any non-empty status other than '已发布' searches for drafts.
        body['状态'] = 'published' if param['status'] == '已发布' else 'design'
    return body


def _build_sort_spec(direction):
    """Return the edit-time sort spec: ascending for 'asc', else descending."""
    return {"编辑时间": 1 if direction == 'asc' else -1}


class FinanceIndexImpl(object):
    """Create, validate, search, lock, view, edit and delete calculation
    functions stored in the function collection."""

    @staticmethod
    def new_calculate_func(**kwargs):
        """Create a new calculation function and associate its tags.

        Args:
            **kwargs: Must contain ``data``, the payload described in
                ``_fill_definition``.

        Returns:
            The literal success message ``'新建成功'``.
        """
        data = kwargs['data']
        func = FinIndexObj()
        func.func_id = FinanceIndexUtils.make_new_cid()
        _fill_definition(func, data)
        func.status = 'design'  # new functions always start as drafts

        DB_TEST.insert_single_data(
            _DB_NAME, _FUNC_COLLECTION, func.fields_toggle()
        )

        # Register the new function against each of its tags.
        func_data = func.fields_toggle()
        parameter = {
            "associate_id": func_data['函数ID'],
            "associate_name": func_data['函数名称'],
            "associate_type": "func",
            "tags_id": [item['标签ID'] for item in func_data['标签']]
        }
        TagsImpl().associate_tags(data=parameter)
        return '新建成功'

    @staticmethod
    def check_calculate_func(**kwargs):
        """Run a calculation function once against sample inputs.

        Args:
            **kwargs: Must contain ``data`` with ``func_name``,
                ``func_method`` (source text) and ``params`` (items with
                ``name`` and ``input``).

        Returns:
            The raw result when it is an ``int``/``float``; otherwise a
            message string embedding the result.

        Raises:
            APIReturnError: With status code 202 when the executor raises
                ``JustThrowError``.
        """
        payload = kwargs['data']
        func_name = payload['func_name']
        func_method = payload['func_method']
        param_dict = {
            param['name']: param['input'] for param in payload['params']
        }
        try:
            # NOTE(review): func_method is caller-supplied source text
            # handed to PythonCodeExecutor; confirm the executor sandboxes
            # it appropriately.
            my_exec = PythonCodeExecutor(
                func_name=func_name, func_text=func_method, params=param_dict
            )
            result = my_exec.result
        except JustThrowError as e:
            # Chain the cause so the original traceback is not lost.
            raise APIReturnError(error_info=str(e), status_code=202) from e

        if isinstance(result, (float, int)):
            return result
        return '校验成功,结果为{}'.format(result)

    @staticmethod
    def search_calculate_func(**kwargs):
        """Search calculation functions with filtering, sorting and paging.

        Args:
            **kwargs: Must contain ``data`` with ``name``, ``tags``,
                ``status``, ``sort``, ``page_size`` and ``page_no``.

        Returns:
            ``{"total": <count>, "records": [<function summary>, ...]}``.
        """
        data = kwargs['data']
        page_size = min(int(data['page_size']), _MAX_PAGE_SIZE)
        page_no = int(data['page_no'])
        search_body = _build_search_filter(data)
        sort_body = _build_sort_spec(data['sort'])

        total = DB_TEST.find_all_data_with_count(
            _DB_NAME, _FUNC_COLLECTION, search_body
        )
        records = DB_TEST.find_data_by_page_with_sort(
            _DB_NAME,
            _FUNC_COLLECTION,
            search_body,
            ['函数ID', '函数名称', '说明', '标签', '编辑时间', '状态'],
            sort_body,
            page_size,
            page_no
        )

        return_list = [
            {
                'func_id': record['函数ID'],
                'func_name': record['函数名称'],
                'describe': record['说明'],
                'tags': _tags_to_api(record['标签']),
                'edit_date': record['编辑时间'],
                'status': record['状态'],
            }
            for record in records or ()
        ]
        return {"total": total, "records": return_list}

    @staticmethod
    def lock_calculate_func(**kwargs):
        """Mark a function as published (locked).

        Args:
            **kwargs: Must contain ``fid``, the function ID.

        Returns:
            The literal success message ``'锁定成功'``.
        """
        fid = kwargs['fid']
        func = FinIndexObj()
        func.status = 'published'
        # NOTE(review): only ``status`` is set on this object; this relies
        # on fields_toggle() not emitting the unset fields - confirm.
        DB_TEST.update_single_data(
            _DB_NAME, _FUNC_COLLECTION, {'函数ID': fid}, func.fields_toggle()
        )
        return '锁定成功'

    @staticmethod
    def view_calculate_func(**kwargs):
        """Fetch the full definition of one function.

        Args:
            **kwargs: Must contain ``fid``, the function ID.

        Returns:
            A dict with ``func_name``, ``describe``, ``method``, ``params``,
            ``tags`` and ``status``; an empty dict when no record is found.
        """
        fid = kwargs['fid']
        record = DB_TEST.find_single_data(
            _DB_NAME,
            _FUNC_COLLECTION,
            {'函数ID': fid},
            ['函数名称', '说明', '方法', '参数', '标签', '状态']
        )
        if not record:
            return dict()

        params = list()
        for item in record['参数'] or ():
            param_dict = {'name': item['参数名'], 'mark': item['备注']}
            # The type key is optional on stored parameters.
            try:
                param_dict['type'] = item['类型']
            except KeyError:
                pass
            params.append(param_dict)

        return {
            'func_name': record['函数名称'],
            'describe': record['说明'],
            'method': record['方法'],
            'params': params,
            'tags': _tags_to_api(record['标签']),
            'status': record['状态'],
        }

    @staticmethod
    def edit_calculate_func(**kwargs):
        """Update a function's definition while it is still a draft.

        Args:
            **kwargs: Must contain ``data``: the payload described in
                ``_fill_definition`` plus ``func_id``.

        Returns:
            ``'修改成功'`` when the update was applied, ``'函数已锁定'`` when
            the stored status is anything other than ``'design'``.
        """
        data = kwargs['data']
        fid = data['func_id']
        func = FinIndexObj()
        _fill_definition(func, data)
        update_data = func.fields_toggle()

        status = DB_TEST.find_single_column(
            _DB_NAME, _FUNC_COLLECTION, {'函数ID': fid}, '状态'
        )
        # NOTE(review): a lookup that yields no 'design' status (including,
        # presumably, an unknown func_id) is reported as "locked" - confirm
        # that is the intended response for a missing function.
        if status != 'design':
            return '函数已锁定'

        DB_TEST.update_single_data(
            _DB_NAME, _FUNC_COLLECTION, {'函数ID': fid}, update_data
        )
        return '修改成功'

    @staticmethod
    def delete_calculate_func(**kwargs):
        """Delete a function and detach it from every tag that lists it.

        Args:
            **kwargs: Must contain ``fid``, the function ID.

        Returns:
            The literal success message ``'操作成功'``.
        """
        fid = kwargs['fid']
        tags = DB_TEST.find_single_column(
            _DB_NAME, _FUNC_COLLECTION, {'函数ID': fid}, '标签'
        )
        # Remove the function from each tag's association list. An empty
        # lookup result must not stop the delete from going through.
        for tag in tags or ():
            DB_TEST.update_single_data_by_pull(
                _DB_NAME,
                _TAG_COLLECTION,
                {'标签ID': tag['标签ID']},
                {"关联数据": {"函数ID": fid}}
            )
        # Remove the function document itself.
        DB_TEST.delete_single_data(
            _DB_NAME, _FUNC_COLLECTION, {'函数ID': fid}
        )
        return '操作成功'