"""Views for downloading Excel templates and importing Excel rows into models."""
import ast
import json
import logging
import os
import urllib.parse
import uuid

import openpyxl
from django.apps import apps
from django.conf import settings
from django.contrib.auth.decorators import login_required
from django.contrib.staticfiles import finders
from django.core.exceptions import (
    FieldDoesNotExist,
    FieldError,
    MultipleObjectsReturned,
    ObjectDoesNotExist,
    ValidationError,
)
from django.core.files.storage import default_storage  # noqa: F401  (kept: may be used elsewhere)
from django.http import FileResponse, HttpResponseNotFound, JsonResponse
from django.shortcuts import render
from django.views.decorators.csrf import csrf_protect
from openpyxl import load_workbook
from rest_framework.serializers import ModelSerializer  # noqa: F401  (kept: may be used elsewhere)

from application import fac_mgnt  # noqa: F401  (kept: may be used elsewhere)
from application.fac_mgnt.models import InvoiceRecord
from application.org_mgnt.models import SecondaryDepartment
from application.pjt_mgnt.models import ProjectLedger

logger = logging.getLogger(__name__)

_XLSX_CONTENT_TYPE = 'application/vnd.openxmlformats-officedocument.spreadsheetml.sheet'

# Exceptions a single ``Model.objects.get(<field>=<value>)`` probe may raise
# when the value does not fit the field or does not identify exactly one row.
_LOOKUP_ERRORS = (
    ObjectDoesNotExist,
    MultipleObjectsReturned,
    FieldError,
    ValidationError,
    TypeError,
    ValueError,
    OverflowError,
)


class _RowImportError(Exception):
    """Abort an import; the message is returned to the client with HTTP 400."""


def _json_error(message, status=400):
    """Return the ``{'error': message}`` JSON payload used by every view here."""
    return JsonResponse({'error': message}, status=status)


def _xlsx_download_response(file_obj, download_name):
    """Wrap an open binary file in an attachment response for an .xlsx download."""
    response = FileResponse(file_obj, content_type=_XLSX_CONTENT_TYPE)
    quoted_name = urllib.parse.quote(download_name)
    # Keep the legacy percent-encoded ``filename`` and add the RFC 6266
    # ``filename*`` form so non-ASCII names are decoded by browsers.
    response['Content-Disposition'] = (
        f'attachment; filename="{quoted_name}"; '
        f"filename*=UTF-8''{quoted_name}"
    )
    return response


def _resolve_model(model_config):
    """Return the model named by an ``app_label.ModelName`` string, or None."""
    # NOTE(review): the model name comes from the client, so any installed
    # model can be targeted by a logged-in user; consider an allow-list.
    try:
        app_label, model_name = model_config.split('.')
        return apps.get_model(app_label, model_name)
    except (AttributeError, LookupError, ValueError):
        return None


def _parse_field_specs(raw_fields):
    """Decode the URL-quoted ``fields`` argument into a list of dicts, or None."""
    text = urllib.parse.unquote(raw_fields)
    parsers = (
        json.loads,                                       # real JSON
        lambda s: json.loads(s.replace("'", '"')),        # legacy single-quoted JSON
        ast.literal_eval,                                 # Python literal (True/False/None)
    )
    for parse in parsers:
        try:
            parsed = parse(text)
        except (ValueError, SyntaxError, TypeError, MemoryError, RecursionError):
            continue
        if isinstance(parsed, list) and all(isinstance(item, dict) for item in parsed):
            return parsed
    return None


def _is_yes(value):
    """Map the Excel yes/no cell ('是') to a bool; an existing True stays True."""
    return value is True or value == '是'


def _find_related_instance(related_model, value):
    """Return the first related row found by probing each concrete field with ``value``."""
    for related_field in related_model._meta.fields:
        try:
            return related_model.objects.get(**{related_field.name: value})
        except _LOOKUP_ERRORS:
            continue
    return None


def _build_generic_kwargs(Model, row_data, defaults=None, yes_no_fields=()):
    """Turn one imported row into constructor kwargs for ``Model``.

    Relation fields are resolved to related instances by value; plain fields
    are copied. ``defaults`` seed the kwargs and are overridden by row values.
    ``yes_no_fields`` are converted from '是'/other to True/False first.

    Raises:
        _RowImportError: if the row names a field ``Model`` does not have.
    """
    row = dict(row_data)
    for name in yes_no_fields:
        row[name] = _is_yes(row.get(name))

    instance_data = dict(defaults or {})
    for field_name, value in row.items():
        try:
            field = Model._meta.get_field(field_name)
        except FieldDoesNotExist:
            raise _RowImportError(f'无效的字段: {field_name}') from None

        if not field.is_relation:
            instance_data[field_name] = value
            continue

        # An empty cell must not be looked up: ``get(x=None)`` means IS NULL
        # and could match an unrelated row.
        if value is None:
            continue

        related_instance = _find_related_instance(field.related_model, value)
        if related_instance is None:
            # Leave the field unset so full_clean() decides whether the row
            # is acceptable, but make the miss visible.
            logger.warning(
                'No related %s row matched the imported value for field %s',
                field.related_model.__name__, field_name,
            )
            continue
        instance_data[field_name] = related_instance
    return instance_data


