XH_Digital_Management/application/accounts/views.py

440 lines
16 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

# 标准库导入
import json
from django.http import JsonResponse, HttpResponseBadRequest
from django.shortcuts import redirect, render, get_object_or_404
from django.utils.decorators import method_decorator
# Django组件导入
from django.contrib import messages
from django.contrib.auth import logout
from django.contrib.auth.decorators import login_required, permission_required
from django.contrib.auth.models import Permission, User, Group
from django.contrib.auth.views import LoginView
from django.views.decorators.csrf import csrf_protect
from django.views.decorators.http import require_http_methods
from XH_Digital_Management import settings
# 本地Django应用导入
from application.accounts.models import AccountProfile
from common.auth import group_required
from common.utils.page_helper import paginate_query_and_assign_numbers
def format_permissions(permissions):
    """Convert raw permission rows into display-ready dictionaries.

    Args:
        permissions: Iterable of mappings exposing the keys ``id``, ``name``,
            ``codename`` and ``content_type__app_label``.

    Returns:
        A list of dicts with the keys ``id``, ``resource_group``,
        ``resource``, ``permission`` and ``codename``. Rows whose app label
        has no entry in ``settings.APP_NAME_MAPPING`` are left out.
    """
    action_prefixes = ('Can add ', 'Can change ', 'Can delete ', 'Can view ')
    # Maps the action part of a codename to its display label.
    action_mapping = {
        'add': '新增',
        'change': '修改',
        'view': '查看',
        'delete': '删除',
    }

    formatted_permissions = []
    for perm in permissions:
        # Permissions of apps without a configured display name are skipped.
        resource_group = settings.APP_NAME_MAPPING.get(perm['content_type__app_label'])
        if resource_group is None:
            continue

        # Strip only the leading action prefix. Slicing (rather than
        # str.replace) keeps any later occurrence of the same text intact.
        name = perm['name']
        for prefix in action_prefixes:
            if name.startswith(prefix):
                name = name[len(prefix):]
                break

        # The action is the part of the codename before the first underscore.
        action = perm['codename'].split('_')[0]

        formatted_permissions.append({
            'id': perm['id'],
            'resource_group': resource_group,
            'resource': name,
            'permission': action_mapping.get(action, '未知操作'),
            'codename': perm['codename'],
        })
    return formatted_permissions
@method_decorator(csrf_protect, name='dispatch')
class CustomLoginView(LoginView):
    """Login view that sets the session expiry from the "remember me" field."""

    template_name = 'accounts/login.html'

    def form_valid(self, form):
        """Set the session expiry, then defer to the parent implementation."""
        # 1209600 seconds is two weeks; 0 expires the session on browser close.
        expiry = 1209600 if form.cleaned_data.get('remember_me') else 0
        self.request.session.set_expiry(expiry)
        return super().form_valid(form)

    def form_invalid(self, form):
        """Queue an error message, then defer to the parent implementation."""
        messages.error(self.request, '用户名或密码错误。')
        return super().form_invalid(form)
def logout_view(request):
    """Log the requesting user out and redirect to the ``user_login`` URL."""
    logout(request)
    response = redirect('user_login')
    return response
