# 标准库导入 import json from datetime import datetime, date from django.contrib.auth.forms import AuthenticationForm from django.db import transaction from django.db.models import Q, Sum from django.http import JsonResponse, HttpResponseBadRequest from django.shortcuts import redirect, render, get_object_or_404 from django.utils.decorators import method_decorator # Django组件导入 from django.contrib import messages from django.contrib.auth import logout, authenticate, login from django.contrib.auth.decorators import login_required, permission_required from django.contrib.auth.models import Permission, User, Group from django.contrib.auth.views import LoginView from django.utils.timezone import now from django.views.decorators.csrf import csrf_protect from django.views.decorators.http import require_http_methods from XH_Digital_Management import settings # 本地Django应用导入 from application.accounts.models import AccountProfile from application.hrm_mgnt.models import EmployeeInformation, AnnualLeaveRecord, PerformanceEvaluation from application.perf_mgnt.models import EmployeePerformanceTarget from application.pjt_mgnt.models import ProjectLedger from common.auth import group_required from common.utils.page_helper import paginate_query_and_assign_numbers from pypinyin import lazy_pinyin def format_permissions(permissions): action_prefixes = ['Can add ', 'Can change ', 'Can delete ', 'Can view '] # 创建动作映射字典 action_mapping = { 'add': '新增', 'change': '修改', 'view': '查看', 'delete': '删除' } formatted_permissions = [] for perm in permissions: name = perm['name'] # 从权限名称中移除动作前缀 for prefix in action_prefixes: if name.startswith(prefix): name = name.replace(prefix, '') break # 从settings中获取APP_NAME_MAPPING app_label = perm['content_type__app_label'] resource_group = settings.APP_NAME_MAPPING.get(app_label) # 如果APP_NAME_MAPPING中没有相应的映射,则跳过这个权限 if resource_group is None: continue # 获取权限的动作描述 action = perm['codename'].split('_')[0] permission_description = action_mapping.get(action, '未知操作') # 构建格式化后的权限字典并添加到列表中 formatted_permissions.append({ 'id': perm['id'], 'resource_group': resource_group, 'resource': name, 'permission': permission_description, 'codename': perm['codename'] }) return formatted_permissions @csrf_protect def custom_login_view(request): template_name = 'accounts/login.html' # 用户已登录,重定向到首页或其他页面 if request.user.is_authenticated: return redirect('home') # 替换 'home' 为实际的重定向目标 if request.method == 'POST': form = AuthenticationForm(request, data=request.POST) if form.is_valid(): username = form.cleaned_data.get('username') password = form.cleaned_data.get('password') # 邮箱登录逻辑 if '@' in username: try: user = User.objects.get(email=username) if user.check_password(password): login(request, user) return redirect('home') # 替换 'home' 为实际的重定向目标 else: messages.error(request, '用户名或密码错误。') except User.DoesNotExist: messages.error(request, '用户名或密码错误。') else: # 用户名登录逻辑 user = authenticate(request, username=username, password=password) if user is not None and user.is_active: login(request, user) return redirect('home') # 替换 'home' 为实际的重定向目标 else: messages.error(request, '用户名或密码错误。') else: messages.error(request, '用户名或密码错误。') else: form = AuthenticationForm() return render(request, template_name, {'form': form}) def logout_view(request): logout(request) # Redirect to a success page, such as the home page return redirect('user_login') @login_required @permission_required('auth.view_user', raise_exception=True) def user_permissions_list(request): query_set = AccountProfile.objects.filter().order_by('id') name = request.GET.get('name', '') primary_department = request.GET.get('primary_department', '') if name: query_set = query_set.filter(employee_information__name__icontains=name) if primary_department: query_set = query_set.filter(employee_information__primary_department=primary_department) # query_set = query_set.filter(user__is_superuser=True) items = paginate_query_and_assign_numbers( request=request, queryset=query_set, per_page=10 ) # 构建上下文查询参数字符串 query_params = '&name={}' + format(name) + '&primary_department={}' + format(primary_department) context = { 'list_key': 'user.id', 'breadcrumb_list': [ {"title": "首页", "name": "index"}, {"title": "权限设置", "name": "user_permissions_list"}, {"title": "账号权限表", "name": "user_permissions_list"} ], "form_action_url": "user_permissions_list", 'filters': [ { "type": "text", "id": "name", "name": "name", "label": "姓名", "placeholder": "请输入姓名" }, { "type": "select", "id": "primary_department", "name": "primary_department", "label": "一级部门", "options": [ {"value": "天信", "display": "天信"}, {"value": "混改", "display": "混改"}, {"value": "艾力芬特", "display": "艾力芬特"}, {"value": "星河", "display": "星河"}, {"value": "星海", "display": "星海"} ] } ], "table_columns": [ {"header": "姓名", "field": "employee_information.name"}, {"header": "邮箱", "field": "user.email"}, {"header": "角色", "field": "user.is_superuser"}, {"header": "部门", "field": "employee_information.primary_department"}, {"header": "职务", "field": "employee_information.position"}, {"header": "状态", "field": "employee_information.status"}, {"header": "权限", "field": "authority"} ], 'query_params': query_params, 'items': items, } return render(request, 'accounts/user_permissions_list.html', context) @login_required() @group_required('系统管理') @permission_required('auth.view_permission', raise_exception=True) def get_user_existing_permissions(request, user_id): if not user_id: return JsonResponse({'error': 'User ID is required.'}, status=400) try: user_id = int(user_id) except ValueError: return JsonResponse({'error': 'Invalid User ID.'}, status=400) user = get_object_or_404(User, pk=user_id) # 获取所有权限 all_permissions = format_permissions( Permission.objects.all().select_related('content_type').values('id', 'name', 'codename', 'content_type__app_label')) # 获取用户的个人权限 user_permissions = user.user_permissions.all().values_list('codename', flat=True) # 处理数据以合并相同资源分组和资源名称的权限 processed_permissions = {} for perm in all_permissions: key = (perm['resource_group'], perm['resource']) if key not in processed_permissions: processed_permissions[key] = { 'resource_group': perm['resource_group'], 'resource': perm['resource'], 'add': {'id': None, 'has_permission': False}, 'remove': {'id': None, 'has_permission': False}, 'change': {'id': None, 'has_permission': False}, 'view': {'id': None, 'has_permission': False} } if perm['permission'] == '新增': processed_permissions[key]['add'] = {'id': perm['id'], 'has_permission': perm['codename'] in user_permissions} elif perm['permission'] == '删除': processed_permissions[key]['remove'] = {'id': perm['id'], 'has_permission': perm['codename'] in user_permissions} elif perm['permission'] == '修改': processed_permissions[key]['change'] = {'id': perm['id'], 'has_permission': perm['codename'] in user_permissions} elif perm['permission'] == '查看': processed_permissions[key]['view'] = {'id': perm['id'], 'has_permission': perm['codename'] in user_permissions} # 将processed_permissions从字典转换为列表 permissions_list = [] for key, perms in processed_permissions.items(): perms['resource_group'], perms['resource'] = key permissions_list.append(perms) return JsonResponse({'permissions': permissions_list, 'user_id': user_id}) @login_required @group_required('系统管理') @permission_required('auth.change_permission', raise_exception=True) def save_user_permissions(request, user_id): if request.method != 'POST': return JsonResponse({'error': 'Invalid request method'}, status=400) user = get_object_or_404(User, pk=user_id) try: permissions_data = json.loads(request.body) except ValueError: return JsonResponse({'error': 'Invalid JSON data'}, status=400) # 处理修改后的权限项 for perm_id, perm_changes in permissions_data.get('permissions', {}).items(): try: permission = Permission.objects.get(id=perm_id) # 处理“新增”权限 if 'add_permission' in perm_changes: if perm_changes['add_permission']: user.user_permissions.add(permission) else: user.user_permissions.remove(permission) # 处理“删除”权限 if 'delete_permission' in perm_changes: if perm_changes['delete_permission']: user.user_permissions.add(permission) else: user.user_permissions.remove(permission) # 处理“修改”权限 if 'edit_permission' in perm_changes: if perm_changes['edit_permission']: user.user_permissions.add(permission) else: user.user_permissions.remove(permission) # 处理“查看”权限 if 'view_permission' in perm_changes: if perm_changes['view_permission']: user.user_permissions.add(permission) else: user.user_permissions.remove(permission) except Permission.DoesNotExist: continue return JsonResponse({'success': True, 'message': 'Permissions updated successfully'}) @login_required @group_required('系统管理') @permission_required('auth.change_permission', raise_exception=True) def refresh_user_permissions(request, user_id): if request.method != 'POST': return JsonResponse({'error': 'Invalid request method'}, status=400) user = get_object_or_404(User, pk=user_id) # 清空用户的个人权限 user.user_permissions.clear() # 获取用户所在组的权限ID group_permissions = Permission.objects.filter(group__user=user).values_list('id', flat=True) # 为用户分配组权限 user.user_permissions.add(*group_permissions) return JsonResponse({'success': True, 'message': 'User permissions refreshed successfully'}) @login_required @group_required('系统管理') @permission_required('auth.view_permission', raise_exception=True) def group_perm_list(request): query_set = Group.objects.filter().order_by('id') group_name = request.GET.get('group_name') if group_name: query_set = query_set.filter(name__icontains=group_name) items = paginate_query_and_assign_numbers( request=request, queryset=query_set, per_page=10 ) # 构建上下文查询参数字符串 query_params = '&name={}' + format(group_name) context = { 'items': items, 'list_key': 'id', 'filters': [ { "type": "text", "id": "group_name", "name": "group_name", "label": "", "placeholder": "请输入用户组名称" }, ], 'breadcrumb_list': [ {"title": "首页", "name": "index"}, {"title": "权限设置", "name": "user_permissions_list"}, {"title": "用户组权限表", "name": "user_permissions_list"} ], "table_columns": [ {"header": "用户组", "field": "name"}, {"header": "权限", "field": "authority"}, {"header": "操作", "field": "actions"}, ], 'query_params': query_params, "form_action_url": "group_perm_list" } return render(request, 'accounts/group_perm_list.html', context) @login_required() @group_required('系统管理') def get_group_permissions(request, group_id): # 校验group_id是否存在 if not group_id: return JsonResponse({'error': 'Group ID is required.'}, status=400) try: group_id = int(group_id) except ValueError: return JsonResponse({'error': 'Invalid Group ID.'}, status=400) group = get_object_or_404(Group, pk=group_id) # 获取所有权限 all_permissions = format_permissions( Permission.objects.all().select_related('content_type').values('id', 'name', 'codename', 'content_type__app_label')) # 获取用户组的权限 group_permissions = group.permissions.all().values_list('codename', flat=True) # 处理数据以合并相同资源分组和资源名称的权限 processed_permissions = {} for perm in all_permissions: key = (perm['resource_group'], perm['resource']) if key not in processed_permissions: processed_permissions[key] = { 'resource_group': perm['resource_group'], 'resource': perm['resource'], 'add': {'id': None, 'has_permission': False}, 'remove': {'id': None, 'has_permission': False}, 'change': {'id': None, 'has_permission': False}, 'view': {'id': None, 'has_permission': False} } if perm['permission'] == '新增': processed_permissions[key]['add'] = {'id': perm['id'], 'has_permission': perm['codename'] in group_permissions} elif perm['permission'] == '删除': processed_permissions[key]['remove'] = {'id': perm['id'], 'has_permission': perm['codename'] in group_permissions} elif perm['permission'] == '修改': processed_permissions[key]['change'] = {'id': perm['id'], 'has_permission': perm['codename'] in group_permissions} elif perm['permission'] == '查看': processed_permissions[key]['view'] = {'id': perm['id'], 'has_permission': perm['codename'] in group_permissions} # 将processed_permissions从字典转换为列表 permissions_list = [] for key, perms in processed_permissions.items(): perms['resource_group'], perms['resource'] = key permissions_list.append(perms) return JsonResponse({'permissions': permissions_list, 'group_id': group_id}) @login_required @group_required('系统管理') @permission_required('auth.change_group', raise_exception=True) def save_group_permissions(request, group_id): if request.method != 'POST': return JsonResponse({'error': 'Invalid request method'}, status=400) group = get_object_or_404(Group, pk=group_id) try: permissions_data = json.loads(request.body) except ValueError: return JsonResponse({'error': 'Invalid JSON data'}, status=400) # 处理修改后的权限项 for perm_id, perm_changes in permissions_data.get('permissions', {}).items(): try: permission = Permission.objects.get(id=perm_id) # 处理“新增”权限 if 'add_permission' in perm_changes: if perm_changes['add_permission']: group.permissions.add(permission) else: group.permissions.remove(permission) # 处理“删除”权限 if 'delete_permission' in perm_changes: if perm_changes['delete_permission']: group.permissions.add(permission) else: group.permissions.remove(permission) # 处理“修改”权限 if 'edit_permission' in perm_changes: if perm_changes['edit_permission']: group.permissions.add(permission) else: group.permissions.remove(permission) # 处理“查看”权限 if 'view_permission' in perm_changes: if perm_changes['view_permission']: group.permissions.add(permission) else: group.permissions.remove(permission) except Permission.DoesNotExist: continue return JsonResponse({'success': True, 'message': 'Group permissions updated successfully'}) @require_http_methods(['POST']) @login_required @group_required('系统管理') @permission_required('auth.add_group', raise_exception=True) def add_group(request): # 从请求中获取组名 try: data = json.loads(request.body) group_name = data.get('group_name') if not group_name: raise ValueError('The group name is required.') # 创建新的用户组 group, created = Group.objects.get_or_create(name=group_name) if created: return JsonResponse({'success': True, 'message': 'Group created successfully.'}) else: return JsonResponse({'success': False, 'message': 'Group already exists.'}) except ValueError as e: return JsonResponse({'success': False, 'message': str(e)}) except Exception as e: return JsonResponse({'success': False, 'message': 'An error occurred during group creation.'}) @require_http_methods(['POST']) @login_required @group_required('系统管理') @permission_required('auth.delete_group', raise_exception=True) def delete_group(request, group_id): try: group = get_object_or_404(Group, pk=group_id) group.delete() return JsonResponse({'success': True, 'message': 'Group deleted successfully.'}) except Exception as e: return JsonResponse({'success': False, 'message': 'An error occurred during group deletion.'}) @login_required @permission_required('auth.view_user', raise_exception=True) def user_homepage_view(request): # 查询人员基本信息表中的转正日期、岗位 account_profile = AccountProfile.objects.get(user=request.user) employee_information = account_profile.employee_information # 计算在职天数,用今天减去employee_information.entry_date on_duty_days = (date.today() - employee_information.entry_date).days # 查询年假使用表中的年假剩余天数 try: annual_leave_records = AnnualLeaveRecord.objects.get(employee_name__icontains=employee_information.name) remaining_annual_leave = annual_leave_records.remaining_annual_leave used_annual_leave = annual_leave_records.used_annual_leave except AnnualLeaveRecord.DoesNotExist: remaining_annual_leave = 0 used_annual_leave = 0 # 查询员工业绩目标表的sales_target,和项目台账的关于这个人今年状态为已完成所有项目的标的金额,用标的金额除以目标表的sales_target得到完成率 try: employee_performance = EmployeePerformanceTarget.objects.get(name=employee_information.name) performance_target = employee_performance.sales_target except EmployeePerformanceTarget.DoesNotExist: performance_target = 0 # 获取今年的日期范围 current_year = now().year start_date = f"{current_year}-01-01" end_date = f"{current_year}-12-31" try: # 查询今年完成的项目 ongoing_projects = ProjectLedger.objects.filter( Q(project_leader=employee_information.name) | Q(project_members__contains=employee_information.name), project_status='进行中', end_date__range=[start_date, end_date] ) completed_projects = ProjectLedger.objects.filter( Q(project_leader=employee_information.name) | Q(project_members__contains=employee_information.name), project_status='完成', end_date__range=[start_date, end_date] ) ongoing_project_count = ongoing_projects.count() completed_project_count = completed_projects.count() total_project_count = ongoing_project_count + completed_project_count # 计算项目完成率 if total_project_count > 0: project_completion_rate = (completed_project_count / total_project_count) * 100 else: project_completion_rate = 0 # 计算已完成项目的总标的金额 total_completed_amount = completed_projects.aggregate(Sum('contract_amount'))['contract_amount__sum'] or 0 except ProjectLedger.DoesNotExist: total_completed_amount = 0 completed_project_count = 0 total_project_count = 0 project_completion_rate = 0 ongoing_project_count = 0 # 计算完成率 if performance_target > 0: completion_rate = (total_completed_amount / performance_target) * 100 else: completion_rate = 0 context = { 'name': employee_information.name, 'postion': employee_information.position, 'on_duty_days': on_duty_days, 'status': employee_information.status, 'remaining_annual_leave': remaining_annual_leave, 'used_annual_leave': used_annual_leave, 'performance_target': performance_target, 'total_completed_amount': total_completed_amount, 'completion_rate': completion_rate, 'completed_project_count': completed_project_count, 'ongoing_project_count': ongoing_project_count, 'total_project_count': total_project_count, 'project_completion_rate': project_completion_rate, } return render(request, 'user_homepage.html', context) @login_required @group_required('系统管理') @permission_required('auth.add_user', raise_exception=True) def create_account_profile(request): if request.method == 'POST': data = json.loads(request.body) employee_info_ids = data.get('employee_ids') # 获取employee_ids列表 # 检查是否提供了必填信息 if not employee_info_ids: return JsonResponse({'success': False, 'message': 'Employee Information IDs are required.'}) # 在事务中创建AccountProfile with transaction.atomic(): created_profiles = [] for employee_info_id in employee_info_ids: # 尝试获取 employee_information 实例 try: employee_information = EmployeeInformation.objects.get(employee_id=employee_info_id) except EmployeeInformation.DoesNotExist as e: return JsonResponse({'success': False, 'message': str(e)}) # 检查 employee_information 是否已经有对应的AccountProfile if AccountProfile.objects.filter(employee_information=employee_information).exists(): return JsonResponse( {'success': False, 'message': f'Employee Information ID {employee_info_id} is already bound to an AccountProfile.'}) # 将姓名转换为拼音作为用户名 username_base = ''.join(lazy_pinyin(employee_information.name)) username = username_base counter = 1 while User.objects.filter(username=username).exists(): username = f"{username_base}{counter}" counter += 1 # 创建默认密码 date_str = datetime.now().strftime('%Y%m%d') password = f"{username_base}{date_str}" # 创建新的 User 实例 user = User.objects.create_user( username=username, email=employee_information.email, password=password ) # 创建新的AccountProfile实例 account_profile = AccountProfile.objects.create( user=user, employee_information=employee_information ) created_profiles.append(account_profile) return JsonResponse( {'success': True, 'message': f'Created {len(created_profiles)} AccountProfiles successfully.'})