"""Views for the organisation-management app.

Contains the list/add/modify/delete pages for company entities, primary
departments and secondary departments, the REST endpoints for entity change
records and company bank accounts, and the generic Excel upload endpoints
(parse for preview, then bulk save).
"""
import json
import os
from urllib.parse import urlencode

from django.apps import apps
from django.conf import settings
from django.contrib.auth.decorators import login_required
from django.core.exceptions import ValidationError
from django.core.files.storage import default_storage
from django.http import Http404, JsonResponse
from django.shortcuts import get_object_or_404, render
from django.template.loader import render_to_string
from django.urls import reverse
from django.views.decorators.csrf import csrf_protect
from openpyxl import load_workbook
from rest_framework import status
from rest_framework.decorators import api_view, permission_classes
from rest_framework.pagination import PageNumberPagination
from rest_framework.permissions import IsAuthenticated
from rest_framework.response import Response
from rest_framework.serializers import ModelSerializer

from application.org_mgnt.forms import CompanyEntityForm, PrimaryDepartmentForm, SecondaryDepartmentForm
from application.org_mgnt.models import PrimaryDepartment, SecondaryDepartment, CompanyEntity, CompanyBankAccount
from application.org_mgnt.serializers import EntityChangeRecordSerializer, CompanyBankAccountSerializer
from common.auth import custom_permission_required
from common.utils.page_helper import paginate_query_and_assign_numbers

# Template used to render every add/modify form fragment.
_FORM_PARTIAL_TEMPLATE = 'form_partial.html'


class StandardResultsSetPagination(PageNumberPagination):
    """Page-number pagination used by the REST list endpoints in this module."""

    page_size = 5
    page_size_query_param = 'page_size'
    max_page_size = 100


# ---------------------------------------------------------------------------
# Shared private helpers
# ---------------------------------------------------------------------------

def _invalid_method_response():
    """Return the 405 JSON response used by the form and delete endpoints."""
    return JsonResponse({"message": "无效的请求方法"}, status=405)


def _build_query_params(**filters):
    """Return the filter values as a '&key=value...' query-string suffix.

    Values are URL-encoded so that characters such as '&', '=', '#' or
    spaces in a search term cannot corrupt the links built from the suffix.
    Keys are emitted in the order given.
    """
    return '&' + urlencode(filters)


def _preview_field(field_type="text", **extra):
    """Build one Excel-preview column config entry with the 180px width."""
    return {"type": field_type, "width": "180px", **extra}


def _excel_upload_config(template_name, fields_preview_config,
                         parse_url_name="common_excel_parse",
                         save_url_name="save_excel_table_data"):
    """Build the ``excel_upload_config`` context entry for a list page."""
    return {
        "template_url": reverse("download_template", kwargs={'template_name': template_name}),
        "parse_url": reverse(parse_url_name),
        "save_url": reverse(save_url_name),
        "fields_preview_config": fields_preview_config,
    }


def _form_response(request, form, with_errors=False):
    """Render ``form`` into the form partial and wrap it in a JsonResponse.

    With ``with_errors`` the form errors are included and the status is 400.
    """
    form_html = render_to_string(_FORM_PARTIAL_TEMPLATE, {'form': form}, request)
    if with_errors:
        return JsonResponse({"form_html": form_html, "errors": form.errors}, status=400)
    return JsonResponse({"form_html": form_html})


def _get_instance_or_404(model, pk_field, raw_id):
    """Fetch ``model`` by ``pk_field`` or raise Http404.

    A missing row and a malformed id (ValueError from the field conversion)
    are both reported as 404 rather than surfacing as a server error.
    """
    try:
        return model.objects.get(**{pk_field: raw_id})
    except (model.DoesNotExist, ValueError):
        raise Http404("对象不存在")


def _handle_add(request, form_class):
    """Shared add view: GET returns an empty form, POST validates and saves."""
    if request.method == 'POST':
        form = form_class(request.POST)
        if form.is_valid():
            form.save()
            return JsonResponse({"message": "添加成功"})
        return _form_response(request, form, with_errors=True)
    if request.method == 'GET':
        return _form_response(request, form_class())
    return _invalid_method_response()


