api-datamanager/Modules/Questionnaire/Questionnairelmpl.py

375 lines
15 KiB
Python
Raw Normal View History

import time
from DBHelper.MongoHelperInstance import DB_TEST
2022-07-18 19:59:11 +08:00
from Modules.Indicators.Tags.TagsImpl import TagsImpl
from Modules.Questionnaire.QuestionnaireObj import QuestionnaireObj
2022-07-18 19:59:11 +08:00
from Modules.Questionnaire.QuestionnaireUtils import QuestionnaireUtils
from Modules.Questions.QuestionsImpl import QuestionImpl
class QuestionnaireImpl(object):
"""问卷类"""
@staticmethod
def new_questionnaire(**kwargs):
"""新建问卷"""
data = kwargs['data']
quest = QuestionnaireObj()
2022-07-18 19:59:11 +08:00
quest.questionnaire_id = QuestionnaireUtils.make_new_cid()
quest.questionnaire_name = data['questionnaire_name']
quest.questionnaire_score = data['questionnaire_score']
quest.author = data['author']
quest.date = time.strftime('%Y-%m-%d', time.localtime())
2022-07-18 19:59:11 +08:00
quest.tags = list()
for tag in data['tags']:
tags = quest.Tags()
tags.tag_name = tag['tag_name']
tags.tag_id = tag['tag_id']
quest.tags.append(tags)
quest.section = list()
for section_ in data['section']:
section = quest.Section()
section.section_name = section_['section_name']
section.total = section_['total']
section.section_content = list()
for content_ in section_['section_content']:
content = section.Content()
content.topic = content_['topic']
content.version = content_['version']
content.question_id = content_['question_id']
content.weight = content_['weight']
section.section_content.append(content)
quest.section.append(section)
DB_TEST.upsert_single_data(
'模型数据',
'问卷数据',
{'问卷名称': data['questionnaire_name']},
quest.fields_toggle()
)
2022-07-18 19:59:11 +08:00
func_data = quest.fields_toggle()
impl = TagsImpl()
parameter = {
"associate_id": func_data['问卷ID'],
"associate_name": func_data['问卷名称'],
"associate_type": "questionnaire",
"tags_id": [item['标签ID'] for item in func_data['标签']]
}
impl.associate_tags(data=parameter)
return '新建成功'
2022-07-19 15:35:03 +08:00
@staticmethod
def check_questionnaire(**kwargs):
"""校验问卷"""
data = kwargs['data']
preset_score = data['questionnaire_score']
actual_score = 0
result_dict = dict()
result_dict['总分'] = 0
2022-08-09 16:53:08 +08:00
# 校验预设分数与实际总分是否一致
for section in data['section']:
for question in section['section_content']:
actual_score += question['weight']
if actual_score != preset_score:
return '实际得分与预设分数不同'
# 校验每道题得分是否正常
for section in data['section']:
for question in section['section_content']:
result = QuestionImpl.check_question(data=question)
result_dict[question['question_name']] = result * question['weight']
result_dict['总分'] += result_dict[question['question_name']]
return result_dict
2022-07-19 15:35:03 +08:00
@staticmethod
def search_questionnaire(**kwargs):
"""查询问卷"""
def make_search_body(param):
body = dict()
2022-07-26 17:02:06 +08:00
if len(param['questionnaire_name']):
body['问卷名称'] = {"$regex": param['questionnaire_name']}
2022-07-26 17:02:06 +08:00
if len(param['author']):
body['作者'] = param['author']
2022-07-26 17:02:06 +08:00
if len(param['tags']):
2022-07-21 16:23:41 +08:00
tags_list = list()
for tags in param['tags']:
tags_dict = dict()
tags_dict['标签名称'] = tags['tag_name']
tags_dict['标签ID'] = tags['tag_id']
tags_list.append(tags_dict)
body['标签'] = {"$in": tags_list}
return body
def make_sort_body(param):
if param == 'asc':
2022-07-26 17:02:06 +08:00
body = {"_id": 1}
else:
2022-07-26 17:02:06 +08:00
body = {"_id": -1}
return body
data = kwargs['data']
sort = data['sort']
page_size = int(data['page_size'])
page_no = int(data['page_no'])
search_body = make_search_body(data)
sort_body = make_sort_body(sort)
page_size = 10 if page_size > 10 else page_size
total = DB_TEST.find_all_data_with_count(
'模型数据',
'问卷数据',
search_body
)
records = DB_TEST.find_data_by_page_with_sort(
'模型数据',
'问卷数据',
search_body,
['问卷ID', '问卷名称', '作者', '日期', '标签'],
sort_body,
page_size,
page_no
)
return_list = list()
if records:
for record in records:
return_dict = dict()
return_dict['questionnaire_id'] = record['问卷ID']
return_dict['questionnaire_name'] = record['问卷名称']
return_dict['author'] = record['作者']
return_dict['date'] = record['日期']
return_dict['tags'] = list()
for tag in record['标签']:
tag_dict = dict()
tag_dict['tag_name'] = tag['标签名称']
tag_dict['tag_id'] = tag['标签ID']
return_dict['tags'].append(tag_dict)
return_list.append(return_dict)
result = {
"total": total,
"records": return_list
}
return result
@staticmethod
def view_questionnaire(**kwargs):
"""查看问卷"""
qid = kwargs['qid']
record = DB_TEST.find_single_data(
'模型数据',
'问卷数据',
{'问卷ID': qid},
['问卷ID', '问卷名称', '问卷分值', '标签', '小节', '日期', '作者']
)
return_dict = dict()
if record:
return_dict['questionnaire_id'] = record['问卷ID']
return_dict['questionnaire_name'] = record['问卷名称']
return_dict['questionnaire_score'] = record['问卷分值']
2022-07-20 15:00:14 +08:00
return_dict['tags'] = list()
for tag in record['标签']:
tag_dict = dict()
tag_dict['tag_name'] = tag['标签名称']
tag_dict['tag_id'] = tag['标签ID']
return_dict['tags'].append(tag_dict)
return_dict['date'] = record['日期']
return_dict['author'] = record['作者']
return_dict['section'] = list()
for section in record['小节']:
section_ = dict()
section_['section_name'] = section['小节名称']
section_['total'] = section['合计']
section_['section_content'] = list()
for content in section['小节内容']:
content_ = dict()
content_['topic'] = content['题目']
content_['version'] = content['题型']
content_['question_id'] = content['问题ID']
content_['weight'] = content['权重']
section_['section_content'].append(content_)
return_dict['section'].append(section_)
return return_dict
@staticmethod
def edit_questionnaire(**kwargs):
"""编辑问卷"""
data = kwargs['data']
question_name = DB_TEST.find_single_column(
'模型数据',
'问卷数据',
{'问卷ID': data['questionnaire_id']},
'问卷名称'
)
if not question_name:
return '问卷不存在'
quest = QuestionnaireObj()
quest.questionnaire_id = data['questionnaire_id']
quest.questionnaire_name = data['questionnaire_name']
quest.questionnaire_score = data['questionnaire_score']
quest.author = data['author']
quest.date = time.strftime('%Y-%m-%d', time.localtime())
quest.tags = list()
for tag in data['tags']:
tags = quest.Tags()
tags.tag_name = tag['tag_name']
tags.tag_id = tag['tag_id']
quest.tags.append(tags)
quest.section = list()
for section_ in data['section']:
section = quest.Section()
section.section_name = section_['section_name']
section.total = section_['total']
section.section_content = list()
for content_ in section_['section_content']:
content = section.Content()
content.topic = content_['topic']
content.version = content_['version']
content.question_id = content_['question_id']
content.weight = content_['weight']
section.section_content.append(content)
quest.section.append(section)
DB_TEST.update_single_data(
'模型数据',
'问卷数据',
{'问卷ID': data['questionnaire_id']},
quest.fields_toggle()
)
return '编辑成功'
@staticmethod
def delete_questionnaire(**kwargs):
"""删除问卷"""
qid = kwargs['qid']
tags = DB_TEST.find_single_column(
'模型数据',
'问卷数据',
{'问卷ID': qid},
'标签'
)
# 删除标签关联的问题
for tag in tags:
DB_TEST.update_single_data_by_pull(
'模型数据',
'标签数据',
2022-07-21 16:23:41 +08:00
{'标签ID': tag['标签ID']},
{"关联数据": {"问卷ID": qid}}
)
# 删除问题
DB_TEST.delete_single_data(
'模型数据',
'问卷数据',
{'问卷ID': qid}
)
return '操作成功'
2022-07-29 10:29:37 +08:00
@staticmethod
def view_detail_questionnaire(**kwargs):
"""查看问卷详情"""
qid = kwargs['qid']
record = DB_TEST.find_single_data(
'模型数据',
'问卷数据',
{'问卷ID': qid},
['问卷名称', '问卷ID', '问卷分值', '标签', '小节', '日期', '作者']
)
return_dict = dict()
if record:
return_dict['questionnaire_name'] = record['问卷名称']
return_dict['questionnaire_id'] = record['问卷ID']
return_dict['questionnaire_score'] = record['问卷分值']
return_dict['tags'] = list()
for tag in record['标签']:
tag_dict = dict()
tag_dict['tag_name'] = tag['标签名称']
tag_dict['tag_id'] = tag['标签ID']
return_dict['tags'].append(tag_dict)
return_dict['date'] = record['日期']
return_dict['author'] = record['作者']
return_dict['section'] = list()
for section in record['小节']:
section_ = dict()
section_['section_name'] = section['小节名称']
section_['total'] = section['合计']
section_['section_content'] = list()
for content in section['小节内容']:
question_id = content['问题ID']
question = DB_TEST.find_single_data(
'模型数据',
'问题数据',
{'问题ID': question_id},
['问题名称', '问题ID', '问题类型', '题目描述', '题目选项', '判定设置', '输入设置']
)
question_dict = dict()
question_dict['weight'] = content['权重']
question_dict['question_name'] = question['问题名称']
question_dict['question_id'] = question['问题ID']
question_dict['question_type'] = question['问题类型']
question_dict['describe'] = question['题目描述']
if question_dict['question_type'] == '单选题-多解' or question_dict['question_type'] == '多选题-多解':
question_dict['options'] = list()
for option in question['题目选项']:
option_ = dict()
option_['percentage'] = option['得分比']
option_['describe'] = option['描述']
question_dict['options'].append(option_)
elif question_dict['question_type'] == '单选题-单解' or question_dict['question_type'] == '多选题-单解':
question_dict['options'] = list()
for option in question['题目选项']:
option_ = dict()
option_['correct'] = option['正确']
option_['describe'] = option['描述']
question_dict['options'].append(option_)
elif question_dict['question_type'] == '填空题-单解':
question_dict['decision_setting'] = dict()
question_dict['decision_setting']['answer'] = question['判定设置']['正确答案']
elif question_dict['question_type'] == '填空题-包含多解':
question_dict['decision_setting'] = dict()
question_dict['decision_setting']['contain'] = question['判定设置']['包含']
question_dict['decision_setting']['score'] = question['判定设置']['得分比']
elif question_dict['question_type'] == '填空题-档位多解':
question_dict['decision_setting'] = dict()
question_dict['decision_setting']['interval'] = question['判定设置']['区间']
question_dict['decision_setting']['score'] = question['判定设置']['得分比']
elif question_dict['question_type'] == '填空题-指定处理':
question_dict['decision_setting'] = dict()
question_dict['decision_setting']['assigned'] = question['判定设置']['指定处理']
elif question_dict['question_type'] == '自定义问题':
question_dict['decision_setting'] = dict()
question_dict['decision_setting']['assigned'] = question['判定设置']['指定处理']
question_dict['input_setting'] = list()
for input_ in question['输入设置']:
_input = dict()
_input['input'] = input_['输入']
_input['type'] = input_['类型']
_input['mark'] = input_['备注']
question_dict['input_setting'].append(_input)
section_['section_content'].append(question_dict)
return_dict['section'].append(section_)
return return_dict