@login_required
@permission_required('auth.view_user', raise_exception=True)
def user_permissions_list(request):
    """Render the account permission table with optional filters.

    Reads the ``name`` and ``primary_department`` GET parameters, narrows the
    ``AccountProfile`` queryset accordingly, paginates it (10 per page) and
    renders ``accounts/user_permissions_list.html``.
    """
    # Local import: the module does not import urlencode at file level.
    from urllib.parse import urlencode

    name = request.GET.get('name', '')
    primary_department = request.GET.get('primary_department', '')

    query_set = AccountProfile.objects.order_by('id')
    if name:
        query_set = query_set.filter(employee_information__name__icontains=name)
    if primary_department:
        query_set = query_set.filter(employee_information__primary_department=primary_department)
    # NOTE(review): only superuser accounts are listed here; confirm this is
    # intended and not an inverted condition.
    query_set = query_set.filter(user__is_superuser=True)

    items = paginate_query_and_assign_numbers(
        request=request,
        queryset=query_set,
        per_page=10
    )

    # Query-string fragment carrying the active filters. The values are
    # user-supplied, so they are percent-encoded. The previous code misused
    # format() and emitted literal "{}" markers in front of each value.
    query_params = '&' + urlencode({
        'name': name,
        'primary_department': primary_department,
    })

    context = {
        'list_key': 'user.id',
        'breadcrumb_list': [
            {"title": "首页", "name": "index"},
            {"title": "权限设置", "name": "user_permissions_list"},
            {"title": "账号权限表", "name": "user_permissions_list"}
        ],
        "form_action_url": "user_permissions_list",
        'filters': [
            {
                "type": "text",
                "id": "name",
                "name": "name",
                "label": "姓名",
                "placeholder": "请输入姓名"
            },
            {
                "type": "select",
                "id": "primary_department",
                "name": "primary_department",
                "label": "一级部门",
                "options": [
                    {"value": "天信", "display": "天信"},
                    {"value": "混改", "display": "混改"},
                    {"value": "艾力芬特", "display": "艾力芬特"},
                    {"value": "星河", "display": "星河"},
                    {"value": "星海", "display": "星海"}
                ]
            }
        ],
        "table_columns": [
            {"header": "姓名", "field": "employee_information.name"},
            {"header": "邮箱", "field": "user.email"},
            {"header": "角色", "field": "user.is_superuser"},
            {"header": "部门", "field": "employee_information.primary_department"},
            {"header": "职务", "field": "employee_information.position"},
            {"header": "状态", "field": "employee_information.status"},
            {"header": "权限", "field": "authority"},
            {"header": "编辑", "field": "actions"}
        ],
        'query_params': query_params,
        'items': items,
    }
    return render(request, 'accounts/user_permissions_list.html', context)
@login_required()
@group_required('系统管理')
def get_user_existing_permissions(request, user_id):
    """Return every mapped permission, flagged by whether the user holds it.

    Args:
        request: The incoming HTTP request.
        user_id: Primary key of the ``User`` to inspect.

    Returns:
        A ``JsonResponse`` with ``permissions`` (one entry per
        resource group / resource pair, each carrying ``add``, ``remove``,
        ``change`` and ``view`` slots) and ``user_id``. A 400 response is
        returned for a missing or non-integer id; an unknown id yields 404.
    """
    if not user_id:
        return JsonResponse({'error': 'User ID is required.'}, status=400)
    try:
        user_id = int(user_id)
    except (TypeError, ValueError):
        return JsonResponse({'error': 'Invalid User ID.'}, status=400)
    user = get_object_or_404(User, pk=user_id)

    # All permissions, already filtered and labelled by format_permissions.
    all_permissions = format_permissions(
        Permission.objects.all()
        .select_related('content_type')
        .values('id', 'name', 'codename', 'content_type__app_label')
    )

    # Directly assigned permissions, matched by primary key. Codenames are not
    # unique across content types, so matching on codename could report a
    # permission of another app as granted.
    granted_ids = set(user.user_permissions.values_list('id', flat=True))

    # Display label produced by format_permissions -> slot name in the payload.
    slot_by_label = {'新增': 'add', '删除': 'remove', '修改': 'change', '查看': 'view'}

    # Merge permissions sharing the same resource group and resource name.
    processed_permissions = {}
    for perm in all_permissions:
        key = (perm['resource_group'], perm['resource'])
        if key not in processed_permissions:
            processed_permissions[key] = {
                'resource_group': perm['resource_group'],
                'resource': perm['resource'],
                'add': {'id': None, 'has_permission': False},
                'remove': {'id': None, 'has_permission': False},
                'change': {'id': None, 'has_permission': False},
                'view': {'id': None, 'has_permission': False}
            }
        slot = slot_by_label.get(perm['permission'])
        if slot is not None:
            processed_permissions[key][slot] = {
                'id': perm['id'],
                'has_permission': perm['id'] in granted_ids,
            }

    permissions_list = list(processed_permissions.values())
    return JsonResponse({'permissions': permissions_list, 'user_id': user_id})
