XH_Digital_Management/application/org_mgnt/views.py

680 lines
28 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import json
from django.http import JsonResponse, Http404
from django.shortcuts import render, get_object_or_404
from django.template.loader import render_to_string
from django.urls import reverse
from django.views.decorators.csrf import csrf_protect
from rest_framework import status
from rest_framework.decorators import api_view, permission_classes
from rest_framework.pagination import PageNumberPagination
from rest_framework.permissions import IsAuthenticated
from rest_framework.response import Response
from django.contrib.auth.decorators import login_required
from application.org_mgnt.forms import CompanyEntityForm, PrimaryDepartmentForm, SecondaryDepartmentForm
from application.org_mgnt.models import PrimaryDepartment, SecondaryDepartment, CompanyEntity, CompanyBankAccount
from application.org_mgnt.serializers import EntityChangeRecordSerializer, CompanyBankAccountSerializer
from common.auth import custom_permission_required
from common.utils.page_helper import paginate_query_and_assign_numbers
from django.http import JsonResponse
from django.core.files.storage import default_storage
from django.conf import settings
from openpyxl import load_workbook
from django.apps import apps
from rest_framework.serializers import ModelSerializer
from django.core.exceptions import ValidationError
import os
class StandardResultsSetPagination(PageNumberPagination):
page_size = 5
page_size_query_param = 'page_size'
max_page_size = 100
@login_required
@custom_permission_required('org_mgnt.view_companyentity')
def eir_list_view(request):
# 声明查询集
query_set = CompanyEntity.objects.filter().order_by('-entity_id')
# 获取查询参数
company_name = request.GET.get('company_name', '')
business_status = request.GET.get('business_status', '')
taxpayer_identification_number = request.GET.get('taxpayer_identification_number', '')
# 根据提供的参数进行筛选
if company_name:
query_set = query_set.filter(company_name__icontains=company_name)
if business_status:
query_set = query_set.filter(business_status=business_status)
if taxpayer_identification_number:
query_set = query_set.filter(taxpayer_identification_number__icontains=taxpayer_identification_number)
# 对查询结果进行分页每页10条记录
items = paginate_query_and_assign_numbers(
request=request,
queryset=query_set,
per_page=10
)
# 构建上下文查询参数字符串
query_params = '&company_name={}&business_status={}&taxpayer_identification_number={}'.format(
company_name, business_status, taxpayer_identification_number
)
# Excel上传模板
template_name = "组织管理-公司主体信息登记-Excel上传模板.xlsx"
# 构建上下文
context = {
"model_config": 'org_mgnt.CompanyEntity',
"items": items,
"breadcrumb_list": [
{"title": "首页", "name": "index"},
{"title": "组织管理", "name": "index"},
{"title": "公司主体信息登记表", "name": "eir_list"}
],
"filters": [
{"type": "text", "id": "company_name", "name": "company_name", "label": "公司名称",
"placeholder": "请输入公司名称"},
{"type": "select", "id": "business_status", "name": "business_status", "label": "公司经营状态",
"options": [{"value": "存续", "display": "存续"}, {"value": "注销", "display": "注销"}]},
{"type": "text", "id": "taxpayer_identification_number", "name": "taxpayer_identification_number",
"label": "纳税人识别号", "placeholder": "请输入纳税人识别号"}
],
"table_exclude_field_name": ['entity_id'],
"excel_upload_config": {
"template_url": reverse("download_template", kwargs={'template_name': template_name}),
"parse_url": reverse("common_excel_parse"),
"save_url": reverse("save_excel_table_data"),
"fields_preview_config": {
"company_type": {"type": "text", "width": "180px"},
"company_name": {"type": "text", "width": "180px"},
"registration_address": {"type": "text", "width": "180px"},
"registered_capital": {"type": "text", "width": "180px"},
"capital_paid_time": {"type": "date", "width": "180px"},
"capital_paid": {"type": "text", "width": "180px"},
"establishment_time": {"type": "date", "width": "180px"},
"operation_period": {"type": "text", "width": "180px"},
"taxpayer_identification_number": {"type": "text", "width": "180px"},
"business_status": {"type": "select", "width": "180px", "options": ["存续", "注销"]},
"scope_of_business": {"type": "text", "width": "180px"},
"purpose_of_company": {"type": "text", "width": "180px"},
"shareholders_and_stakes": {"type": "text", "width": "180px"},
"chairman": {"type": "text", "width": "180px"},
"directors": {"type": "text", "width": "180px"},
"supervisor_chairman": {"type": "text", "width": "180px"},
"employee_supervisor": {"type": "text", "width": "180px"},
"supervisors": {"type": "text", "width": "180px"},
"general_manager": {"type": "text", "width": "180px"},
"financial_officer": {"type": "text", "width": "180px"}
}
},
"query_params": query_params,
"form_action_url": 'eir_list',
"modify_url": reverse("eir_list_modify"),
"add_url": reverse("eir_list_add"),
"delete_url": reverse("eir_list_delete"),
}
return render(request, 'ce_list.html', context)
@login_required
@custom_permission_required('org_mgnt.add_companyentity')
def eir_list_add(request):
if request.method == 'POST':
form = CompanyEntityForm(request.POST)
if form.is_valid():
form.save()
return JsonResponse({"message": "添加成功"})
else:
form_html = render_to_string('form_partial.html', {'form': form}, request)
return JsonResponse({"form_html": form_html, "errors": form.errors}, status=400)
elif request.method == 'GET':
form = CompanyEntityForm()
form_html = render_to_string('form_partial.html', {'form': form}, request)
return JsonResponse({"form_html": form_html})
else:
return JsonResponse({"message": "无效的请求方法"}, status=405)
@login_required
@custom_permission_required('org_mgnt.change_companyentity')
def eir_list_modify(request):
if request.method == 'POST':
if 'id' in request.POST:
instance = CompanyEntity.objects.get(entity_id=request.POST['id'])
form = CompanyEntityForm(request.POST, instance=instance)
else:
form = CompanyEntityForm(request.POST)
if form.is_valid():
form.save()
return JsonResponse({"message": "保存成功"})
else:
form_html = render_to_string('form_partial.html', {'form': form}, request)
return JsonResponse({"form_html": form_html, "errors": form.errors}, status=400)
elif request.method == 'GET':
if 'id' in request.GET:
try:
instance = CompanyEntity.objects.get(entity_id=request.GET['id'])
form = CompanyEntityForm(instance=instance)
except CompanyEntity.DoesNotExist:
raise Http404("对象不存在")
else:
form = CompanyEntityForm()
form_html = render_to_string('form_partial.html', {'form': form}, request)
return JsonResponse({"form_html": form_html})
else:
return JsonResponse({"message": "无效的请求方法"}, status=405)
@login_required
@custom_permission_required('org_mgnt.delete_companyentity')
def eir_list_delete(request):
if request.method == 'GET':
target_id = request.GET.get('target_id')
CompanyEntity.objects.filter(entity_id=target_id).delete()
return JsonResponse({"message": "删除成功"})
return JsonResponse({"message": "无效的请求方法"}, status=405)
@api_view(['GET'])
@login_required
@custom_permission_required('org_mgnt.view_entitychangerecord')
def change_record_list(request, entity_id):
entity = get_object_or_404(CompanyEntity, pk=entity_id)
change_records = entity.change_records.all().order_by('record_id')
paginator = StandardResultsSetPagination()
paginated_change_records = paginator.paginate_queryset(change_records, request)
serializer = EntityChangeRecordSerializer(paginated_change_records, many=True)
return paginator.get_paginated_response(serializer.data)
@api_view(['GET'])
@login_required
@custom_permission_required('org_mgnt.view_companybankaccount')
def bank_account_list(request, entity_id):
entity = get_object_or_404(CompanyEntity, pk=entity_id)
bank_accounts = entity.bank_accounts.all().order_by('account_id')
paginator = StandardResultsSetPagination()
paginated_bank_accounts = paginator.paginate_queryset(bank_accounts, request)
serializer = CompanyBankAccountSerializer(paginated_bank_accounts, many=True)
response = paginator.get_paginated_response(serializer.data)
response_data = response.data
response_data['next_page'] = paginator.page.next_page_number() if paginator.page.has_next() else None
response_data['previous_page'] = paginator.page.previous_page_number() if paginator.page.has_previous() else None
return Response(response_data)
@api_view(['GET'])
@permission_classes([IsAuthenticated])
def get_bank_account_details(request, account_id):
bank_account = get_object_or_404(CompanyBankAccount, pk=account_id)
serializer = CompanyBankAccountSerializer(bank_account)
return Response(serializer.data)
@api_view(['POST'])
@permission_classes([IsAuthenticated])
def add_bank_account(request, entity_id):
entity = get_object_or_404(CompanyEntity, pk=entity_id)
serializer = CompanyBankAccountSerializer(data=request.data)
if serializer.is_valid():
serializer.save(company_entity=entity)
return Response(serializer.data, status=status.HTTP_201_CREATED)
return Response(serializer.errors, status=status.HTTP_400_BAD_REQUEST)
@api_view(['PUT'])
@permission_classes([IsAuthenticated])
def update_bank_account(request, account_id):
bank_account = get_object_or_404(CompanyBankAccount, pk=account_id)
serializer = CompanyBankAccountSerializer(bank_account, data=request.data, partial=True)
if serializer.is_valid():
serializer.save()
return Response(serializer.data)
return Response(serializer.errors, status=status.HTTP_400_BAD_REQUEST)
@api_view(['DELETE'])
@permission_classes([IsAuthenticated])
def delete_bank_account(request, account_id):
bank_account = get_object_or_404(CompanyBankAccount, pk=account_id)
bank_account.delete()
return Response(status=status.HTTP_204_NO_CONTENT)
@login_required
@custom_permission_required('org_mgnt.view_primarydepartment')
def pd_list_view(request):
# 声明查询集
query_set = PrimaryDepartment.objects.filter().order_by('-primary_department_id')
# 获取查询参数
department_name = request.GET.get('department_name', '')
# 根据提供的参数进行筛选
if department_name:
query_set = query_set.filter(department_name__icontains=department_name)
# 对查询结果进行分页每页10条记录
items = paginate_query_and_assign_numbers(
request=request,
queryset=query_set,
per_page=10
)
# 构建上下文查询参数字符串
query_params = '&department_name={}'.format(department_name)
# Excel上传模板
template_name = "组织管理-一级部门-Excel上传模板.xlsx"
# 构建上下文
context = {
"model_config": 'org_mgnt.PrimaryDepartment',
"items": items,
"breadcrumb_list": [
{"title": "首页", "name": "index"},
{"title": "组织管理", "name": "index"},
{"title": "一级部门表", "name": "pd_list"}
],
"filters": [
{"type": "text", "id": "department_name", "name": "department_name", "label": "一级部门名称",
"placeholder": "请输入一级部门名称"}
],
"table_exclude_field_name": ['primary_department_id'],
"excel_upload_config": {
"template_url": reverse("download_template", kwargs={'template_name': template_name}),
"parse_url": reverse("common_excel_parse"),
"save_url": reverse("save_excel_table_data"),
"fields_preview_config": {
"department_name": {"type": "text", "width": "180px"},
"description": {"type": "text", "width": "180px"},
}
},
"query_params": query_params,
"form_action_url": 'pd_list',
"modify_url": reverse("pd_list_modify"),
"add_url": reverse("pd_list_add"),
"delete_url": reverse("pd_list_delete"),
}
return render(request, 'items_list.html', context)
@login_required
@custom_permission_required('org_mgnt.add_primarydepartment')
def pd_list_add(request):
if request.method == 'POST':
form = PrimaryDepartmentForm(request.POST)
if form.is_valid():
form.save()
return JsonResponse({"message": "添加成功"})
else:
form_html = render_to_string('form_partial.html', {'form': form}, request)
return JsonResponse({"form_html": form_html, "errors": form.errors}, status=400)
elif request.method == 'GET':
form = PrimaryDepartmentForm()
form_html = render_to_string('form_partial.html', {'form': form}, request)
return JsonResponse({"form_html": form_html})
else:
return JsonResponse({"message": "无效的请求方法"}, status=405)
@login_required
@custom_permission_required('org_mgnt.change_primarydepartment')
def pd_list_modify(request):
if request.method == 'POST':
if 'id' in request.POST:
instance = PrimaryDepartment.objects.get(primary_department_id=request.POST['id'])
form = PrimaryDepartmentForm(request.POST, instance=instance)
else:
form = PrimaryDepartmentForm(request.POST)
if form.is_valid():
form.save()
return JsonResponse({"message": "保存成功"})
else:
form_html = render_to_string('form_partial.html', {'form': form}, request)
return JsonResponse({"form_html": form_html, "errors": form.errors}, status=400)
elif request.method == 'GET':
if 'id' in request.GET:
try:
instance = PrimaryDepartment.objects.get(primary_department_id=request.GET['id'])
form = PrimaryDepartmentForm(instance=instance)
except PrimaryDepartment.DoesNotExist:
raise Http404("对象不存在")
else:
form = PrimaryDepartmentForm()
form_html = render_to_string('form_partial.html', {'form': form}, request)
return JsonResponse({"form_html": form_html})
else:
return JsonResponse({"message": "无效的请求方法"}, status=405)
@login_required
@custom_permission_required('org_mgnt.delete_primarydepartment')
def pd_list_delete(request):
if request.method == 'GET':
primary_department_id = request.GET.get('primary_department_id')
PrimaryDepartment.objects.filter(primary_department_id=primary_department_id).delete()
return JsonResponse({"message": "删除成功"})
return JsonResponse({"message": "无效的请求方法"}, status=405)
@login_required
@custom_permission_required('org_mgnt.view_secondarydepartment')
def sd_list_view(request):
# 声明查询集
query_set = SecondaryDepartment.objects.filter().order_by('-secondary_department_id')
# 获取查询参数
secondary_department_name = request.GET.get('secondary_department_name', '')
# 根据提供的参数进行筛选
if secondary_department_name:
query_set = query_set.filter(secondary_department_name__icontains=secondary_department_name)
# 对查询结果进行分页每页10条记录
items = paginate_query_and_assign_numbers(
request=request,
queryset=query_set,
per_page=10
)
# 构建上下文查询参数字符串
query_params = '&secondary_department_name={}'.format(secondary_department_name)
# Excel上传模板
template_name = "组织管理-二级部门-Excel上传模板.xlsx"
# 构建上下文
context = {
"model_config": 'org_mgnt.SecondaryDepartment',
"items": items,
"breadcrumb_list": [
{"title": "首页", "name": "index"},
{"title": "组织管理", "name": "index"},
{"title": "二级部门表", "name": "sd_list"}
],
"filters": [
{"type": "text", "id": "secondary_department_name", "name": "secondary_department_name",
"label": "二级部门名称", "placeholder": "请输入二级部门名称"}
],
"table_exclude_field_name": ['secondary_department_id'],
"excel_upload_config": {
"template_url": reverse("download_template", kwargs={'template_name': template_name}),
"parse_url": reverse("common_excel_parse_sd"),
"save_url": reverse("save_excel_table_data_sd"),
"fields_preview_config": {
"primary_department": {"type": "text", "width": "180px"},
"secondary_department_name": {"type": "text", "width": "180px"},
"description": {"type": "text", "width": "180px"},
}
},
"query_params": query_params,
"form_action_url": 'sd_list',
"modify_url": reverse("sd_list_modify"),
"add_url": reverse("sd_list_add"),
"delete_url": reverse("sd_list_delete"),
}
return render(request, 'items_list.html', context)
@login_required
@custom_permission_required('org_mgnt.add_secondarydepartment')
def sd_list_add(request):
if request.method == 'POST':
form = SecondaryDepartmentForm(request.POST)
if form.is_valid():
form.save()
return JsonResponse({"message": "添加成功"})
else:
form_html = render_to_string('form_partial.html', {'form': form}, request)
return JsonResponse({"form_html": form_html, "errors": form.errors}, status=400)
elif request.method == 'GET':
form = SecondaryDepartmentForm()
form_html = render_to_string('form_partial.html', {'form': form}, request)
return JsonResponse({"form_html": form_html})
else:
return JsonResponse({"message": "无效的请求方法"}, status=405)
@login_required
@custom_permission_required('org_mgnt.change_secondarydepartment')
def sd_list_modify(request):
if request.method == 'POST':
if 'id' in request.POST:
instance = SecondaryDepartment.objects.get(secondary_department_id=request.POST['id'])
form = SecondaryDepartmentForm(request.POST, instance=instance)
else:
form = SecondaryDepartmentForm(request.POST)
if form.is_valid():
form.save()
return JsonResponse({"message": "保存成功"})
else:
form_html = render_to_string('form_partial.html', {'form': form}, request)
return JsonResponse({"form_html": form_html, "errors": form.errors}, status=400)
elif request.method == 'GET':
if 'id' in request.GET:
try:
instance = SecondaryDepartment.objects.get(secondary_department_id=request.GET['id'])
form = SecondaryDepartmentForm(instance=instance)
except SecondaryDepartment.DoesNotExist:
raise Http404("对象不存在")
else:
form = SecondaryDepartmentForm()
form_html = render_to_string('form_partial.html', {'form': form}, request)
return JsonResponse({"form_html": form_html})
else:
return JsonResponse({"message": "无效的请求方法"}, status=405)
@login_required
@custom_permission_required('org_mgnt.delete_secondarydepartment')
def sd_list_delete(request):
if request.method == 'GET':
secondary_department_id = request.GET.get('secondary_department_id')
SecondaryDepartment.objects.filter(secondary_department_id=secondary_department_id).delete()
return JsonResponse({"message": "删除成功"})
return JsonResponse({"message": "无效的请求方法"}, status=405)
@csrf_protect
@login_required
def common_excel_parse(request):
"""
该函数用于解析上传的Excel文件并返回数据。
Args:
request: HTTP请求对象包含上传的Excel文件。
Returns:
JsonResponse: 包含解析后的Excel数据的JSON响应或包含错误消息的JSON响应。
Raises:
N/A
"""
# 如果是POST请求并且有上传的文件
if request.method == 'POST' and request.FILES:
# 获取上传的Excel文件
excel_file = request.FILES.get('excel_file')
# 如果没有提供文件,返回错误响应
if not excel_file:
return JsonResponse({'error': '请先选择文件。'}, status=400)
# 获取模型名称
model_config = request.POST.get('model_config', '')
# 动态获取模型
try:
app_label, model_name = model_config.split('.')
model = apps.get_model(app_label, model_name)
except (LookupError, KeyError, ValueError):
return JsonResponse({'error': '模型配置错误。'}, status=400)
# 获取模型中配置的不需要的字段
exclude_fields = getattr(model, 'excluded_fields', [])
# 保存文件到服务器
file_name = default_storage.save(excel_file.name, excel_file)
file_path = os.path.join(settings.MEDIA_ROOT, file_name)
def create_dynamic_serializer(mod, include):
class DynamicSerializer(ModelSerializer):
class Meta:
model = mod
fields = include
def to_representation(self, instance):
representation = super().to_representation(instance)
if 'primary_department' in representation and instance.primary_department:
representation['primary_department'] = instance.primary_department.department_name
return representation
return DynamicSerializer
try:
# 打开并解析Excel文件
workbook = load_workbook(file_path, data_only=True)
sheet = workbook.active
data = []
# 读取第一行作为表头
header_row = [cell.value for cell in sheet[1]]
# 获取字段名和 verbose_name排除索引字段
model_fields = [field.name for field in model._meta.fields if
not field.primary_key and field.name not in exclude_fields]
model_verbose_name = [field.verbose_name for field in model._meta.fields if
not field.primary_key and field.name not in exclude_fields]
if not all(item in model_verbose_name for item in header_row):
return JsonResponse({'error': '表头不匹配请使用正确的Excel上传模板。'}, status=400)
# 组成 fields_map
fields_map = dict(zip(model_fields, header_row))
fields_map_nf = dict(zip(header_row, model_fields))
# 创建一个映射将Excel表头映射到模型字段名
header_to_field_map = {header: fields_map_nf[header] for header in header_row}
header_fields = [header_to_field_map[header] for header in header_row]
for row in sheet.iter_rows(min_row=2, values_only=True):
if not all(value is None for value in row):
instance_data = dict(zip(model_fields, row))
# 处理primary_department字段假设模型中有primary_department字段并且Excel文件中的表头包含一级部门名称
primary_department_name = instance_data.get('primary_department')
if primary_department_name:
try:
primary_department_instance = PrimaryDepartment.objects.get(
department_name=primary_department_name)
instance_data['primary_department'] = primary_department_instance
except PrimaryDepartment.DoesNotExist:
return JsonResponse({'error': f'找不到名称为 {primary_department_name} 的一级部门信息。'},
status=400)
instance = model(**instance_data)
try:
instance.full_clean()
data.append(instance)
except ValidationError as e:
return JsonResponse({'error': f'数据校验错误: {e.message_dict}'}, status=400)
# 动态获取序列化器
serializer_class = create_dynamic_serializer(model, include=header_fields)
serializer = serializer_class(data, many=True)
# 清理,删除上传的文件
os.remove(file_path)
return JsonResponse({"table_data": serializer.data, "fields_map": fields_map}, safe=False)
except Exception as e:
# 清理,删除上传的文件
os.remove(file_path)
return JsonResponse({'error': f'解析文件时出错: {str(e)}'}, status=500)
return JsonResponse({'error': '请求错误'}, status=400)
@csrf_protect
@login_required
def save_excel_table_data(request):
if request.method == 'POST':
try:
data = json.loads(request.body)
model_config = data.get('model_config')
table_data = data.get('table_data')
if not model_config or not table_data:
return JsonResponse({'error': '缺少必要的参数'}, status=400)
# 分割 model_config 以获取 app_label 和 model_name
try:
app_label, model_name = model_config.split('.')
Model = apps.get_model(app_label, model_name)
except (ValueError, LookupError):
return JsonResponse({'error': '无效的 model_config'}, status=400)
# 获取模型中配置的不需要的字段
exclude_fields = getattr(Model, 'excluded_fields', [])
# 创建模型实例列表
instances = []
for row_data in table_data:
# 处理primary_department字段假设模型中有primary_department字段并且table_data中包含一级部门名称
primary_department_name = row_data.get('primary_department')
if primary_department_name:
try:
primary_department_instance = PrimaryDepartment.objects.get(
department_name=primary_department_name)
row_data['primary_department'] = primary_department_instance
except PrimaryDepartment.DoesNotExist:
return JsonResponse({'error': f'找不到名称为 {primary_department_name} 的一级部门信息。'},
status=400)
try:
# 排除不需要的字段
instance_data = {key: value for key, value in row_data.items() if key not in exclude_fields}
instance = Model(**instance_data)
instance.full_clean() # 验证数据
instances.append(instance)
except ValidationError as e:
return JsonResponse({'error': f'数据校验错误: {e.message_dict}'}, status=400)
except Exception as e:
return JsonResponse({'error': f'创建实例时出错: {str(e)}'}, status=500)
# 批量创建模型实例
try:
Model.objects.bulk_create(instances)
except Exception as e:
return JsonResponse({'error': f'批量保存数据时出错: {str(e)}'}, status=500)
return JsonResponse({'message': '表格数据保存成功'}, status=200)
except json.JSONDecodeError:
return JsonResponse({'error': '无效的JSON格式'}, status=400)
except Exception as e:
return JsonResponse({'error': f'服务器内部错误: {str(e)}'}, status=500)
return JsonResponse({'error': '无效的请求方法'}, status=400)