# XH_Digital_Management/application/accounts/views.py
#
# 726 lines, 28 KiB, Python
#
# Note from the repository viewer: this file contains ambiguous Unicode
# characters, i.e. characters that might be confused with other characters.
# If that is intentional, the warning can be safely ignored; the viewer's
# Escape button reveals them.


# Standard library imports
import json
import logging
from datetime import datetime, date
from urllib.parse import urlencode

# Third-party imports (Django and helpers)
from django.contrib import messages
from django.contrib.auth import logout, login, authenticate
from django.contrib.auth.decorators import login_required
from django.contrib.auth.models import Permission, User, Group
from django.db import transaction
from django.db.models import Q, Sum
from django.http import JsonResponse, HttpResponseNotFound
from django.shortcuts import redirect, render, get_object_or_404
from django.urls import reverse
from django.utils.timezone import now
from django.views.decorators.csrf import csrf_protect
from django.views.decorators.http import require_http_methods
from pypinyin import lazy_pinyin

# Local application imports
from application.accounts.forms import EmailAuthenticationForm
from application.accounts.models import AccountProfile
from application.accounts.viewmodels import UserPermissionItem
from application.hrm_mgnt.models import AnnualLeaveRecord
from application.hrm_mgnt.models import EmployeeInformation
from application.perf_mgnt.models import EmployeePerformanceTarget
from application.pjt_mgnt.models import ProjectLedger
from common.auth import custom_permission_required
from common.utils.page_helper import paginate_query_and_assign_numbers
from XH_Digital_Management import settings


def format_permissions(permissions, app_name_mapping=None):
    """Convert raw permission rows into display-ready dictionaries.

    Args:
        permissions: Iterable of mappings providing the keys ``id``, ``name``,
            ``codename`` and ``content_type__app_label``.
        app_name_mapping: Optional mapping from app label to the display name
            of its resource group. Defaults to ``settings.APP_NAME_MAPPING``.

    Returns:
        A list of dicts with the keys ``id``, ``resource_group``,
        ``resource``, ``permission`` and ``codename``. Rows whose app label
        has no entry in the mapping are left out.
    """
    if app_name_mapping is None:
        app_name_mapping = settings.APP_NAME_MAPPING

    action_prefixes = ('Can add ', 'Can change ', 'Can delete ', 'Can view ')
    # Maps the first segment of a codename to its display label.
    action_mapping = {
        'add': '新增',
        'change': '修改',
        'view': '查看',
        'delete': '删除',
    }

    formatted_permissions = []
    for perm in permissions:
        resource_group = app_name_mapping.get(perm['content_type__app_label'])
        # Apps without a configured display name are not shown at all.
        if resource_group is None:
            continue

        # Strip only the *leading* action prefix; str.replace would also
        # remove later occurrences of the same text inside the name.
        name = perm['name']
        for prefix in action_prefixes:
            if name.startswith(prefix):
                name = name[len(prefix):]
                break

        action = perm['codename'].split('_')[0]
        formatted_permissions.append({
            'id': perm['id'],
            'resource_group': resource_group,
            'resource': name,
            'permission': action_mapping.get(action, '未知操作'),
            'codename': perm['codename'],
        })
    return formatted_permissions