@login_required
@group_required('系统管理')
def save_user_permissions(request, user_id):
    """Apply permission grants and revocations to a single user.

    The request body must be a JSON object whose ``permissions`` member maps
    permission ids to objects containing any of the flags ``add_permission``,
    ``delete_permission``, ``edit_permission`` and ``view_permission``. A
    truthy flag grants the permission; a falsy flag revokes it.

    Returns:
        A ``JsonResponse``: 400 for a non-POST request or a malformed body,
        otherwise a success payload. Unknown or invalid permission ids and
        malformed change entries are skipped.
    """
    if request.method != 'POST':
        return JsonResponse({'error': 'Invalid request method'}, status=400)
    user = get_object_or_404(User, pk=user_id)

    try:
        permissions_data = json.loads(request.body)
    except ValueError:
        return JsonResponse({'error': 'Invalid JSON data'}, status=400)

    # Valid JSON can still have the wrong shape (a list, null, a number...).
    # Reject it up front instead of failing with a server error below.
    if not isinstance(permissions_data, dict):
        return JsonResponse({'error': 'Invalid JSON data'}, status=400)
    changes_by_id = permissions_data.get('permissions', {})
    if not isinstance(changes_by_id, dict):
        return JsonResponse({'error': 'Invalid JSON data'}, status=400)

    # Every flag toggles the same permission object. They are applied in this
    # fixed order, so the last flag present wins if they disagree.
    flag_names = ('add_permission', 'delete_permission', 'edit_permission', 'view_permission')

    for perm_id, perm_changes in changes_by_id.items():
        if not isinstance(perm_changes, dict):
            continue
        try:
            permission = Permission.objects.get(id=perm_id)
        except (Permission.DoesNotExist, TypeError, ValueError):
            # Unknown id, or an id that cannot be coerced to the key type.
            continue
        for flag in flag_names:
            if flag not in perm_changes:
                continue
            if perm_changes[flag]:
                user.user_permissions.add(permission)
            else:
                user.user_permissions.remove(permission)

    return JsonResponse({'success': True, 'message': 'Permissions updated successfully'})
@login_required
@group_required('系统管理')
def refresh_user_permissions(request, user_id):
    """Replace a user's direct permissions with those of the user's groups.

    Only POST is accepted; any other method receives a 400 JSON response.
    """
    if request.method == 'POST':
        user = get_object_or_404(User, pk=user_id)
        direct_permissions = user.user_permissions

        # Drop every directly assigned permission first.
        direct_permissions.clear()

        # Then assign the ids of all permissions held by the user's groups.
        inherited_ids = Permission.objects.filter(group__user=user).values_list('id', flat=True)
        direct_permissions.add(*inherited_ids)

        return JsonResponse({'success': True, 'message': 'User permissions refreshed successfully'})

    return JsonResponse({'error': 'Invalid request method'}, status=400)
@login_required
@group_required('系统管理')
def group_perm_list(request):
    """Render the paginated table of user groups (10 per page)."""
    query_set = Group.objects.order_by('id')
    items = paginate_query_and_assign_numbers(
        request=request,
        queryset=query_set,
        per_page=10
    )
    context = {
        'items': items,
        'list_key': 'id',
        'breadcrumb_list': [
            {"title": "首页", "name": "index"},
            {"title": "权限设置", "name": "user_permissions_list"},
            # The final crumb is this page itself, so it uses the same URL
            # name as form_action_url below rather than the account list.
            {"title": "用户组权限表", "name": "group_perm_list"}
        ],
        "table_columns": [
            {"header": "用户组", "field": "name"},
            {"header": "权限", "field": "authority"},
        ],
        "form_action_url": "group_perm_list"
    }
    return render(request, 'accounts/group_perm_list.html', context)
@login_required()
@group_required('系统管理')
def get_group_permissions(request, group_id):
    """Return every mapped permission, flagged by whether the group holds it.

    Args:
        request: The incoming HTTP request.
        group_id: Primary key of the ``Group`` to inspect.

    Returns:
        A ``JsonResponse`` with ``permissions`` (one entry per
        resource group / resource pair, each carrying ``add``, ``remove``,
        ``change`` and ``view`` slots) and ``group_id``. A 400 response is
        returned for a missing or non-integer id; an unknown id yields 404.
    """
    # Validate that a usable group id was supplied.
    if not group_id:
        return JsonResponse({'error': 'Group ID is required.'}, status=400)
    try:
        group_id = int(group_id)
    except (TypeError, ValueError):
        return JsonResponse({'error': 'Invalid Group ID.'}, status=400)
    group = get_object_or_404(Group, pk=group_id)

    # All permissions, already filtered and labelled by format_permissions.
    all_permissions = format_permissions(
        Permission.objects.all()
        .select_related('content_type')
        .values('id', 'name', 'codename', 'content_type__app_label')
    )

    # Permissions of the group, matched by primary key. Codenames are not
    # unique across content types, so matching on codename could report a
    # permission of another app as granted.
    granted_ids = set(group.permissions.values_list('id', flat=True))

    # Display label produced by format_permissions -> slot name in the payload.
    slot_by_label = {'新增': 'add', '删除': 'remove', '修改': 'change', '查看': 'view'}

    # Merge permissions sharing the same resource group and resource name.
    processed_permissions = {}
    for perm in all_permissions:
        key = (perm['resource_group'], perm['resource'])
        if key not in processed_permissions:
            processed_permissions[key] = {
                'resource_group': perm['resource_group'],
                'resource': perm['resource'],
                'add': {'id': None, 'has_permission': False},
                'remove': {'id': None, 'has_permission': False},
                'change': {'id': None, 'has_permission': False},
                'view': {'id': None, 'has_permission': False}
            }
        slot = slot_by_label.get(perm['permission'])
        if slot is not None:
            processed_permissions[key][slot] = {
                'id': perm['id'],
                'has_permission': perm['id'] in granted_ids,
            }

    permissions_list = list(processed_permissions.values())
    return JsonResponse({'permissions': permissions_list, 'group_id': group_id})