def _handle_modify(request, model, form_class, pk_field):
    """Shared modify view.

    GET returns a form, bound to the instance named by the ``id`` parameter
    when one is given. POST validates and saves; when ``id`` is present the
    existing instance is updated, otherwise a new one is created. An unknown
    ``id`` yields 404 for both methods.
    """
    if request.method == 'POST':
        if 'id' in request.POST:
            instance = _get_instance_or_404(model, pk_field, request.POST['id'])
            form = form_class(request.POST, instance=instance)
        else:
            form = form_class(request.POST)
        if form.is_valid():
            form.save()
            return JsonResponse({"message": "保存成功"})
        return _form_response(request, form, with_errors=True)
    if request.method == 'GET':
        if 'id' in request.GET:
            instance = _get_instance_or_404(model, pk_field, request.GET['id'])
            form = form_class(instance=instance)
        else:
            form = form_class()
        return _form_response(request, form)
    return _invalid_method_response()


def _handle_delete(request, model, pk_field, param_name):
    """Shared delete view: remove the rows whose ``pk_field`` equals the GET parameter.

    NOTE(review): deletion is triggered by GET, so it is not covered by
    CSRF protection. Kept as-is because existing callers use GET; consider
    migrating callers to POST/DELETE.
    """
    if request.method == 'GET':
        target = request.GET.get(param_name)
        model.objects.filter(**{pk_field: target}).delete()
        return JsonResponse({"message": "删除成功"})
    return _invalid_method_response()


# ---------------------------------------------------------------------------
# Company entity pages
# ---------------------------------------------------------------------------

@login_required
@custom_permission_required('org_mgnt.view_companyentity')
def eir_list_view(request):
    """Render the company entity list page with optional filters.

    Reads ``company_name``, ``business_status`` and
    ``taxpayer_identification_number`` from the query string and renders
    ``ce_list.html``.
    """
    # Base queryset, newest entity first.
    query_set = CompanyEntity.objects.filter().order_by('-entity_id')

    # Read the filter parameters.
    company_name = request.GET.get('company_name', '')
    business_status = request.GET.get('business_status', '')
    taxpayer_identification_number = request.GET.get('taxpayer_identification_number', '')

    # Apply only the filters that were actually supplied.
    if company_name:
        query_set = query_set.filter(company_name__icontains=company_name)
    if business_status:
        query_set = query_set.filter(business_status=business_status)
    if taxpayer_identification_number:
        query_set = query_set.filter(
            taxpayer_identification_number__icontains=taxpayer_identification_number)

    # Paginate the result, 10 records per page.
    items = paginate_query_and_assign_numbers(
        request=request,
        queryset=query_set,
        per_page=10
    )

    # Query-string suffix that carries the active filters.
    query_params = _build_query_params(
        company_name=company_name,
        business_status=business_status,
        taxpayer_identification_number=taxpayer_identification_number,
    )

    # Excel upload template file name.
    template_name = "组织管理-公司主体信息登记-Excel上传模板.xlsx"

    context = {
        "model_config": 'org_mgnt.CompanyEntity',
        "items": items,
        "breadcrumb_list": [
            {"title": "首页", "name": "index"},
            {"title": "组织管理", "name": "index"},
            {"title": "公司主体信息登记表", "name": "eir_list"}
        ],
        "filters": [
            {"type": "text", "id": "company_name", "name": "company_name",
             "label": "公司名称", "placeholder": "请输入公司名称"},
            {"type": "select", "id": "business_status", "name": "business_status",
             "label": "公司经营状态",
             "options": [{"value": "存续", "display": "存续"}, {"value": "注销", "display": "注销"}]},
            {"type": "text", "id": "taxpayer_identification_number",
             "name": "taxpayer_identification_number",
             "label": "纳税人识别号", "placeholder": "请输入纳税人识别号"}
        ],
        "table_exclude_field_name": ['entity_id'],
        "excel_upload_config": _excel_upload_config(template_name, {
            "company_type": _preview_field(),
            "company_name": _preview_field(),
            "registration_address": _preview_field(),
            "registered_capital": _preview_field(),
            "capital_paid_time": _preview_field("date"),
            "capital_paid": _preview_field(),
            "establishment_time": _preview_field("date"),
            "operation_period": _preview_field(),
            "taxpayer_identification_number": _preview_field(),
            "business_status": _preview_field("select", options=["存续", "注销"]),
            "scope_of_business": _preview_field(),
            "purpose_of_company": _preview_field(),
            "shareholders_and_stakes": _preview_field(),
            "chairman": _preview_field(),
            "directors": _preview_field(),
            "supervisor_chairman": _preview_field(),
            "employee_supervisor": _preview_field(),
            "supervisors": _preview_field(),
            "general_manager": _preview_field(),
            "financial_officer": _preview_field(),
        }),
        "query_params": query_params,
        "form_action_url": 'eir_list',
        "modify_url": reverse("eir_list_modify"),
        "add_url": reverse("eir_list_add"),
        "delete_url": reverse("eir_list_delete"),
    }
    return render(request, 'ce_list.html', context)