@csrf_protect
def custom_login_view(request):
    """Show the login form and sign the user in on POST.

    Authenticated users are redirected to ``user_homepage`` straight away.
    On POST the submitted ``EmailAuthenticationForm`` is validated, the
    credentials are passed to ``authenticate`` and, on success, the user is
    logged in and redirected to ``user_homepage``. Any other method renders
    ``accounts/login.html`` with an unbound form.

    NOTE(review): the source this was recovered from had lost its
    indentation. The nesting below (in particular the redirect to
    ``user_login`` applying to every failed POST) is the assumed structure;
    confirm against the repository.
    """
    template_name = 'accounts/login.html'
    # Already signed in: skip the form entirely.
    if request.user.is_authenticated:
        return redirect('user_homepage')
    if request.method == 'POST':
        form = EmailAuthenticationForm(data=request.POST)
        if form.is_valid():
            # The form's "username" field carries the e-mail address.
            email = form.cleaned_data.get('username')
            password = form.cleaned_data.get('password')
            remember_me = request.POST.get('remember_me')
            user = authenticate(request, username=email, password=password)
            if user is not None:
                login(request, user)
                if remember_me:
                    # Keep the session for the configured duration.
                    request.session.set_expiry(settings.REMEMBER_ME_DURATION)
                else:
                    # Per Django's session API, 0 expires the session when
                    # the browser is closed.
                    request.session.set_expiry(0)
                return redirect('user_homepage')
            else:
                messages.error(request, '邮箱或密码错误。')
        else:
            messages.error(request, '表单无效,请检查输入的内容。')
        # Failed POST: redirect so the queued message is shown on a fresh GET.
        return redirect('user_login')
    else:
        form = EmailAuthenticationForm()
    return render(request, template_name, {'form': form})
def logout_view(request):
    """End the current session and send the visitor to the login page."""
    logout(request)
    login_page = redirect('user_login')
    return login_page
@login_required
@custom_permission_required('auth.view_user')
def user_permissions_list(request):
    """Render the paginated account/permission overview.

    Supports two optional GET filters: ``name`` (case-insensitive substring
    of the employee name) and ``primary_department`` (exact match).

    Returns:
        The rendered ``accounts/user_permissions_list.html`` page.
    """
    name = request.GET.get('name', '')
    primary_department = request.GET.get('primary_department', '')

    # select_related pulls the user and employee rows in the same query
    # instead of issuing two extra queries per listed profile.
    query_set = (
        AccountProfile.objects
        .select_related('user', 'employee_information')
        .order_by('id')
    )
    if name:
        query_set = query_set.filter(employee_information__name__icontains=name)
    if primary_department:
        query_set = query_set.filter(employee_information__primary_department=primary_department)

    page_items = paginate_query_and_assign_numbers(
        request=request,
        queryset=query_set,
        per_page=10
    )

    items = [
        UserPermissionItem(
            id=profile.user.id,
            name=profile.employee_information.name,
            email=profile.user.email,
            department=profile.employee_information.primary_department,
            position=profile.employee_information.position,
            employment_status=profile.employee_information.status,
            account_status='正常' if profile.user.is_active else '停用',
            is_superuser='管理员' if profile.user.is_superuser else '用户'
        )
        for profile in page_items
    ]

    # Query-string suffix that carries the active filters over to other
    # pages. The previous code concatenated the literal '{}' with the value
    # (yielding e.g. "&name={}foo") and did not URL-encode the values.
    query_params = '&' + urlencode({
        'name': name,
        'primary_department': primary_department,
    })

    context = {
        'list_key': 'user.id',
        'breadcrumb_list': [
            {"title": "首页", "name": "index"},
            {"title": "权限设置", "name": "user_permissions_list"},
            {"title": "账号权限表", "name": "user_permissions_list"}
        ],
        "form_action_url": "user_permissions_list",
        'filters': [
            {
                "type": "text",
                "id": "name",
                "name": "name",
                "label": "姓名",
                "placeholder": "请输入姓名"
            },
            {
                "type": "select",
                "id": "primary_department",
                "name": "primary_department",
                "label": "一级部门",
                "options": [
                    {"value": "天信", "display": "天信"},
                    {"value": "混改", "display": "混改"},
                    {"value": "艾力芬特", "display": "艾力芬特"},
                    {"value": "星河", "display": "星河"},
                    {"value": "星海", "display": "星海"}
                ]
            }
        ],
        'query_params': query_params,
        "columns": ["姓名", "邮箱", "一级部门", "职务", "在职状态", "账号状态", "账号类型"],
        'items': items,
        "delete_url": reverse("delete_account_profile"),
    }
    return render(request, 'accounts/user_permissions_list.html', context)
