api-datamanager/Modules/Filters/FiltersImpl.py

208 lines
6.7 KiB
Python
Raw Normal View History

2022-07-29 10:29:37 +08:00
import time
2022-08-01 16:45:47 +08:00
from Modules.Filters.FiltersObj import FiltersObj, data_base, DataBaseRange, DataSheetRange, FieldRange, SubFieldRange
2022-07-19 15:12:44 +08:00
from Modules.Filters.FiltersUtils import FilterUtils
from DBHelper.MongoHelperInstance import DB_TEST
class FilterImpl(object):
2022-07-29 10:29:37 +08:00
def new_filter(self, **kwargs):
    """Create a new filter and store it.

    Reads ``kwargs['data']``, which must provide:

    * ``filter_name`` - name of the filter to create.
    * ``filter_range`` - list of database entries. Each entry carries a
      ``value`` and a ``choices`` list of sheet entries; each sheet carries
      a ``value`` and a ``choices`` list of field entries; each field
      carries a ``value`` and, optionally, a ``choices`` list of sub-field
      entries that each carry a ``value``.

    Returns:
        The "name already exists" message when a filter with the same name
        is found, otherwise the "created" message after the upsert call.

    Raises:
        KeyError: if ``data`` or one of the required keys is missing.
    """
    data = kwargs['data']
    filter_name = data['filter_name']

    # Reject the request early if the name is already taken.
    existing_name = DB_TEST.find_single_column(
        '模型数据',
        '筛选器数据',
        {'筛选器名称': filter_name},
        '筛选器名称'
    )
    if existing_name:
        return '筛选器名称已存在'

    filters = FiltersObj()
    filters.filter_id = FilterUtils.make_new_cid()
    filters.filter_name = filter_name
    filters.date = time.strftime("%Y-%m-%d", time.localtime())

    # Build the filter range tree: database -> sheet -> field -> sub-field.
    # ``choices`` is only assigned on a node when the request supplies a
    # non-empty list for it; otherwise the object's own default is kept.
    filters.filter_range = list()
    for base in data['filter_range']:
        database = DataBaseRange()
        database.value = base['value']
        if base['choices']:
            database.choices = list()
            for sheet in base['choices']:
                datasheet = DataSheetRange()
                # The sheet name belongs on the sheet node. The previous
                # code wrote it to ``database.value``, clobbering the
                # database name and leaving the sheet without a value.
                datasheet.value = sheet['value']
                if sheet['choices']:
                    datasheet.choices = list()
                    for field in sheet['choices']:
                        datafield = FieldRange()
                        datafield.value = field['value']
                        # Sub-fields are optional at the field level.
                        if 'choices' in field and field['choices']:
                            datafield.choices = list()
                            for sub in field['choices']:
                                sub_field = SubFieldRange()
                                sub_field.value = sub['value']
                                datafield.choices.append(sub_field)
                        datasheet.choices.append(datafield)
                database.choices.append(datasheet)
        filters.filter_range.append(database)

    DB_TEST.upsert_single_data(
        '模型数据',
        '筛选器数据',
        {'筛选器名称': filter_name},
        filters.fields_toggle()
    )
    return '新建成功'
2022-07-29 10:29:37 +08:00
def eidt_filter(self, **kwargs):
    """Edit an existing filter.

    Reads ``kwargs['data']``, which must provide ``filter_id``,
    ``filter_name`` and ``filter_range``.

    The new name is rejected when it differs from the filter's current name
    and is already used by another stored filter. The range is then checked
    with ``self.verify_data`` (defined outside this view); the record is
    only updated when that check returns the "verification passed" message.

    Returns:
        The "name already exists" message, the non-passing result of
        ``self.verify_data``, or the "modified" message after the update.

    Raises:
        KeyError: if ``data`` or one of the required keys is missing.
    """
    data = kwargs['data']
    filter_id = data['filter_id']
    new_name = data['filter_name']

    # Name currently stored for the filter being edited.
    current_name = DB_TEST.find_single_column(
        '模型数据',
        '筛选器数据',
        {'筛选器ID': filter_id},
        '筛选器名称'
    )
    # Names of every stored filter.
    all_rows = DB_TEST.find_all_data(
        '模型数据',
        '筛选器数据',
        {},
        ['筛选器名称']
    )
    other_names = [item['筛选器名称'] for item in all_rows]
    # Drop one occurrence of the filter's own name so it does not count as
    # a conflict. ``list.remove`` raises ValueError for a missing value
    # (e.g. the lookup above found nothing), so guard the call.
    if current_name in other_names:
        other_names.remove(current_name)

    if new_name != current_name and new_name in other_names:
        return '筛选器名称已存在'

    verify_result = self.verify_data(data['filter_range'])
    if verify_result != '校验通过':
        return verify_result

    filters = FiltersObj()
    filters.filter_id = filter_id
    filters.filter_name = new_name
    filters.filter_range = data['filter_range']
    filters.date = time.strftime("%Y-%m-%d", time.localtime())
    DB_TEST.update_single_data(
        '模型数据',
        '筛选器数据',
        {'筛选器ID': filter_id},
        filters.fields_toggle()
    )
    return '修改成功'