@login_required
@custom_permission_required('org_mgnt.add_companyentity')
def eir_list_add(request):
    """Return the empty company entity form (GET) or create an entity (POST)."""
    return _handle_add(request, CompanyEntityForm)


@login_required
@custom_permission_required('org_mgnt.change_companyentity')
def eir_list_modify(request):
    """Return the company entity edit form (GET) or save the submitted form (POST)."""
    return _handle_modify(request, CompanyEntity, CompanyEntityForm, 'entity_id')


@login_required
@custom_permission_required('org_mgnt.delete_companyentity')
def eir_list_delete(request):
    """Delete the company entity named by the ``target_id`` GET parameter."""
    return _handle_delete(request, CompanyEntity, 'entity_id', 'target_id')


# ---------------------------------------------------------------------------
# REST endpoints: change records and bank accounts
# ---------------------------------------------------------------------------

@api_view(['GET'])
@login_required
@custom_permission_required('org_mgnt.view_entitychangerecord')
def change_record_list(request, entity_id):
    """Return a paginated list of change records for one company entity."""
    entity = get_object_or_404(CompanyEntity, pk=entity_id)
    change_records = entity.change_records.all().order_by('record_id')
    paginator = StandardResultsSetPagination()
    paginated_change_records = paginator.paginate_queryset(change_records, request)
    serializer = EntityChangeRecordSerializer(paginated_change_records, many=True)
    return paginator.get_paginated_response(serializer.data)


@api_view(['GET'])
@login_required
@custom_permission_required('org_mgnt.view_companybankaccount')
def bank_account_list(request, entity_id):
    """Return a paginated list of bank accounts for one company entity.

    The standard paginated payload is extended with ``next_page`` and
    ``previous_page`` page numbers (``None`` when there is no such page).
    """
    entity = get_object_or_404(CompanyEntity, pk=entity_id)
    bank_accounts = entity.bank_accounts.all().order_by('account_id')
    paginator = StandardResultsSetPagination()
    paginated_bank_accounts = paginator.paginate_queryset(bank_accounts, request)
    serializer = CompanyBankAccountSerializer(paginated_bank_accounts, many=True)
    response = paginator.get_paginated_response(serializer.data)

    page = paginator.page
    response_data = response.data
    response_data['next_page'] = page.next_page_number() if page.has_next() else None
    response_data['previous_page'] = page.previous_page_number() if page.has_previous() else None
    return Response(response_data)


@api_view(['GET'])
@permission_classes([IsAuthenticated])
def get_bank_account_details(request, account_id):
    """Return the serialized bank account with the given primary key."""
    bank_account = get_object_or_404(CompanyBankAccount, pk=account_id)
    serializer = CompanyBankAccountSerializer(bank_account)
    return Response(serializer.data)


@api_view(['POST'])
@permission_classes([IsAuthenticated])
def add_bank_account(request, entity_id):
    """Create a bank account attached to the given company entity."""
    entity = get_object_or_404(CompanyEntity, pk=entity_id)
    serializer = CompanyBankAccountSerializer(data=request.data)
    if serializer.is_valid():
        serializer.save(company_entity=entity)
        return Response(serializer.data, status=status.HTTP_201_CREATED)
    return Response(serializer.errors, status=status.HTTP_400_BAD_REQUEST)


@api_view(['PUT'])
@permission_classes([IsAuthenticated])
def update_bank_account(request, account_id):
    """Partially update the bank account with the given primary key."""
    bank_account = get_object_or_404(CompanyBankAccount, pk=account_id)
    serializer = CompanyBankAccountSerializer(bank_account, data=request.data, partial=True)
    if serializer.is_valid():
        serializer.save()
        return Response(serializer.data)
    return Response(serializer.errors, status=status.HTTP_400_BAD_REQUEST)


@api_view(['DELETE'])
@permission_classes([IsAuthenticated])
def delete_bank_account(request, account_id):
    """Delete the bank account with the given primary key."""
    bank_account = get_object_or_404(CompanyBankAccount, pk=account_id)
    bank_account.delete()
    return Response(status=status.HTTP_204_NO_CONTENT)