@login_required
@custom_permission_required('auth.delete_user')
def delete_account_profile(request):
    """Delete the account profile bound to the user id given in ``?id=``.

    Returns:
        JSON with a ``message`` key: 200 after the delete was issued, 400
        when ``id`` is missing or not an integer, 405 for non-GET requests.

    NOTE(review): this endpoint changes state on GET, which is kept because
    callers rely on it; consider moving it to POST/DELETE.
    """
    if request.method != 'GET':
        return JsonResponse({"message": "无效的请求方法"}, status=405)

    raw_user_id = request.GET.get('id')
    if not raw_user_id:
        return JsonResponse({"message": "缺少id参数"}, status=400)

    # A non-numeric id would otherwise reach the ORM filter and surface as
    # an unhandled ValueError (HTTP 500).
    try:
        user_id = int(raw_user_id)
    except (TypeError, ValueError):
        return JsonResponse({"message": "id参数无效"}, status=400)

    AccountProfile.objects.filter(user_id=user_id).delete()
    return JsonResponse({"message": "删除成功"})
@login_required()
@custom_permission_required('auth.view_permission')
def get_user_existing_permissions(request, user_id):
    """Return the permission matrix of one user as JSON.

    Every displayable permission is grouped by ``(resource_group,
    resource)``; each group carries four slots (``add``, ``remove``,
    ``change``, ``view``) holding the permission id and whether the user has
    that permission assigned directly.

    Args:
        request: The incoming HTTP request.
        user_id: Primary key of the user to inspect.

    Returns:
        JSON ``{'permissions': [...], 'user_id': <int>}``, or a 400 JSON
        error when the id is missing or not an integer. An unknown user
        raises ``Http404`` via ``get_object_or_404``.
    """
    if not user_id:
        return JsonResponse({'error': 'User ID is required.'}, status=400)
    try:
        user_id = int(user_id)
    except (TypeError, ValueError):
        return JsonResponse({'error': 'Invalid User ID.'}, status=400)
    user = get_object_or_404(User, pk=user_id)

    all_permissions = format_permissions(
        Permission.objects.values('id', 'name', 'codename', 'content_type__app_label'))

    # Match on the permission id: codenames are only unique per content
    # type, so comparing codenames could report a permission of another app
    # as granted. A set also makes each membership test O(1).
    granted_ids = set(user.user_permissions.values_list('id', flat=True))

    # Display label produced by format_permissions -> slot name in the row.
    slot_by_label = {'新增': 'add', '删除': 'remove', '修改': 'change', '查看': 'view'}

    processed_permissions = {}
    for perm in all_permissions:
        key = (perm['resource_group'], perm['resource'])
        if key not in processed_permissions:
            processed_permissions[key] = {
                'resource_group': perm['resource_group'],
                'resource': perm['resource'],
                'add': {'id': None, 'has_permission': False},
                'remove': {'id': None, 'has_permission': False},
                'change': {'id': None, 'has_permission': False},
                'view': {'id': None, 'has_permission': False}
            }
        slot = slot_by_label.get(perm['permission'])
        if slot is not None:
            processed_permissions[key][slot] = {
                'id': perm['id'],
                'has_permission': perm['id'] in granted_ids,
            }

    permissions_list = list(processed_permissions.values())
    return JsonResponse({'permissions': permissions_list, 'user_id': user_id})