def _lookup_by_project_name(model, row_data):
    """Return the single ``model`` row whose ``project_name`` matches the row's."""
    project_name = row_data.get('project_name')
    if project_name is None:
        raise _RowImportError('缺少必要的参数: project_name')
    try:
        return model.objects.get(project_name=project_name)
    except ObjectDoesNotExist:
        raise _RowImportError(f'未找到项目名称对应的数据: {project_name}') from None
    except MultipleObjectsReturned:
        raise _RowImportError(f'项目名称对应多条数据: {project_name}') from None


def _save_table_data(request, build_kwargs):
    """Shared body of the ``save_excel_table_data*`` views.

    Reads ``model_config`` and ``table_data`` from the JSON request body,
    builds one model instance per row via ``build_kwargs(Model, row_data)``,
    validates each with ``full_clean()`` and bulk-creates them. Nothing is
    saved if any row fails validation.

    Returns:
        JsonResponse: 201 on success; 400 for bad requests or invalid rows;
        500 for unexpected errors.
    """
    if request.method != 'POST':
        return _json_error('请求错误')

    try:
        data = json.loads(request.body)
    except (json.JSONDecodeError, UnicodeDecodeError):
        return _json_error('无效的JSON数据')
    if not isinstance(data, dict):
        return _json_error('无效的JSON数据')

    model_config = data.get('model_config')
    table_data = data.get('table_data')
    if not model_config or not table_data:
        return _json_error('缺少必要的参数')
    if not isinstance(table_data, list) or not all(isinstance(row, dict) for row in table_data):
        return _json_error('table_data 格式错误')

    Model = _resolve_model(model_config)
    if Model is None:
        return _json_error('无效的 model_config')

    try:
        instances = []
        for row_data in table_data:
            instance = Model(**build_kwargs(Model, row_data))
            try:
                instance.full_clean()
            except ValidationError as e:
                return _json_error(f'数据校验错误: {e.message_dict}')
            instances.append(instance)
        # Save all rows in one batch.
        Model.objects.bulk_create(instances)
    except _RowImportError as e:
        return _json_error(str(e))
    except Exception as e:  # boundary handler: report and log
        logger.exception('Excel import into %s failed', model_config)
        return _json_error(f'保存数据时出错: {str(e)}', status=500)

    return JsonResponse({'success': '数据保存成功'}, status=201)


def error_page(request):
    """Render the generic error page template."""
    return render(request, 'error_page.html')


@login_required
def download_template(request, template_name):
    """Serve a static Excel template found under ``excels/`` by the staticfiles finders.

    Args:
        request: the HttpRequest.
        template_name: file name of the template to download.

    Returns:
        A FileResponse that downloads the file as an attachment, or an
        HttpResponseNotFound when no such file exists.
    """
    file_path = finders.find(f'excels/{template_name}')
    if not file_path or not os.path.isfile(file_path):
        return HttpResponseNotFound('\n\n文件未找到\n\n')

    # FileResponse closes the file handle once the response has been sent.
    return _xlsx_download_response(open(file_path, 'rb'), template_name)


@login_required
def download_excel_template(request, template_name, fields):
    """Generate an Excel template whose header row is built from ``fields``.

    Args:
        request: the HttpRequest.
        template_name: file name for the generated workbook.
        fields: URL-quoted list of dicts; entries with a truthy ``is_add``
            contribute their ``label`` as a column header.

    Returns:
        A FileResponse that downloads the generated workbook, or a 400 JSON
        error when ``fields`` or ``template_name`` cannot be used.

    Side effects:
        The workbook is written to ``<BASE_DIR>/static/excels/<template_name>``,
        as before.
    """
    field_specs = _parse_field_specs(fields)
    if field_specs is None:
        return _json_error('字段配置错误。')
    try:
        headers = [spec['label'] for spec in field_specs if spec.get('is_add')]
    except KeyError:
        return _json_error('字段配置错误。')

    # Use only the final path component so the name cannot escape static_dir.
    safe_name = os.path.basename(template_name)
    if not safe_name:
        return _json_error('请求错误')

    workbook = openpyxl.Workbook()
    workbook.active.append(headers)

    static_dir = os.path.join(settings.BASE_DIR, 'static', 'excels')
    os.makedirs(static_dir, exist_ok=True)
    temp_file_path = os.path.join(static_dir, safe_name)
    workbook.save(temp_file_path)

    return _xlsx_download_response(open(temp_file_path, 'rb'), safe_name)