# ---------------------------------------------------------------------------
# Primary department pages
# ---------------------------------------------------------------------------

@login_required
@custom_permission_required('org_mgnt.view_primarydepartment')
def pd_list_view(request):
    """Render the primary department list page, optionally filtered by name."""
    # Base queryset, newest department first.
    query_set = PrimaryDepartment.objects.filter().order_by('-primary_department_id')

    # Read and apply the optional name filter.
    department_name = request.GET.get('department_name', '')
    if department_name:
        query_set = query_set.filter(department_name__icontains=department_name)

    # Paginate the result, 10 records per page.
    items = paginate_query_and_assign_numbers(
        request=request,
        queryset=query_set,
        per_page=10
    )

    # Query-string suffix that carries the active filter.
    query_params = _build_query_params(department_name=department_name)

    # Excel upload template file name.
    template_name = "组织管理-一级部门-Excel上传模板.xlsx"

    context = {
        "model_config": 'org_mgnt.PrimaryDepartment',
        "items": items,
        "breadcrumb_list": [
            {"title": "首页", "name": "index"},
            {"title": "组织管理", "name": "index"},
            {"title": "一级部门表", "name": "pd_list"}
        ],
        "filters": [
            {"type": "text", "id": "department_name", "name": "department_name",
             "label": "一级部门名称", "placeholder": "请输入一级部门名称"}
        ],
        "table_exclude_field_name": ['primary_department_id'],
        "excel_upload_config": _excel_upload_config(template_name, {
            "department_name": _preview_field(),
            "description": _preview_field(),
        }),
        "query_params": query_params,
        "form_action_url": 'pd_list',
        "modify_url": reverse("pd_list_modify"),
        "add_url": reverse("pd_list_add"),
        "delete_url": reverse("pd_list_delete"),
    }
    return render(request, 'items_list.html', context)


@login_required
@custom_permission_required('org_mgnt.add_primarydepartment')
def pd_list_add(request):
    """Return the empty primary department form (GET) or create one (POST)."""
    return _handle_add(request, PrimaryDepartmentForm)


@login_required
@custom_permission_required('org_mgnt.change_primarydepartment')
def pd_list_modify(request):
    """Return the primary department edit form (GET) or save the submitted form (POST)."""
    return _handle_modify(request, PrimaryDepartment, PrimaryDepartmentForm,
                          'primary_department_id')


@login_required
@custom_permission_required('org_mgnt.delete_primarydepartment')
def pd_list_delete(request):
    """Delete the primary department named by the ``primary_department_id`` GET parameter."""
    return _handle_delete(request, PrimaryDepartment, 'primary_department_id',
                          'primary_department_id')


# ---------------------------------------------------------------------------
# Secondary department pages
# ---------------------------------------------------------------------------

@login_required
@custom_permission_required('org_mgnt.view_secondarydepartment')
def sd_list_view(request):
    """Render the secondary department list page, optionally filtered by name."""
    # Base queryset, newest department first.
    query_set = SecondaryDepartment.objects.filter().order_by('-secondary_department_id')

    # Read and apply the optional name filter.
    secondary_department_name = request.GET.get('secondary_department_name', '')
    if secondary_department_name:
        query_set = query_set.filter(
            secondary_department_name__icontains=secondary_department_name)

    # Paginate the result, 10 records per page.
    items = paginate_query_and_assign_numbers(
        request=request,
        queryset=query_set,
        per_page=10
    )

    # Query-string suffix that carries the active filter.
    query_params = _build_query_params(secondary_department_name=secondary_department_name)

    # Excel upload template file name.
    template_name = "组织管理-二级部门-Excel上传模板.xlsx"

    context = {
        "model_config": 'org_mgnt.SecondaryDepartment',
        "items": items,
        "breadcrumb_list": [
            {"title": "首页", "name": "index"},
            {"title": "组织管理", "name": "index"},
            {"title": "二级部门表", "name": "sd_list"}
        ],
        "filters": [
            {"type": "text", "id": "secondary_department_name",
             "name": "secondary_department_name",
             "label": "二级部门名称", "placeholder": "请输入二级部门名称"}
        ],
        "table_exclude_field_name": ['secondary_department_id'],
        "excel_upload_config": _excel_upload_config(
            template_name,
            {
                "primary_department": _preview_field(),
                "secondary_department_name": _preview_field(),
                "description": _preview_field(),
            },
            parse_url_name="common_excel_parse_sd",
            save_url_name="save_excel_table_data_sd",
        ),
        "query_params": query_params,
        "form_action_url": 'sd_list',
        "modify_url": reverse("sd_list_modify"),
        "add_url": reverse("sd_list_add"),
        "delete_url": reverse("sd_list_delete"),
    }
    return render(request, 'items_list.html', context)


