XH_Digital_Management/common/views.py

562 lines
23 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import json
import os
import urllib.parse
import uuid
import openpyxl
from django.apps import apps
from django.core.exceptions import ValidationError
from django.shortcuts import render
from openpyxl import load_workbook
from django.conf import settings
from django.contrib.auth.decorators import login_required
from django.contrib.staticfiles import finders
from django.core.files.storage import default_storage
from django.http import FileResponse, HttpResponseNotFound, JsonResponse
from django.views.decorators.csrf import csrf_protect
from rest_framework.serializers import ModelSerializer
from application import fac_mgnt
from application.fac_mgnt.models import InvoiceRecord
from application.org_mgnt.models import SecondaryDepartment
from application.pjt_mgnt.models import ProjectLedger
def error_page(request):
return render(request, 'error_page.html')
@login_required
def download_template(request, template_name):
"""
该视图提供下载指定的Excel模板文件。
参数:
request: HttpRequest对象。
template_name: 请求下载的Excel模板文件的名称期望是一个字符串。
fields: 包含字段信息的字典用于生成Excel文件的表头和格式。
返回:
FileResponse对象允许用户下载生成的Excel文件。
如果文件未找到则返回一个HttpResponseNotFound响应。
"""
# 使用finders.find()根据文件名在Django的静态文件目录中查找文件的绝对路径
file_path = finders.find(f'excels/{template_name}')
# 如果文件路径不存在返回404响应
if not file_path:
return HttpResponseNotFound('<h1>文件未找到</h1>')
# 创建FileResponse对象设置为以附件形式下载设置MIME类型为Excel文件
response = FileResponse(open(file_path, 'rb'),
content_type='application/vnd.openxmlformats-officedocument.spreadsheetml.sheet')
# 设置Content-Disposition头告诉浏览器这是一个需要下载的文件
# 使用urllib.parse.quote对文件名进行URL编码以确保文件名中包含的特殊字符不会引起问题
response['Content-Disposition'] = f'attachment; filename="{urllib.parse.quote(template_name)}"'
return response
@login_required
def download_excel_template(request, template_name, fields):
"""
该视图提供下载指定的Excel模板文件。
参数:
request: HttpRequest对象。
template_name: 请求下载的Excel模板文件的名称期望是一个字符串。
fields: 包含字段信息的字典用于生成Excel文件的表头和格式。
返回:
FileResponse对象允许用户下载生成的Excel文件。
如果文件未找到则返回一个HttpResponseNotFound响应。
"""
fields = json.loads(urllib.parse.unquote(fields).replace("'", '"'))
# 创建一个新的工作簿
wb = openpyxl.Workbook()
ws = wb.active
# 设置表头行
headers = [field['label'] for field in fields if field['is_add']]
ws.append(headers)
# 保存生成的Excel文件到指定路径
static_dir = os.path.join(settings.BASE_DIR, 'static', 'excels')
if not os.path.exists(static_dir):
os.makedirs(static_dir)
temp_file_path = os.path.join(static_dir, template_name)
wb.save(temp_file_path)
# 创建FileResponse对象设置为以附件形式下载设置MIME类型为Excel文件
response = FileResponse(open(temp_file_path, 'rb'),
content_type='application/vnd.openxmlformats-officedocument.spreadsheetml.sheet')
response['Content-Disposition'] = f'attachment; filename="{urllib.parse.quote(template_name)}"'
return response
@csrf_protect
@login_required
def common_excel_parse(request):
"""
该函数用于解析上传的Excel文件并返回数据。
Args:
request: HTTP请求对象包含上传的Excel文件。
Returns:
JsonResponse: 包含解析后的Excel数据的JSON响应或包含错误消息的JSON响应。
Raises:
N/A
"""
# 如果是POST请求并且有上传的文件
if request.method == 'POST' and request.FILES:
# 获取上传的Excel文件
excel_file = request.FILES.get('excel_file')
# 如果没有提供文件,返回错误响应
if not excel_file:
return JsonResponse({'error': '请先选择文件。'}, status=400)
# 获取模型名称
model_config = request.POST.get('model_config', '')
# 动态获取模型
try:
app_label, model_name = model_config.split('.')
model = apps.get_model(app_label, model_name)
except (LookupError, KeyError, ValueError):
return JsonResponse({'error': '模型配置错误。'}, status=400)
# 保存文件到服务器
file_name = default_storage.save(excel_file.name, excel_file)
file_path = os.path.join(settings.MEDIA_ROOT, file_name)
try:
# 打开并解析Excel文件
workbook = load_workbook(file_path, data_only=True)
sheet = workbook.active
# 读取第一行作为表头
header_row = [cell.value for cell in sheet[1]]
# 获取字段名和 verbose_name排除索引字段
model_fields = [field.name for field in model._meta.fields if
not field.primary_key and field.verbose_name in header_row]
model_verbose_name = [field.verbose_name for field in model._meta.fields if
not field.primary_key and field.verbose_name in header_row]
if not all(item in model_verbose_name for item in header_row):
return JsonResponse({'error': '表头不匹配请使用正确的Excel上传模板。'}, status=400)
# 组成 fields_map
fields_map = dict(zip(model_fields, header_row))
fields_map_nf = dict(zip(header_row, model_fields))
# 创建一个映射将Excel表头映射到模型字段名
header_to_field_map = {header: fields_map_nf[header] for header in header_row}
instance_list = []
for row in sheet.iter_rows(min_row=2, values_only=True):
if not all(value is None for value in row):
instance_list.append({header_to_field_map[header]: value for header, value in zip(header_row, row)})
# 清理,删除上传的文件
os.remove(file_path)
return JsonResponse({"table_data": instance_list, "fields_map": fields_map}, safe=False)
except Exception as e:
# 清理,删除上传的文件
os.remove(file_path)
return JsonResponse({'error': f'解析文件时出错: {str(e)}'}, status=500)
return JsonResponse({'error': '请求错误'}, status=400)
@csrf_protect
@login_required
def save_excel_table_data(request):
if request.method == 'POST':
try:
data = json.loads(request.body)
model_config = data.get('model_config')
table_data = data.get('table_data')
if not model_config or not table_data:
return JsonResponse({'error': '缺少必要的参数'}, status=400)
# 分割 model_config 以获取 app_label 和 model_name
try:
app_label, model_name = model_config.split('.')
Model = apps.get_model(app_label, model_name)
except (ValueError, LookupError):
return JsonResponse({'error': '无效的 model_config'}, status=400)
instances = []
for row_data in table_data:
instance_data = {}
for field_name, value in row_data.items():
field = Model._meta.get_field(field_name)
if field.is_relation:
related_model = field.related_model
related_field_name_list = related_model._meta.fields
for related_field_name in related_field_name_list:
try:
related_instance = related_model.objects.get(**{related_field_name.name: value})
value = related_instance
instance_data[field_name] = value
break
except Exception:
continue
else:
instance_data[field_name] = value
instance = Model(**instance_data)
try:
instance.full_clean()
instances.append(instance)
except ValidationError as e:
return JsonResponse({'error': f'数据校验错误: {e.message_dict}'}, status=400)
# 批量保存数据
Model.objects.bulk_create(instances)
return JsonResponse({'success': '数据保存成功'}, status=201)
except json.JSONDecodeError:
return JsonResponse({'error': '无效的JSON数据'}, status=400)
except Exception as e:
return JsonResponse({'error': f'保存数据时出错: {str(e)}'}, status=500)
return JsonResponse({'error': '请求错误'}, status=400)
@login_required
def load_secondary_departments(request):
primary_department_id = request.GET.get('primary_department_id')
secondary_departments = SecondaryDepartment.objects.filter(primary_department_id=primary_department_id).order_by(
'secondary_department_name')
return JsonResponse(list(secondary_departments.values('secondary_department_id', 'secondary_department_name')),
safe=False)
@csrf_protect
@login_required
def save_excel_table_data_fac_inv(request):
if request.method == 'POST':
try:
data = json.loads(request.body)
model_config = data.get('model_config')
table_data = data.get('table_data')
if not model_config or not table_data:
return JsonResponse({'error': '缺少必要的参数'}, status=400)
# 分割 model_config 以获取 app_label 和 model_name
try:
app_label, model_name = model_config.split('.')
Model = apps.get_model(app_label, model_name)
except (ValueError, LookupError):
return JsonResponse({'error': '无效的 model_config'}, status=400)
instances = []
for row_data in table_data:
row_data['project_id'] = ProjectLedger.objects.get(project_name=row_data['project_name'])
instance = Model(**row_data)
try:
instance.full_clean()
instances.append(instance)
except ValidationError as e:
return JsonResponse({'error': f'数据校验错误: {e.message_dict}'}, status=400)
# 批量保存数据
Model.objects.bulk_create(instances)
return JsonResponse({'success': '数据保存成功'}, status=201)
except json.JSONDecodeError:
return JsonResponse({'error': '无效的JSON数据'}, status=400)
except Exception as e:
return JsonResponse({'error': f'保存数据时出错: {str(e)}'}, status=500)
return JsonResponse({'error': '请求错误'}, status=400)
@csrf_protect
@login_required
def save_excel_table_data_fac_rep(request):
if request.method == 'POST':
try:
data = json.loads(request.body)
model_config = data.get('model_config')
table_data = data.get('table_data')
if not model_config or not table_data:
return JsonResponse({'error': '缺少必要的参数'}, status=400)
# 分割 model_config 以获取 app_label 和 model_name
try:
app_label, model_name = model_config.split('.')
Model = apps.get_model(app_label, model_name)
except (ValueError, LookupError):
return JsonResponse({'error': '无效的 model_config'}, status=400)
instances = []
for row_data in table_data:
row_data['project_id'] = ProjectLedger.objects.get(project_name=row_data['project_name'])
row_data['invoice'] = InvoiceRecord.objects.get(project_name=row_data['project_name'])
instance = Model(**row_data)
try:
instance.full_clean()
instances.append(instance)
except ValidationError as e:
return JsonResponse({'error': f'数据校验错误: {e.message_dict}'}, status=400)
# 批量保存数据
Model.objects.bulk_create(instances)
return JsonResponse({'success': '数据保存成功'}, status=201)
except json.JSONDecodeError:
return JsonResponse({'error': '无效的JSON数据'}, status=400)
except Exception as e:
return JsonResponse({'error': f'保存数据时出错: {str(e)}'}, status=500)
return JsonResponse({'error': '请求错误'}, status=400)
@csrf_protect
@login_required
def save_excel_table_data_inventory(request):
if request.method == 'POST':
try:
data = json.loads(request.body)
model_config = data.get('model_config')
def convert_replenishment_alert(td):
for item in td:
item['replenishment_alert'] = True if item['replenishment_alert'] == '' else False
return td
table_data = convert_replenishment_alert(data.get('table_data'))
if not model_config or not table_data:
return JsonResponse({'error': '缺少必要的参数'}, status=400)
# 分割 model_config 以获取 app_label 和 model_name
try:
app_label, model_name = model_config.split('.')
Model = apps.get_model(app_label, model_name)
except (ValueError, LookupError):
return JsonResponse({'error': '无效的 model_config'}, status=400)
instances = []
for row_data in table_data:
instance_data = {}
for field_name, value in row_data.items():
field = Model._meta.get_field(field_name)
if field.is_relation:
related_model = field.related_model
related_field_name_list = related_model._meta.fields
for related_field_name in related_field_name_list:
try:
related_instance = related_model.objects.get(**{related_field_name.name: value})
value = related_instance
instance_data[field_name] = value
break
except Exception:
continue
else:
instance_data[field_name] = value
instance = Model(**instance_data)
try:
instance.full_clean()
instances.append(instance)
except ValidationError as e:
return JsonResponse({'error': f'数据校验错误: {e.message_dict}'}, status=400)
# 批量保存数据
Model.objects.bulk_create(instances)
return JsonResponse({'success': '数据保存成功'}, status=201)
except json.JSONDecodeError:
return JsonResponse({'error': '无效的JSON数据'}, status=400)
except Exception as e:
return JsonResponse({'error': f'保存数据时出错: {str(e)}'}, status=500)
return JsonResponse({'error': '请求错误'}, status=400)
@csrf_protect
@login_required
def save_excel_table_data_svc(request):
if request.method == 'POST':
try:
data = json.loads(request.body)
model_config = data.get('model_config')
table_data = data.get('table_data')
if not model_config or not table_data:
return JsonResponse({'error': '缺少必要的参数'}, status=400)
# 分割 model_config 以获取 app_label 和 model_name
try:
app_label, model_name = model_config.split('.')
Model = apps.get_model(app_label, model_name)
except (ValueError, LookupError):
return JsonResponse({'error': '无效的 model_config'}, status=400)
instances = []
for row_data in table_data:
instance_data = {'usage_records': 0}
for field_name, value in row_data.items():
field = Model._meta.get_field(field_name)
if field.is_relation:
related_model = field.related_model
related_field_name_list = related_model._meta.fields
for related_field_name in related_field_name_list:
try:
related_instance = related_model.objects.get(**{related_field_name.name: value})
value = related_instance
instance_data[field_name] = value
break
except Exception:
continue
else:
instance_data[field_name] = value
instance = Model(**instance_data)
try:
instance.full_clean()
instances.append(instance)
except ValidationError as e:
return JsonResponse({'error': f'数据校验错误: {e.message_dict}'}, status=400)
# 批量保存数据
Model.objects.bulk_create(instances)
return JsonResponse({'success': '数据保存成功'}, status=201)
except json.JSONDecodeError:
return JsonResponse({'error': '无效的JSON数据'}, status=400)
except Exception as e:
return JsonResponse({'error': f'保存数据时出错: {str(e)}'}, status=500)
return JsonResponse({'error': '请求错误'}, status=400)
@csrf_protect
@login_required
def save_excel_table_data_ctt(request):
if request.method == 'POST':
try:
data = json.loads(request.body)
model_config = data.get('model_config')
def convert_replenishment_alert(td):
for item in td:
item['payment_agreement_needed'] = True if item['payment_agreement_needed'] == '' else False
item['signed'] = True if item['signed'] == '' else False
return td
table_data = convert_replenishment_alert(data.get('table_data'))
if not model_config or not table_data:
return JsonResponse({'error': '缺少必要的参数'}, status=400)
# 分割 model_config 以获取 app_label 和 model_name
try:
app_label, model_name = model_config.split('.')
Model = apps.get_model(app_label, model_name)
except (ValueError, LookupError):
return JsonResponse({'error': '无效的 model_config'}, status=400)
instances = []
for row_data in table_data:
instance_data = {}
for field_name, value in row_data.items():
field = Model._meta.get_field(field_name)
if field.is_relation:
related_model = field.related_model
related_field_name_list = related_model._meta.fields
for related_field_name in related_field_name_list:
try:
related_instance = related_model.objects.get(**{related_field_name.name: value})
value = related_instance
instance_data[field_name] = value
break
except Exception:
continue
else:
instance_data[field_name] = value
instance = Model(**instance_data)
try:
instance.full_clean()
instances.append(instance)
except ValidationError as e:
return JsonResponse({'error': f'数据校验错误: {e.message_dict}'}, status=400)
# 批量保存数据
Model.objects.bulk_create(instances)
return JsonResponse({'success': '数据保存成功'}, status=201)
except json.JSONDecodeError:
return JsonResponse({'error': '无效的JSON数据'}, status=400)
except Exception as e:
return JsonResponse({'error': f'保存数据时出错: {str(e)}'}, status=500)
return JsonResponse({'error': '请求错误'}, status=400)
@csrf_protect
@login_required
def save_excel_table_data_fix(request):
if request.method == 'POST':
try:
data = json.loads(request.body)
model_config = data.get('model_config')
table_data = data.get('table_data')
if not model_config or not table_data:
return JsonResponse({'error': '缺少必要的参数'}, status=400)
# 分割 model_config 以获取 app_label 和 model_name
try:
app_label, model_name = model_config.split('.')
Model = apps.get_model(app_label, model_name)
except (ValueError, LookupError):
return JsonResponse({'error': '无效的 model_config'}, status=400)
instances = []
for row_data in table_data:
instance_data = {'asset_id': str(uuid.uuid4())}
for field_name, value in row_data.items():
field = Model._meta.get_field(field_name)
if field.is_relation:
related_model = field.related_model
related_field_name_list = related_model._meta.fields
for related_field_name in related_field_name_list:
try:
related_instance = related_model.objects.get(**{related_field_name.name: value})
value = related_instance
instance_data[field_name] = value
break
except Exception:
continue
else:
instance_data[field_name] = value
instance = Model(**instance_data)
try:
instance.full_clean()
instances.append(instance)
except ValidationError as e:
return JsonResponse({'error': f'数据校验错误: {e.message_dict}'}, status=400)
# 批量保存数据
Model.objects.bulk_create(instances)
return JsonResponse({'success': '数据保存成功'}, status=201)
except json.JSONDecodeError:
return JsonResponse({'error': '无效的JSON数据'}, status=400)
except Exception as e:
return JsonResponse({'error': f'保存数据时出错: {str(e)}'}, status=500)
return JsonResponse({'error': '请求错误'}, status=400)