@login_required
@custom_permission_required('auth.change_permission')
def save_user_permissions(request, user_id):
    """Grant or revoke individual permissions of a user.

    Expects a JSON body of the form
    ``{"permissions": {"<permission id>": {"<flag>": <bool>, ...}}}`` where
    the flag is one of ``add_permission``, ``delete_permission``,
    ``edit_permission`` or ``view_permission``. A truthy flag grants the
    permission identified by the key, a falsy one revokes it. When several
    flags are present the last one in that order decides. Unknown or
    non-numeric permission ids are ignored.

    Returns:
        JSON success message, or a 400 JSON error for a wrong method or a
        malformed body.
    """
    if request.method != 'POST':
        return JsonResponse({'error': 'Invalid request method'}, status=400)
    user = get_object_or_404(User, pk=user_id)

    try:
        permissions_data = json.loads(request.body)
    except ValueError:
        return JsonResponse({'error': 'Invalid JSON data'}, status=400)
    # Valid JSON that is not an object (e.g. a list) has no .get().
    if not isinstance(permissions_data, dict):
        return JsonResponse({'error': 'Invalid JSON data'}, status=400)
    requested_changes = permissions_data.get('permissions', {})
    if not isinstance(requested_changes, dict):
        return JsonResponse({'error': 'Invalid JSON data'}, status=400)

    flag_names = ('add_permission', 'delete_permission', 'edit_permission', 'view_permission')

    # Resolve the final decision per permission id before touching the DB.
    decisions = {}
    for raw_id, perm_changes in requested_changes.items():
        if not isinstance(perm_changes, dict):
            continue
        try:
            perm_id = int(raw_id)
        except (TypeError, ValueError):
            continue
        for flag in flag_names:
            if flag in perm_changes:
                decisions[perm_id] = bool(perm_changes[flag])

    # Only ids that really exist may be passed to add()/remove().
    existing_ids = set(
        Permission.objects.filter(pk__in=list(decisions)).values_list('pk', flat=True))
    grant_ids = [pk for pk, grant in decisions.items() if grant and pk in existing_ids]
    revoke_ids = [pk for pk, grant in decisions.items() if not grant and pk in existing_ids]

    # Apply everything or nothing.
    with transaction.atomic():
        if grant_ids:
            user.user_permissions.add(*grant_ids)
        if revoke_ids:
            user.user_permissions.remove(*revoke_ids)

    return JsonResponse({'success': True, 'message': 'Permissions updated successfully'})
@login_required
@custom_permission_required('auth.change_permission')
def refresh_user_permissions(request, user_id):
    """Reset a user's direct permissions to those of their groups.

    All directly assigned permissions are removed and replaced by the
    permissions of every group the user belongs to.

    Returns:
        JSON success message, or a 400 JSON error for non-POST requests.
    """
    if request.method != 'POST':
        return JsonResponse({'error': 'Invalid request method'}, status=400)
    user = get_object_or_404(User, pk=user_id)

    # Collect the target ids first; a set drops permissions that are shared
    # by several of the user's groups.
    group_permission_ids = set(
        Permission.objects.filter(group__user=user).values_list('id', flat=True))

    # Clear and re-add in one transaction so a failure cannot leave the
    # user without any permission.
    with transaction.atomic():
        user.user_permissions.clear()
        user.user_permissions.add(*group_permission_ids)

    return JsonResponse({'success': True, 'message': 'User permissions refreshed successfully'})
@login_required
@custom_permission_required('auth.view_permission')
def group_perm_list(request):
    """Render the paginated list of user groups.

    Supports the optional GET filter ``group_name`` (case-insensitive
    substring of the group name).

    Returns:
        The rendered ``accounts/group_perm_list.html`` page.
    """
    group_name = request.GET.get('group_name', '')

    query_set = Group.objects.order_by('id')
    if group_name:
        query_set = query_set.filter(name__icontains=group_name)

    items = paginate_query_and_assign_numbers(
        request=request,
        queryset=query_set,
        per_page=10
    )

    # Query-string suffix that carries the active filter over to other
    # pages. It must use the same parameter name the view reads
    # ("group_name"); the previous code emitted "&name={}<value>" and turned
    # a missing filter into the text "None".
    query_params = '&' + urlencode({'group_name': group_name})

    context = {
        'items': items,
        'list_key': 'id',
        'filters': [
            {
                "type": "text",
                "id": "group_name",
                "name": "group_name",
                "label": "",
                "placeholder": "请输入用户组名称"
            },
        ],
        'breadcrumb_list': [
            {"title": "首页", "name": "index"},
            {"title": "权限设置", "name": "user_permissions_list"},
            # The last crumb is this page itself, mirroring the account list.
            {"title": "用户组权限表", "name": "group_perm_list"}
        ],
        "table_columns": [
            {"header": "用户组", "field": "name"},
            {"header": "权限", "field": "authority"},
            {"header": "操作", "field": "actions"},
        ],
        'query_params': query_params,
        "form_action_url": "group_perm_list"
    }
    return render(request, 'accounts/group_perm_list.html', context)