@login_required
@group_required('系统管理')
def save_group_permissions(request, group_id):
    """Apply permission grants and revocations to a single group.

    The request body must be a JSON object whose ``permissions`` member maps
    permission ids to objects containing any of the flags ``add_permission``,
    ``delete_permission``, ``edit_permission`` and ``view_permission``. A
    truthy flag grants the permission; a falsy flag revokes it.

    Returns:
        A ``JsonResponse``: 400 for a non-POST request or a malformed body,
        otherwise a success payload. Unknown or invalid permission ids and
        malformed change entries are skipped.
    """
    if request.method != 'POST':
        return JsonResponse({'error': 'Invalid request method'}, status=400)
    group = get_object_or_404(Group, pk=group_id)

    try:
        permissions_data = json.loads(request.body)
    except ValueError:
        return JsonResponse({'error': 'Invalid JSON data'}, status=400)

    # Valid JSON can still have the wrong shape (a list, null, a number...).
    # Reject it up front instead of failing with a server error below.
    if not isinstance(permissions_data, dict):
        return JsonResponse({'error': 'Invalid JSON data'}, status=400)
    changes_by_id = permissions_data.get('permissions', {})
    if not isinstance(changes_by_id, dict):
        return JsonResponse({'error': 'Invalid JSON data'}, status=400)

    # Every flag toggles the same permission object. They are applied in this
    # fixed order, so the last flag present wins if they disagree.
    flag_names = ('add_permission', 'delete_permission', 'edit_permission', 'view_permission')

    for perm_id, perm_changes in changes_by_id.items():
        if not isinstance(perm_changes, dict):
            continue
        try:
            permission = Permission.objects.get(id=perm_id)
        except (Permission.DoesNotExist, TypeError, ValueError):
            # Unknown id, or an id that cannot be coerced to the key type.
            continue
        for flag in flag_names:
            if flag not in perm_changes:
                continue
            if perm_changes[flag]:
                group.permissions.add(permission)
            else:
                group.permissions.remove(permission)

    return JsonResponse({'success': True, 'message': 'Group permissions updated successfully'})
@require_http_methods(['POST'])
@login_required
@group_required('系统管理')
def add_group(request):
    """Create a user group named by the ``group_name`` POST field.

    Returns:
        A ``JsonResponse`` with ``success`` and ``message``. ``success`` is
        false when the name is missing or blank, when a group with that name
        already exists, or when creation fails unexpectedly.
    """
    # Local import: the module does not import logging at file level.
    import logging

    # Surrounding whitespace is trimmed so that a whitespace-only name is
    # treated as missing rather than stored as a group name.
    group_name = (request.POST.get('group_name') or '').strip()
    if not group_name:
        return JsonResponse({'success': False, 'message': 'The group name is required.'})

    try:
        _, created = Group.objects.get_or_create(name=group_name)
    except Exception:
        # Boundary handler: keep the generic client-facing message, but record
        # the failure instead of discarding it silently.
        logging.getLogger(__name__).exception('Failed to create group %r', group_name)
        return JsonResponse({'success': False, 'message': 'An error occurred during group creation.'})

    if created:
        return JsonResponse({'success': True, 'message': 'Group created successfully.'})
    return JsonResponse({'success': False, 'message': 'Group already exists.'})