@csrf_protect
@login_required
def common_excel_parse(request):
    """Parse an uploaded Excel file into rows keyed by model field name.

    Expects a POST with the file under ``excel_file`` and an
    ``app_label.ModelName`` string under ``model_config``. The first sheet
    row is the header; every header must equal the ``verbose_name`` of a
    non-primary-key field of the model.

    Returns:
        JsonResponse: ``table_data`` (list of row dicts) and ``fields_map``
        (field name -> header) on success; otherwise an ``error`` payload
        with status 400 (bad request / header mismatch) or 500 (parse error).
    """
    if request.method != 'POST' or not request.FILES:
        return _json_error('请求错误')

    excel_file = request.FILES.get('excel_file')
    if not excel_file:
        return _json_error('请先选择文件。')

    model = _resolve_model(request.POST.get('model_config', ''))
    if model is None:
        return _json_error('模型配置错误。')

    try:
        # Read straight from the upload; nothing is written to MEDIA_ROOT,
        # so there is no temporary file to clean up on any return path.
        workbook = load_workbook(excel_file, data_only=True)
        try:
            sheet = workbook.active
            header_row = [cell.value for cell in sheet[1]]

            # Map by verbose_name, not by position, so the Excel column
            # order does not have to follow the model's field order.
            verbose_to_field = {
                str(field.verbose_name): field.name
                for field in model._meta.fields
                if not field.primary_key
            }
            headers_known = all(header in verbose_to_field for header in header_row)
            headers_unique = len(set(header_row)) == len(header_row)
            if not headers_known or not headers_unique:
                return _json_error('表头不匹配,请使用正确的Excel上传模板。')

            header_to_field_map = {header: verbose_to_field[header] for header in header_row}
            fields_map = {field_name: header for header, field_name in header_to_field_map.items()}

            instance_list = [
                {header_to_field_map[header]: value for header, value in zip(header_row, row)}
                for row in sheet.iter_rows(min_row=2, values_only=True)
                if any(value is not None for value in row)  # skip fully empty rows
            ]
        finally:
            workbook.close()
    except Exception as e:  # boundary handler: report and log
        logger.exception('Failed to parse uploaded Excel file')
        return _json_error(f'解析文件时出错: {str(e)}', status=500)

    return JsonResponse({"table_data": instance_list, "fields_map": fields_map}, safe=False)


@csrf_protect
@login_required
def save_excel_table_data(request):
    """Import rows into the model named by ``model_config``, resolving relations by value."""
    return _save_table_data(request, _build_generic_kwargs)


@login_required
def load_secondary_departments(request):
    """Return id/name pairs of the secondary departments under a primary department.

    Reads ``primary_department_id`` from the query string and returns a JSON
    list ordered by ``secondary_department_name``. A malformed id yields an
    empty list.
    """
    primary_department_id = request.GET.get('primary_department_id')
    try:
        departments = list(
            SecondaryDepartment.objects
            .filter(primary_department_id=primary_department_id)
            .order_by('secondary_department_name')
            .values('secondary_department_id', 'secondary_department_name')
        )
    except (TypeError, ValueError, ValidationError):
        departments = []
    return JsonResponse(departments, safe=False)


@csrf_protect
@login_required
def save_excel_table_data_fac_inv(request):
    """Import rows, attaching the ProjectLedger matched by each row's ``project_name``."""
    def build(Model, row_data):
        kwargs = dict(row_data)
        kwargs['project_id'] = _lookup_by_project_name(ProjectLedger, row_data)
        return kwargs

    return _save_table_data(request, build)


@csrf_protect
@login_required
def save_excel_table_data_fac_rep(request):
    """Import rows, attaching the ProjectLedger and InvoiceRecord matched by ``project_name``."""
    def build(Model, row_data):
        kwargs = dict(row_data)
        kwargs['project_id'] = _lookup_by_project_name(ProjectLedger, row_data)
        kwargs['invoice'] = _lookup_by_project_name(InvoiceRecord, row_data)
        return kwargs

    return _save_table_data(request, build)


@csrf_protect
@login_required
def save_excel_table_data_inventory(request):
    """Import rows, converting ``replenishment_alert`` from '是'/other to a bool."""
    def build(Model, row_data):
        return _build_generic_kwargs(
            Model, row_data, yes_no_fields=('replenishment_alert',),
        )

    return _save_table_data(request, build)


@csrf_protect
@login_required
def save_excel_table_data_svc(request):
    """Import rows, defaulting ``usage_records`` to 0 when the row does not set it."""
    def build(Model, row_data):
        return _build_generic_kwargs(Model, row_data, defaults={'usage_records': 0})

    return _save_table_data(request, build)


@csrf_protect
@login_required
def save_excel_table_data_ctt(request):
    """Import rows, converting ``payment_agreement_needed`` and ``signed`` to bools."""
    def build(Model, row_data):
        return _build_generic_kwargs(
            Model, row_data, yes_no_fields=('payment_agreement_needed', 'signed'),
        )

    return _save_table_data(request, build)


@csrf_protect
@login_required
def save_excel_table_data_fix(request):
    """Import rows, giving each a fresh UUID ``asset_id`` unless the row sets one."""
    def build(Model, row_data):
        # A new UUID per row; built here so every row gets its own value.
        return _build_generic_kwargs(
            Model, row_data, defaults={'asset_id': str(uuid.uuid4())},
        )

    return _save_table_data(request, build)