@login_required()
@custom_permission_required('auth.view_permission')
def get_group_permissions(request, group_id):
    """Return the permission matrix of one user group as JSON.

    Every displayable permission is grouped by ``(resource_group,
    resource)``; each group carries four slots (``add``, ``remove``,
    ``change``, ``view``) holding the permission id and whether the group
    has that permission.

    The ``auth.view_permission`` check matches the one on the group list
    page and on the per-user variant of this endpoint; without it any
    signed-in user could read every group's permissions.

    Args:
        request: The incoming HTTP request.
        group_id: Primary key of the group to inspect.

    Returns:
        JSON ``{'permissions': [...], 'group_id': <int>}``, or a 400 JSON
        error when the id is missing or not an integer. An unknown group
        raises ``Http404`` via ``get_object_or_404``.
    """
    if not group_id:
        return JsonResponse({'error': 'Group ID is required.'}, status=400)
    try:
        group_id = int(group_id)
    except (TypeError, ValueError):
        return JsonResponse({'error': 'Invalid Group ID.'}, status=400)
    group = get_object_or_404(Group, pk=group_id)

    all_permissions = format_permissions(
        Permission.objects.values('id', 'name', 'codename', 'content_type__app_label'))

    # Match on the permission id: codenames are only unique per content
    # type, so comparing codenames could report a permission of another app
    # as granted. A set also makes each membership test O(1).
    granted_ids = set(group.permissions.values_list('id', flat=True))

    # Display label produced by format_permissions -> slot name in the row.
    slot_by_label = {'新增': 'add', '删除': 'remove', '修改': 'change', '查看': 'view'}

    processed_permissions = {}
    for perm in all_permissions:
        key = (perm['resource_group'], perm['resource'])
        if key not in processed_permissions:
            processed_permissions[key] = {
                'resource_group': perm['resource_group'],
                'resource': perm['resource'],
                'add': {'id': None, 'has_permission': False},
                'remove': {'id': None, 'has_permission': False},
                'change': {'id': None, 'has_permission': False},
                'view': {'id': None, 'has_permission': False}
            }
        slot = slot_by_label.get(perm['permission'])
        if slot is not None:
            processed_permissions[key][slot] = {
                'id': perm['id'],
                'has_permission': perm['id'] in granted_ids,
            }

    permissions_list = list(processed_permissions.values())
    return JsonResponse({'permissions': permissions_list, 'group_id': group_id})
@login_required
@custom_permission_required('auth.change_group')
def save_group_permissions(request, group_id):
    """Grant or revoke permissions of a user group.

    Expects a JSON body of the form
    ``{"permissions": {"<permission id>": {"<flag>": <bool>, ...}}}`` where
    the flag is one of ``add_permission``, ``delete_permission``,
    ``edit_permission`` or ``view_permission``. A truthy flag grants the
    permission identified by the key, a falsy one revokes it. When several
    flags are present the last one in that order decides. Unknown or
    non-numeric permission ids are ignored.

    Returns:
        JSON success message, or a 400 JSON error for a wrong method or a
        malformed body.
    """
    if request.method != 'POST':
        return JsonResponse({'error': 'Invalid request method'}, status=400)
    group = get_object_or_404(Group, pk=group_id)

    try:
        permissions_data = json.loads(request.body)
    except ValueError:
        return JsonResponse({'error': 'Invalid JSON data'}, status=400)
    # Valid JSON that is not an object (e.g. a list) has no .get().
    if not isinstance(permissions_data, dict):
        return JsonResponse({'error': 'Invalid JSON data'}, status=400)
    requested_changes = permissions_data.get('permissions', {})
    if not isinstance(requested_changes, dict):
        return JsonResponse({'error': 'Invalid JSON data'}, status=400)

    flag_names = ('add_permission', 'delete_permission', 'edit_permission', 'view_permission')

    # Resolve the final decision per permission id before touching the DB.
    decisions = {}
    for raw_id, perm_changes in requested_changes.items():
        if not isinstance(perm_changes, dict):
            continue
        try:
            perm_id = int(raw_id)
        except (TypeError, ValueError):
            continue
        for flag in flag_names:
            if flag in perm_changes:
                decisions[perm_id] = bool(perm_changes[flag])

    # Only ids that really exist may be passed to add()/remove().
    existing_ids = set(
        Permission.objects.filter(pk__in=list(decisions)).values_list('pk', flat=True))
    grant_ids = [pk for pk, grant in decisions.items() if grant and pk in existing_ids]
    revoke_ids = [pk for pk, grant in decisions.items() if not grant and pk in existing_ids]

    # Apply everything or nothing.
    with transaction.atomic():
        if grant_ids:
            group.permissions.add(*grant_ids)
        if revoke_ids:
            group.permissions.remove(*revoke_ids)

    return JsonResponse({'success': True, 'message': 'Group permissions updated successfully'})