@staticmethod
def delete_filter(**kwargs):
    """Delete the filter whose ID is given in ``kwargs['filter_id']``.

    Returns:
        The "operation succeeded" message once the delete call returns.

    Raises:
        KeyError: if ``filter_id`` is not supplied.
    """
    condition = {'筛选器ID': kwargs['filter_id']}
    DB_TEST.delete_single_data('模型数据', '筛选器数据', condition)
    return '操作成功'
@staticmethod
def search_filter(**kwargs):
    """Query stored filters one page at a time.

    Reads ``filter_name``, ``page_no`` and ``page_size`` from ``kwargs``.
    A non-empty ``filter_name`` restricts the query to that name; an empty
    one queries every filter. Results are requested sorted by ``_id``
    descending.

    Returns:
        A dict with ``total`` (the count reported by the database helper)
        and ``records`` (a list of dicts with ``filter_name``,
        ``filter_range``, ``date`` and, when the stored record has one,
        ``filter_id``).

    Raises:
        KeyError: if one of the three keyword arguments is missing.
    """
    filter_name = kwargs['filter_name']
    page_no = kwargs['page_no']
    page_size = kwargs['page_size']

    # Only the query condition depends on whether a name was supplied.
    condition = {"筛选器名称": filter_name} if len(filter_name) else {}

    total = DB_TEST.find_all_data_with_count(
        '模型数据',
        '筛选器数据',
        condition
    )
    rows = DB_TEST.find_data_by_page_with_sort(
        '模型数据',
        '筛选器数据',
        dict(condition),  # separate dict object per call
        [],
        {'_id': -1},
        page_size,
        page_no
    )

    # Translate stored field names into the response field names.
    record_list = []
    for row in rows:
        entry = {'filter_name': row['筛选器名称']}
        if '筛选器ID' in row:
            entry['filter_id'] = row['筛选器ID']
        entry['filter_range'] = row['筛选范围']
        entry['date'] = row['创建日期']
        record_list.append(entry)

    return {"total": total, "records": record_list}
def search_db_gather(self, **kwargs):
    """List the names the database helper reports for ``kwargs['db_name']``.

    With a non-empty ``db_name`` the helper's result is returned unchanged.
    With an empty ``db_name`` the helper is called with ``None`` and only
    the names that also appear in ``self.filter_list`` are returned.

    Raises:
        KeyError: if ``db_name`` is not supplied.
    """
    db_name = kwargs['db_name']

    # A specific database was requested: pass the result straight through.
    if len(db_name):
        return DB_TEST.find_database_all_gather(db_name)

    # No database given: query with None and keep only the allowed names.
    names = DB_TEST.find_database_all_gather(None)
    return [name for name in names if name in self.filter_list]
@staticmethod
def search_db_field(**kwargs):
    """List the fields of one document from the given collection.

    Reads ``database`` and ``datasheet`` from ``kwargs`` and fetches a
    single document with an empty query condition.

    Returns:
        A list with one entry per key of the fetched document: the key
        itself, or ``{key: value}`` when the value is a dict (a nested
        document). An empty list is returned when no document is found.

    Raises:
        KeyError: if ``database`` or ``datasheet`` is not supplied.
    """
    database = kwargs['database']
    datasheet = kwargs['datasheet']
    record = DB_TEST.find_single_data(
        database,
        datasheet,
        {},
        []
    )
    # An empty collection yields no document; there are no fields to list,
    # and calling ``.items()`` on a missing result would raise.
    if not record:
        return []

    # Nested documents are returned with their content so the caller can
    # see the sub-fields; everything else is reported by key only.
    return [
        {key: value} if isinstance(value, dict) else key
        for key, value in record.items()
    ]