@login_required
@custom_permission_required('org_mgnt.add_secondarydepartment')
def sd_list_add(request):
    """Return the empty secondary department form (GET) or create one (POST)."""
    return _handle_add(request, SecondaryDepartmentForm)


@login_required
@custom_permission_required('org_mgnt.change_secondarydepartment')
def sd_list_modify(request):
    """Return the secondary department edit form (GET) or save the submitted form (POST)."""
    return _handle_modify(request, SecondaryDepartment, SecondaryDepartmentForm,
                          'secondary_department_id')


@login_required
@custom_permission_required('org_mgnt.delete_secondarydepartment')
def sd_list_delete(request):
    """Delete the secondary department named by the ``secondary_department_id`` GET parameter."""
    return _handle_delete(request, SecondaryDepartment, 'secondary_department_id',
                          'secondary_department_id')


# ---------------------------------------------------------------------------
# Excel upload: parse for preview, then save
# ---------------------------------------------------------------------------

def _primary_department_not_found(department_name):
    """Return the 400 response for an unknown primary department name."""
    return JsonResponse({'error': f'找不到名称为 {department_name} 的一级部门信息。'}, status=400)


def _create_dynamic_serializer(mod, include):
    """Build a ModelSerializer class for ``mod`` limited to the ``include`` fields.

    A ``primary_department`` value is rendered as the department's name.
    """

    class DynamicSerializer(ModelSerializer):
        class Meta:
            model = mod
            fields = include

        def to_representation(self, instance):
            representation = super().to_representation(instance)
            if 'primary_department' in representation and instance.primary_department:
                representation['primary_department'] = instance.primary_department.department_name
            return representation

    return DynamicSerializer


def _parse_workbook(file_path, model, exclude_fields):
    """Parse the workbook at ``file_path`` into unsaved ``model`` instances.

    Returns the JsonResponse to send back: the preview payload on success, or
    a 400 response describing the first problem found.
    """
    workbook = load_workbook(file_path, data_only=True)
    sheet = workbook.active

    # The first row holds the column headers (the fields' verbose names).
    header_row = [cell.value for cell in sheet[1]]

    # Importable fields: everything except the primary key and excluded fields.
    verbose_to_field = {
        str(field.verbose_name): field.name
        for field in model._meta.fields
        if not field.primary_key and field.name not in exclude_fields
    }
    if not all(header in verbose_to_field for header in header_row):
        return JsonResponse({'error': '表头不匹配,请使用正确的Excel上传模板。'}, status=400)

    # Map every column to its model field by header text, not by position,
    # so a reordered or partial column set still lands in the right fields.
    header_fields = [verbose_to_field[header] for header in header_row]
    fields_map = dict(zip(header_fields, header_row))

    instances = []
    for row in sheet.iter_rows(min_row=2, values_only=True):
        if all(value is None for value in row):
            continue  # skip completely empty rows

        instance_data = dict(zip(header_fields, row))

        # The sheet carries the primary department by name; swap in the
        # model instance.
        primary_department_name = instance_data.get('primary_department')
        if primary_department_name:
            try:
                instance_data['primary_department'] = PrimaryDepartment.objects.get(
                    department_name=primary_department_name)
            except PrimaryDepartment.DoesNotExist:
                return _primary_department_not_found(primary_department_name)

        instance = model(**instance_data)
        try:
            instance.full_clean()
        except ValidationError as e:
            return JsonResponse({'error': f'数据校验错误: {e.message_dict}'}, status=400)
        instances.append(instance)

    # Serialize only the columns that were present in the sheet.
    serializer_class = _create_dynamic_serializer(model, include=header_fields)
    serializer = serializer_class(instances, many=True)
    return JsonResponse({"table_data": serializer.data, "fields_map": fields_map}, safe=False)