@require_http_methods(['POST'])
@login_required
@custom_permission_required('auth.add_group')
def add_group(request):
    """Create a user group from the JSON body ``{"group_name": "..."}``.

    Returns:
        JSON ``{'success': bool, 'message': str}``. ``success`` is false
        when the body is not valid JSON, the name is missing or blank, the
        group already exists, or an unexpected error occurred.
    """
    try:
        data = json.loads(request.body)
        # A JSON body that is not an object carries no group name.
        group_name = data.get('group_name') if isinstance(data, dict) else None
        if isinstance(group_name, str):
            # Surrounding whitespace is not part of the name; a blank name
            # counts as missing.
            group_name = group_name.strip()
        if not group_name:
            raise ValueError('The group name is required.')
        _, created = Group.objects.get_or_create(name=group_name)
    except ValueError as e:
        # Also covers malformed JSON (JSONDecodeError is a ValueError).
        return JsonResponse({'success': False, 'message': str(e)})
    except Exception:
        # Keep the generic reply for the client but leave a trace in the
        # logs instead of dropping the error silently.
        logging.getLogger(__name__).exception('Unexpected error while creating a group')
        return JsonResponse({'success': False, 'message': 'An error occurred during group creation.'})

    if created:
        return JsonResponse({'success': True, 'message': 'Group created successfully.'})
    return JsonResponse({'success': False, 'message': 'Group already exists.'})
@require_http_methods(['POST'])
@login_required
@custom_permission_required('auth.delete_group')
def delete_group(request, group_id):
    """Delete the user group with primary key ``group_id``.

    Returns:
        JSON ``{'success': bool, 'message': str}``; a missing group yields
        status 404, any other failure the generic error message.
    """
    try:
        group = Group.objects.get(pk=group_id)
        group.delete()
    except Group.DoesNotExist:
        # Previously the Http404 raised by get_object_or_404 was swallowed
        # by the broad handler below and reported as a generic failure.
        return JsonResponse({'success': False, 'message': 'Group not found.'}, status=404)
    except Exception:
        # Keep the generic reply for the client but leave a trace in the logs.
        logging.getLogger(__name__).exception('Unexpected error while deleting group %s', group_id)
        return JsonResponse({'success': False, 'message': 'An error occurred during group deletion.'})
    return JsonResponse({'success': True, 'message': 'Group deleted successfully.'})
@login_required
@custom_permission_required('auth.view_user')
def user_homepage_view(request):
    """Render the personal dashboard of the signed-in user.

    The page shows days in service, annual-leave balance, the sales target
    with its completion rate, and counts of this year's ongoing and
    completed projects in which the employee is leader or member. When the
    user has no ``AccountProfile`` only an error message is passed to the
    template.

    Returns:
        The rendered ``user_homepage.html`` page.
    """
    try:
        account_profile = AccountProfile.objects.get(user=request.user)
    except AccountProfile.DoesNotExist:
        return render(request, 'user_homepage.html', {'error': '该用户未注册'})

    employee_information = account_profile.employee_information
    employee_name = employee_information.name

    # Days in service = today - entry date; a missing entry date counts as 0.
    entry_date = employee_information.entry_date
    on_duty_days = (date.today() - entry_date).days if entry_date else 0

    # Annual leave: the lookup is a substring match, so several records can
    # match (which made .get() raise MultipleObjectsReturned). Prefer the
    # exact name and fall back to the first substring match.
    leave_candidates = AnnualLeaveRecord.objects.filter(employee_name__icontains=employee_name)
    leave_record = leave_candidates.filter(employee_name=employee_name).first() or leave_candidates.first()
    if leave_record is not None:
        remaining_annual_leave = leave_record.remaining_annual_leave
        used_annual_leave = leave_record.used_annual_leave
    else:
        remaining_annual_leave = 0
        used_annual_leave = 0

    # Sales target. NOTE(review): if several target rows can exist per
    # employee (e.g. one per year) the first one is used - confirm which
    # row is the intended one.
    target_record = EmployeePerformanceTarget.objects.filter(name=employee_name).first()
    performance_target = target_record.sales_target if target_record is not None else 0
    if performance_target is None:
        performance_target = 0

    # Projects that end within the current calendar year.
    current_year = now().year
    year_range = [f"{current_year}-01-01", f"{current_year}-12-31"]
    involvement = Q(project_leader=employee_name) | Q(project_members__contains=employee_name)

    ongoing_projects = ProjectLedger.objects.filter(
        involvement,
        project_status='进行中',
        end_date__range=year_range
    ).distinct()
    completed_projects = ProjectLedger.objects.filter(
        involvement,
        project_status='完成',
        end_date__range=year_range
    ).distinct()

    # filter() never raises DoesNotExist, so no exception handling is
    # needed here; empty querysets simply count as 0.
    ongoing_project_count = ongoing_projects.count()
    completed_project_count = completed_projects.count()
    total_project_count = ongoing_project_count + completed_project_count

    if total_project_count > 0:
        project_completion_rate = (completed_project_count / total_project_count) * 100
    else:
        project_completion_rate = 0

    # Sum of the contract amounts of the completed projects.
    total_completed_amount = completed_projects.aggregate(Sum('contract_amount'))['contract_amount__sum'] or 0

    # Share of the sales target covered by completed projects, in percent.
    if performance_target > 0:
        completion_rate = (total_completed_amount / performance_target) * 100
    else:
        completion_rate = 0

    context = {
        'name': employee_name,
        # 'postion' (sic) is the key this view has always exposed; it is
        # kept for existing templates and the correct spelling is added.
        'postion': employee_information.position,
        'position': employee_information.position,
        'on_duty_days': on_duty_days,
        'status': employee_information.status,
        'remaining_annual_leave': remaining_annual_leave,
        'used_annual_leave': used_annual_leave,
        'performance_target': performance_target,
        'total_completed_amount': total_completed_amount,
        'completion_rate': completion_rate,
        'completed_project_count': completed_project_count,
        'ongoing_project_count': ongoing_project_count,
        'total_project_count': total_project_count,
        'project_completion_rate': project_completion_rate,
    }
    return render(request, 'user_homepage.html', context)