@csrf_protect
@login_required
def common_excel_parse(request):
    """Parse an uploaded Excel file and return its rows for preview.

    Args:
        request: HTTP request. Must be a POST carrying the file as
            ``excel_file`` and the target model as ``model_config``
            in ``app_label.ModelName`` form.

    Returns:
        JsonResponse: ``table_data`` (serialized rows) and ``fields_map``
        (model field name -> Excel header) on success, otherwise an
        ``error`` message with status 400 or 500.

    NOTE(review): ``model_config`` is supplied by the client and is resolved
    with ``apps.get_model`` without any per-model permission check; confirm
    this is acceptable or restrict the allowed models.
    """
    # Only POST requests that carry uploaded files are handled.
    if not (request.method == 'POST' and request.FILES):
        return JsonResponse({'error': '请求错误'}, status=400)

    excel_file = request.FILES.get('excel_file')
    if not excel_file:
        return JsonResponse({'error': '请先选择文件。'}, status=400)

    # Resolve the target model dynamically.
    model_config = request.POST.get('model_config', '')
    try:
        app_label, model_name = model_config.split('.')
        model = apps.get_model(app_label, model_name)
    except (LookupError, KeyError, ValueError):
        return JsonResponse({'error': '模型配置错误。'}, status=400)

    # Fields the model itself declares as not importable.
    exclude_fields = getattr(model, 'excluded_fields', [])

    # Store the upload so openpyxl can open it from disk.
    # NOTE(review): assumes default_storage writes under MEDIA_ROOT -- confirm.
    file_name = default_storage.save(excel_file.name, excel_file)
    file_path = os.path.join(settings.MEDIA_ROOT, file_name)

    try:
        return _parse_workbook(file_path, model, exclude_fields)
    except Exception as e:
        return JsonResponse({'error': f'解析文件时出错: {str(e)}'}, status=500)
    finally:
        # Always remove the temporary upload, including on the early
        # validation-error returns.
        os.remove(file_path)


@csrf_protect
@login_required
def save_excel_table_data(request):
    """Bulk-create model instances from previewed table data.

    Expects a POST with a JSON body containing ``model_config``
    (``app_label.ModelName``) and ``table_data`` (a list of row dicts keyed
    by model field name). Every row is validated with ``full_clean`` before
    anything is written; the first failure aborts the request.

    NOTE(review): ``model_config`` is supplied by the client and no
    per-model permission is checked before ``bulk_create``; confirm this is
    acceptable or restrict the allowed models.
    """
    if request.method != 'POST':
        return JsonResponse({'error': '无效的请求方法'}, status=400)

    try:
        data = json.loads(request.body)
        model_config = data.get('model_config')
        table_data = data.get('table_data')

        if not model_config or not table_data:
            return JsonResponse({'error': '缺少必要的参数'}, status=400)

        # Split model_config into app_label and model_name.
        try:
            app_label, model_name = model_config.split('.')
            model = apps.get_model(app_label, model_name)
        except (ValueError, LookupError):
            return JsonResponse({'error': '无效的 model_config'}, status=400)

        # Fields the model itself declares as not importable.
        exclude_fields = getattr(model, 'excluded_fields', [])

        instances = []
        for row_data in table_data:
            # Rows carry the primary department by name; swap in the
            # model instance.
            primary_department_name = row_data.get('primary_department')
            if primary_department_name:
                try:
                    row_data['primary_department'] = PrimaryDepartment.objects.get(
                        department_name=primary_department_name)
                except PrimaryDepartment.DoesNotExist:
                    return _primary_department_not_found(primary_department_name)

            try:
                # Drop the excluded fields before building the instance.
                instance_data = {
                    key: value for key, value in row_data.items()
                    if key not in exclude_fields
                }
                instance = model(**instance_data)
                instance.full_clean()
                instances.append(instance)
            except ValidationError as e:
                return JsonResponse({'error': f'数据校验错误: {e.message_dict}'}, status=400)
            except Exception as e:
                return JsonResponse({'error': f'创建实例时出错: {str(e)}'}, status=500)

        # Write all validated rows in one batch.
        try:
            model.objects.bulk_create(instances)
        except Exception as e:
            return JsonResponse({'error': f'批量保存数据时出错: {str(e)}'}, status=500)

        return JsonResponse({'message': '表格数据保存成功'}, status=200)

    except json.JSONDecodeError:
        return JsonResponse({'error': '无效的JSON格式'}, status=400)
    except Exception as e:
        return JsonResponse({'error': f'服务器内部错误: {str(e)}'}, status=500)