@login_required
@custom_permission_required('auth.add_user')
def create_account_profile(request):
    """Create login accounts for a batch of employees.

    Expects a POST with the JSON body ``{"employee_ids": [...]}``. For each
    id a ``User`` (username = pinyin of the employee name, made unique with
    a numeric suffix) and an ``AccountProfile`` are created and the user is
    given the ``auth.view_user`` permission. The batch is all-or-nothing:
    if any id is unknown or already bound, nothing is created.

    Returns:
        JSON ``{'success': bool, 'message': str}``; status 405 for non-POST
        requests and 400 for a body that is not valid JSON.
    """
    # Previously a non-POST request fell through and returned None.
    if request.method != 'POST':
        return JsonResponse({'success': False, 'message': 'Invalid request method.'}, status=405)

    try:
        data = json.loads(request.body)
    except ValueError:
        return JsonResponse({'success': False, 'message': 'Invalid JSON data.'}, status=400)

    employee_info_ids = data.get('employee_ids') if isinstance(data, dict) else None
    if not employee_info_ids:
        return JsonResponse({'success': False, 'message': 'Employee Information IDs are required.'})
    # A bare string would otherwise be iterated character by character.
    if not isinstance(employee_info_ids, (list, tuple)):
        return JsonResponse({'success': False, 'message': 'Employee Information IDs must be a list.'})

    # Loop-invariant, and restricted to the auth app because codenames are
    # only unique per content type.
    view_user_permission = Permission.objects.get(
        codename='view_user', content_type__app_label='auth')

    with transaction.atomic():
        created_profiles = []
        for employee_info_id in employee_info_ids:
            try:
                employee_information = EmployeeInformation.objects.get(employee_id=employee_info_id)
            except EmployeeInformation.DoesNotExist as e:
                # Returning from inside atomic() would commit the accounts
                # created so far; mark the transaction for rollback first.
                transaction.set_rollback(True)
                return JsonResponse({'success': False, 'message': str(e)})

            # An employee may be bound to at most one account.
            if AccountProfile.objects.filter(employee_information=employee_information).exists():
                transaction.set_rollback(True)
                return JsonResponse(
                    {'success': False,
                     'message': f'Employee Information ID {employee_info_id} is already bound to an AccountProfile.'})

            # Username: pinyin of the name, with a counter on collisions.
            username_base = ''.join(lazy_pinyin(employee_information.name))
            username = username_base
            counter = 1
            while User.objects.filter(username=username).exists():
                username = f"{username_base}{counter}"
                counter += 1

            # NOTE(review): the initial password is derived from the name
            # and today's date and is therefore guessable; consider forcing
            # a change on first login.
            date_str = datetime.now().strftime('%Y%m%d')
            password = f"{username_base}{date_str}"

            user = User.objects.create_user(
                username=username,
                email=employee_information.email,
                password=password,
                last_name=employee_information.name
            )
            account_profile = AccountProfile.objects.create(
                user=user,
                employee_information=employee_information
            )
            created_profiles.append(account_profile)
            user.user_permissions.add(view_user_permission)

    return JsonResponse(
        {'success': True, 'message': f'Created {len(created_profiles)} AccountProfiles successfully.'})
@login_required
@custom_permission_required('auth.view_permission')
def get_user_permission_groups(request, user_id):
    """List every group as JSON, flagging those the given user belongs to.

    Returns:
        A JSON array of ``{'id', 'name', 'isAssigned'}`` objects for GET
        requests, otherwise a 405 JSON error.
    """
    # Guard clause: only GET is served.
    if request.method != 'GET':
        return JsonResponse({'error': 'Invalid request method.'}, status=405)

    every_group = Group.objects.all().values('id', 'name')
    target_user = get_object_or_404(User, pk=user_id)
    assigned_ids = target_user.groups.values_list('id', flat=True)

    payload = []
    for entry in every_group:
        payload.append({
            'id': entry['id'],
            'name': entry['name'],
            'isAssigned': (entry['id'] in assigned_ids),
        })
    return JsonResponse(payload, safe=False)
@login_required
@custom_permission_required('auth.change_permission')
def save_user_permission_groups(request, user_id):
    """Replace the groups of a user with the ones listed in the request.

    Expects a POST with the JSON body ``{"group_ids": [...]}``; a missing
    key is treated as an empty list, which removes the user from every
    group.

    Returns:
        JSON success message, or a 400 JSON response carrying the error
        text when the method is wrong or anything fails.
    """
    if request.method != 'POST':
        return JsonResponse({'error': 'Invalid request method'}, status=400)
    try:
        user = User.objects.get(pk=user_id)
        data = json.loads(request.body)
        group_ids = data.get('group_ids', [])

        # Resolve every group *before* changing anything: previously the
        # user's groups were cleared first, so one unknown id left the user
        # without any group.
        groups = [Group.objects.get(pk=group_id) for group_id in group_ids]

        # set() swaps the membership inside a single transaction.
        user.groups.set(groups)
        return JsonResponse({'success': True, 'message': '权限组已更新'})
    except Exception as e:
        # Deliberate catch-all: callers expect a JSON 400 with the error
        # text for any failure (unknown user/group, malformed body, ...).
        return JsonResponse({'success': False, 'message': str(e)}, status=400)