diff --git a/promgen/forms.py b/promgen/forms.py index 71134160..823209db 100644 --- a/promgen/forms.py +++ b/promgen/forms.py @@ -6,7 +6,6 @@ from dateutil import parser from django import forms -from django.conf import settings from django.contrib.auth.models import User from django.core.exceptions import ValidationError from guardian.conf.settings import ANONYMOUS_USER_NAME @@ -268,7 +267,7 @@ def get_permission_choices(input_object): def get_group_choices(): yield ("", "") - for g in models.Group.objects.exclude(name=settings.PROMGEN_DEFAULT_GROUP).order_by("name"): + for g in models.Group.objects.order_by("name"): yield (g.name, g.name) diff --git a/promgen/locale/ja/LC_MESSAGES/django.mo b/promgen/locale/ja/LC_MESSAGES/django.mo index 568d020e..a83f0afb 100644 Binary files a/promgen/locale/ja/LC_MESSAGES/django.mo and b/promgen/locale/ja/LC_MESSAGES/django.mo differ diff --git a/promgen/locale/ja/LC_MESSAGES/django.po b/promgen/locale/ja/LC_MESSAGES/django.po index 3503a510..2c2f4459 100644 --- a/promgen/locale/ja/LC_MESSAGES/django.po +++ b/promgen/locale/ja/LC_MESSAGES/django.po @@ -185,12 +185,6 @@ msgstr "Actions" msgid "No search parameters provided." msgstr "検索フレーズが指定されていません" -#: templates/promgen/service_detail.html:93 -#: templates/promgen/project_detail.html:101 -#: templates/promgen/farm_detail.html:71 -msgid "This is a transitional release. Even if you are able to assign roles to members, the permission checks are actually disabled. They will be enabled in the next release." -msgstr "このリリースは移行用のリリースです。ロールをメンバーに割り当てても、権限のチェックは行われません。次のリリースで権限のチェックが有効化されます。" - #: templates/promgen/permission_row.html:28 msgid "Delete all permissions for user?" msgstr "ユーザーから全ての権限を削除しますか?" diff --git a/promgen/migrations/0003_default-group.py b/promgen/migrations/0003_default-group.py index 2eff59d7..e59976e2 100644 --- a/promgen/migrations/0003_default-group.py +++ b/promgen/migrations/0003_default-group.py @@ -5,7 +5,7 @@ def create_group(apps, schema_editor): - if not settings.PROMGEN_DEFAULT_GROUP: + if not getattr(settings, "PROMGEN_DEFAULT_GROUP", None): return # Create Default Group diff --git a/promgen/migrations/0038_remove_default_group.py b/promgen/migrations/0038_remove_default_group.py new file mode 100644 index 00000000..d19f480e --- /dev/null +++ b/promgen/migrations/0038_remove_default_group.py @@ -0,0 +1,24 @@ +# Generated by Django 4.2.11 on 2025-08-14 08:34 + +from django.db import migrations + + +def remove_group(apps, schema_editor): + # Get the Default group + # Note: The group name is hardcoded as "Default" in the original Promgen. + # If the name is different in your application, you should change it + # according to the value used in settings.PROMGEN_DEFAULT_GROUP. + default_group = apps.get_model("auth", "Group").objects.filter(name="Default").first() + + if default_group: + default_group.user_set.clear() + default_group.permissions.clear() + default_group.delete() + + +class Migration(migrations.Migration): + dependencies = [ + ("promgen", "0037_remove_project_farm_alter_farm_project"), + ] + + operations = [migrations.RunPython(remove_group)] diff --git a/promgen/mixins.py b/promgen/mixins.py index 84aaad5f..f80fe506 100644 --- a/promgen/mixins.py +++ b/promgen/mixins.py @@ -1,14 +1,16 @@ # Copyright (c) 2019 LINE Corporation # These sources are released under the terms of the MIT license: see LICENSE - +import guardian.mixins +import guardian.utils from django.contrib import messages from django.contrib.auth.mixins import PermissionRequiredMixin +from django.contrib.auth.models import User from django.contrib.auth.views import redirect_to_login from django.contrib.contenttypes.models import ContentType -from django.shortcuts import get_object_or_404 +from django.shortcuts import get_object_or_404, redirect from django.views.generic.base import ContextMixin -from promgen import models +from promgen import models, views class ContentTypeMixin: @@ -78,3 +80,81 @@ def get_context_data(self, **kwargs): models.Service, id=self.kwargs["pk"] ) return context + + +class PromgenGuardianPermissionMixin(guardian.mixins.PermissionRequiredMixin): + def get_check_permission_object(self): + # Override this method to return the object to check permissions for + return self.get_object() + + def get_check_permission_objects(self): + # We only define permission codes for Service, Group and Project + # So we need to check the permission for the parent objects in other cases + try: + object = self.get_check_permission_object() + if isinstance(object, models.Service) or isinstance(object, models.Group): + return [object] + if isinstance(object, models.Project): + return [object, object.service] + if ( + isinstance(object, models.Exporter) + or isinstance(object, models.URL) + or isinstance(object, models.Farm) + ): + return [object.project, object.project.service] + if isinstance(object, models.Host): + return [object.farm.project, object.farm.project.service] + if isinstance(object, models.Rule) or isinstance(object, models.Sender): + if isinstance(object.content_object, models.Project): + return [object.content_object, object.content_object.service] + else: + return [object.content_object] + return None + except Exception: + return None + + def check_permissions(self, request): + # Always allow user to view the site rule + if isinstance(self, views.RuleDetail) and isinstance( + self.get_check_permission_object().content_object, models.Site + ): + return None + + check_permission_objects = self.get_check_permission_objects() + if check_permission_objects is None: + if request.user.is_active and request.user.is_superuser: + return None + return self.on_permission_check_fail(request, None) + + # Loop through all the objects to check permissions for + # If any of the objects has the required permission (any_perm=True), we can proceed + # Otherwise, we will return the forbidden response + forbidden = None + for obj in check_permission_objects: + # Users always have permission on themselves + if isinstance(obj, User) and request.user == obj: + break + + forbidden = guardian.utils.get_40x_or_None( + request, + perms=self.get_required_permissions(request), + obj=obj, + login_url=self.login_url, + redirect_field_name=self.redirect_field_name, + return_403=self.return_403, + return_404=self.return_404, + accept_global_perms=False, + any_perm=True, + ) + if forbidden is None: + break + if forbidden: + return self.on_permission_check_fail(request, forbidden) + return None + + def on_permission_check_fail(self, request, response, obj=None): + messages.warning(request, "You do not have permission to perform this action.") + referer = request.META.get("HTTP_REFERER") + if referer: + return redirect(referer) + return redirect_to_login(self.request.get_full_path()) diff --git a/promgen/permissions.py b/promgen/permissions.py index 67c05d6b..0528ef5f 100644 --- a/promgen/permissions.py +++ b/promgen/permissions.py @@ -1,8 +1,14 @@ # Copyright (c) 2025 LINE Corporation # These sources are released under the terms of the MIT license: see LICENSE +from django.contrib.auth.models import User +from django.db.models import Q from django.utils.itercompat import is_iterable +from guardian.shortcuts import get_objects_for_user +from rest_framework import permissions from rest_framework.permissions import BasePermission +from promgen import models + class PromgenModelPermissions(BasePermission): """ @@ -41,3 +47,54 @@ def has_permission(self, request, view): return any(request.user.has_perm(perm) for perm in perm_list) else: return all(request.user.has_perm(perm) for perm in perm_list) + + +class ReadOnlyForAuthenticatedUserOrIsSuperuser(BasePermission): + """ + Customize Django REST Framework's base permission class to only allow read-only access for + authenticated users and full access for superusers. + """ + + def has_permission(self, request, view): + if request.user.is_superuser: + return True + return bool( + request.user + and request.user.is_authenticated + and request.method in permissions.SAFE_METHODS + ) + + +def get_accessible_services_for_user(user: User): + return get_objects_for_user( + user, + ["service_admin", "service_editor", "service_viewer"], + any_perm=True, + use_groups=True, + accept_global_perms=False, + klass=models.Service, + ) + + +def get_accessible_projects_for_user(user: User): + services = get_accessible_services_for_user(user) + projects = get_objects_for_user( + user, + ["project_admin", "project_editor", "project_viewer"], + any_perm=True, + use_groups=True, + accept_global_perms=False, + klass=models.Project, + ) + return models.Project.objects.filter(Q(pk__in=projects) | Q(service__in=services)) + + +def get_accessible_groups_for_user(user: User): + return get_objects_for_user( + user, + ["group_admin", "group_member"], + any_perm=True, + use_groups=False, + accept_global_perms=False, + klass=models.Group, + ) diff --git a/promgen/prometheus.py b/promgen/prometheus.py index 59d16548..64e9ebb1 100644 --- a/promgen/prometheus.py +++ b/promgen/prometheus.py @@ -65,14 +65,18 @@ def render_rules(rules=None): return renderers.RuleRenderer().render(serializers.AlertRuleSerializer(rules, many=True).data) -def render_urls(): +def render_urls(projects=None): urls = collections.defaultdict(list) - for url in models.URL.objects.prefetch_related( + url_queryset = models.URL.objects.prefetch_related( "project__service", "project__shard", "project", - ): + ) + if projects is not None: + url_queryset = url_queryset.filter(project__in=projects) + + for url in url_queryset: urls[ ( url.project.name, @@ -98,7 +102,7 @@ def render_urls(): return json.dumps(data, indent=2, sort_keys=True) -def render_config(service=None, project=None): +def render_config(service=None, project=None, services=None, projects=None, farms=None): data = [] for exporter in models.Exporter.objects.prefetch_related( "project__farm__host_set", @@ -113,6 +117,10 @@ def render_config(service=None, project=None): continue if project and exporter.project.name != project.name: continue + if services is not None and exporter.project.service not in services: + continue + if projects is not None and exporter.project not in projects: + continue if not exporter.enabled: continue @@ -129,8 +137,9 @@ def render_config(service=None, project=None): labels["__metrics_path__"] = exporter.path hosts = [] - for host in exporter.project.farm.host_set.all(): - hosts.append(f"{host.name}:{exporter.port}") + if farms is None or exporter.project.farm in farms: + for host in exporter.project.farm.host_set.all(): + hosts.append(f"{host.name}:{exporter.port}") data.append({"labels": labels, "targets": hosts}) return json.dumps(data, indent=2, sort_keys=True) diff --git a/promgen/proxy.py b/promgen/proxy.py index c958e468..d1544c3b 100644 --- a/promgen/proxy.py +++ b/promgen/proxy.py @@ -16,6 +16,7 @@ from rest_framework.views import APIView from promgen import forms, models, prometheus, serializers, util +from promgen import permissions as promgen_permissions logger = logging.getLogger(__name__) @@ -164,6 +165,22 @@ def get(self, request): logger.error("Error connecting to %s", url) return JsonResponse({}, status=HTTPStatus.INTERNAL_SERVER_ERROR) else: + # Filter the alerts based on the user's permissions + if not self.request.user.is_superuser: + services = promgen_permissions.get_accessible_services_for_user(self.request.user) + projects = promgen_permissions.get_accessible_projects_for_user(self.request.user) + + accessible_projects = projects.values_list("name", flat=True) + accessible_services = services.values_list("name", flat=True) + + filtered_response = [ + alert + for alert in response.json() + if alert.get("labels", {}).get("service") in accessible_services + or alert.get("labels", {}).get("project") in accessible_projects + ] + return HttpResponse(json.dumps(filtered_response), content_type="application/json") + # If the user is a superuser, return all alerts return HttpResponse(response.content, content_type="application/json") @@ -176,6 +193,31 @@ def get(self, request): logger.error("Error connecting to %s", url) return JsonResponse({}, status=HTTPStatus.INTERNAL_SERVER_ERROR) else: + # Filter the silences based on the user's permissions + if not self.request.user.is_superuser: + services = promgen_permissions.get_accessible_services_for_user(self.request.user) + projects = promgen_permissions.get_accessible_projects_for_user(self.request.user) + + accessible_projects = projects.values_list("name", flat=True) + accessible_services = services.values_list("name", flat=True) + + filtered_response = [ + silence + for silence in response.json() + if any( + ( + matcher.get("name") == "service" + and matcher.get("value") in accessible_services + ) + or ( + matcher.get("name") == "project" + and matcher.get("value") in accessible_projects + ) + for matcher in silence.get("matchers", []) + ) + ] + return HttpResponse(json.dumps(filtered_response), content_type="application/json") + return HttpResponse(response.content, content_type="application/json") def post(self, request): @@ -196,6 +238,48 @@ def post(self, request): status=HTTPStatus.UNPROCESSABLE_ENTITY, ) + # Check if the user has permission to silence the alert + if not request.user.is_superuser: + if "project" not in body["labels"] and "service" not in body["labels"]: + return JsonResponse( + { + "messages": [ + { + "class": "alert alert-warning", + "message": "You must specify either a project or service label", + } + ] + }, + status=HTTPStatus.UNPROCESSABLE_ENTITY, + ) + + permission_denied_response = JsonResponse( + { + "messages": [ + { + "class": "alert alert-danger", + "message": "You do not have permission to silence this alert", + } + ] + }, + status=HTTPStatus.FORBIDDEN, + ) + if "project" in body["labels"]: + project = models.Project.objects.get(name=body["labels"]["project"]) + if ( + not request.user.has_perm("project_admin", project) + and not request.user.has_perm("project_editor", project) + and not request.user.has_perm("service_admin", project.service) + and not request.user.has_perm("service_editor", project.service) + ): + return permission_denied_response + elif "service" in body["labels"]: + service = models.Service.objects.get(name=body["labels"]["service"]) + if not request.user.has_perm( + "service_admin", service + ) and not request.user.has_perm("service_editor", service): + return permission_denied_response + try: response = prometheus.silence(**form.cleaned_data) except requests.HTTPError as e: @@ -260,6 +344,63 @@ def post(self, request, *args, **kwargs): class ProxyDeleteSilence(View): def delete(self, request, silence_id): url = urljoin(util.setting("alertmanager:url"), f"/api/v2/silence/{silence_id}") + # First, check if the silence exists + response = util.get(url) + if response.status_code != 200: + return HttpResponse( + response.text, status=response.status_code, content_type="application/json" + ) + + # Check if the user has permission to delete the silence + if not request.user.is_superuser: + silence = response.json() + project = None + service = None + for matcher in silence.get("matchers", []): + if matcher.get("name") == "project": + project = matcher.get("value") + if matcher.get("name") == "service": + service = matcher.get("value") + if project is None and service is None: + return JsonResponse( + { + "messages": [ + { + "class": "alert alert-warning", + "message": "Silence must have either a project or service matcher", + } + ] + }, + status=HTTPStatus.UNPROCESSABLE_ENTITY, + ) + permission_denied_response = JsonResponse( + { + "messages": [ + { + "class": "alert alert-danger", + "message": "You do not have permission to delete this silence", + } + ] + }, + status=HTTPStatus.FORBIDDEN, + ) + if project: + project = models.Project.objects.get(name=project) + if ( + not request.user.has_perm("project_admin", project) + and not request.user.has_perm("project_editor", project) + and not request.user.has_perm("service_admin", project.service) + and not request.user.has_perm("service_editor", project.service) + ): + return permission_denied_response + elif service: + service = models.Service.objects.get(name=service) + if not request.user.has_perm( + "service_admin", service + ) and not request.user.has_perm("service_editor", service): + return permission_denied_response + + # Delete the silence response = util.delete(url) return HttpResponse( response.text, status=response.status_code, content_type="application/json" diff --git a/promgen/rest.py b/promgen/rest.py index 7d4d4c0e..3a76d092 100644 --- a/promgen/rest.py +++ b/promgen/rest.py @@ -1,14 +1,15 @@ # Copyright (c) 2019 LINE Corporation # These sources are released under the terms of the MIT license: see LICENSE +from itertools import chain from django.http import HttpResponse from requests.exceptions import HTTPError -from rest_framework import permissions, viewsets +from rest_framework import viewsets from rest_framework.decorators import action from rest_framework.response import Response from rest_framework.views import APIView -from promgen import filters, models, prometheus, renderers, serializers, tasks, util +from promgen import filters, models, permissions, prometheus, renderers, serializers, tasks, util from promgen.permissions import PromgenModelPermissions @@ -28,11 +29,27 @@ def post(self, request, *args, **kwargs): class AllViewSet(viewsets.ViewSet): - permission_classes = [permissions.AllowAny] - @action(detail=False, methods=["get"], renderer_classes=[renderers.RuleRenderer]) def rules(self, request): - rules = models.Rule.objects.filter(enabled=True) + site_rules = models.Rule.objects.filter( + content_type__model="site", content_type__app_label="promgen", enabled=True + ) + service_rules = models.Rule.objects.filter( + content_type__model="service", content_type__app_label="promgen", enabled=True + ) + project_rules = models.Rule.objects.filter( + content_type__model="project", content_type__app_label="promgen", enabled=True + ) + + # If the user is not a superuser, we need to filter the rules by the user's permissions + if not self.request.user.is_superuser: + services = permissions.get_accessible_services_for_user(self.request.user) + service_rules = service_rules.filter(object_id__in=services) + + projects = permissions.get_accessible_projects_for_user(self.request.user) + project_rules = project_rules.filter(object_id__in=projects) + + rules = list(chain(site_rules, service_rules, project_rules)) return Response( serializers.AlertRuleSerializer(rules, many=True).data, headers={"Content-Disposition": "attachment; filename=alert.rule.yml"}, @@ -40,15 +57,35 @@ def rules(self, request): @action(detail=False, methods=["get"], renderer_classes=[renderers.renderers.JSONRenderer]) def targets(self, request): + if self.request.user.is_superuser: + return HttpResponse( + prometheus.render_config(), + content_type="application/json", + ) + + # if the user is not a superuser, we need to filter the targets by the user's permissions + services = permissions.get_accessible_services_for_user(self.request.user) + projects = permissions.get_accessible_projects_for_user(self.request.user) + farms = models.Farm.objects.filter(project__in=projects) + return HttpResponse( - prometheus.render_config(), + prometheus.render_config(services=services, projects=projects, farms=farms), content_type="application/json", ) @action(detail=False, methods=["get"], renderer_classes=[renderers.renderers.JSONRenderer]) def urls(self, request): + if self.request.user.is_superuser: + return HttpResponse( + prometheus.render_urls(), + content_type="application/json", + ) + + # if the user is not a superuser, we need to filter the URLs by the user's permissions + projects = permissions.get_accessible_projects_for_user(self.request.user) + return HttpResponse( - prometheus.render_urls(), + prometheus.render_urls(projects=projects), content_type="application/json", ) @@ -115,6 +152,12 @@ class ServiceViewSet(NotifierMixin, RuleMixin, viewsets.ModelViewSet): lookup_value_regex = "[^/]+" lookup_field = "name" + def get_queryset(self): + query_set = self.queryset + return query_set.filter( + pk__in=permissions.get_accessible_services_for_user(self.request.user) + ) + @action(detail=True, methods=["get"]) def projects(self, request, name): service = self.get_object() @@ -135,6 +178,12 @@ class ProjectViewSet(NotifierMixin, RuleMixin, viewsets.ModelViewSet): lookup_value_regex = "[^/]+" lookup_field = "name" + def get_queryset(self): + query_set = self.queryset + return query_set.filter( + pk__in=permissions.get_accessible_projects_for_user(self.request.user) + ) + @action(detail=True, methods=["get"]) def targets(self, request, name): return HttpResponse( @@ -150,6 +199,14 @@ class FarmViewSet(viewsets.ModelViewSet): lookup_value_regex = "[^/]+" lookup_field = "id" + def get_queryset(self): + query_set = self.queryset + # If the user is not a superuser, we need to filter the farms by the user's permissions + if not self.request.user.is_superuser: + projects = permissions.get_accessible_projects_for_user(self.request.user) + query_set = query_set.filter(project__in=projects) + return query_set + def retrieve(self, request, id): farm = self.get_object() farm_data = self.get_serializer(farm).data diff --git a/promgen/settings.py b/promgen/settings.py index da6d5769..a9f7c4fb 100644 --- a/promgen/settings.py +++ b/promgen/settings.py @@ -50,7 +50,6 @@ else: PROMGEN = {} -PROMGEN_DEFAULT_GROUP = "Default" PROMGEN_SCHEME = env.str("PROMGEN_SCHEME", default="http") ALLOWED_HOSTS = env.list("ALLOWED_HOSTS", default=["127.0.0.1", "localhost"]) @@ -194,7 +193,7 @@ "rest_framework.authentication.SessionAuthentication", ), "DEFAULT_PERMISSION_CLASSES": ( - "rest_framework.permissions.DjangoModelPermissionsOrAnonReadOnly", + "promgen.permissions.ReadOnlyForAuthenticatedUserOrIsSuperuser", ), "DEFAULT_FILTER_BACKENDS": ("django_filters.rest_framework.DjangoFilterBackend",), } diff --git a/promgen/signals.py b/promgen/signals.py index a2c2f0ba..dd160932 100644 --- a/promgen/signals.py +++ b/promgen/signals.py @@ -6,7 +6,6 @@ from django.conf import settings from django.contrib import messages -from django.contrib.auth.models import Group from django.contrib.contenttypes.models import ContentType from django.core.cache import cache from django.db.models import Q @@ -276,20 +275,6 @@ def save_service(*, sender, instance, **kwargs): return True -@receiver(post_save, sender=settings.AUTH_USER_MODEL) -@skip_raw -def add_user_to_default_group(instance, created, **kwargs): - # If we enabled our default group, then we want to ensure that all newly - # created users are also added to our default group so they inherit the - # default permissions - if not settings.PROMGEN_DEFAULT_GROUP: - return - if not created: - return - - instance.groups.add(Group.objects.get(name=settings.PROMGEN_DEFAULT_GROUP)) - - @receiver(post_save, sender=settings.AUTH_USER_MODEL) @skip_raw def add_email_sender(instance, created, **kwargs): diff --git a/promgen/templates/promgen/project_detail.html b/promgen/templates/promgen/project_detail.html index 4fb2c733..4ace77a6 100644 --- a/promgen/templates/promgen/project_detail.html +++ b/promgen/templates/promgen/project_detail.html @@ -97,9 +97,6 @@

- {% include "promgen/permission_block.html" with object=project %}
diff --git a/promgen/templates/promgen/service_detail.html b/promgen/templates/promgen/service_detail.html index 06e7a2d2..cae8e467 100644 --- a/promgen/templates/promgen/service_detail.html +++ b/promgen/templates/promgen/service_detail.html @@ -89,9 +89,6 @@

Service: {{ service.name }}

- {% include "promgen/permission_block.html" with object=service %}
diff --git a/promgen/tests/examples/silence.duration.json b/promgen/tests/examples/silence.duration.json index f468c9c9..3e4e452e 100644 --- a/promgen/tests/examples/silence.duration.json +++ b/promgen/tests/examples/silence.duration.json @@ -1,5 +1,5 @@ { - "createdBy": "demo@example.com", + "createdBy": "admin@example.com", "matchers": [{ "value": "example.com:[0-9]*", "isRegex": true, diff --git a/promgen/tests/examples/silence.range.json b/promgen/tests/examples/silence.range.json index 7277afc5..b9bc7daf 100644 --- a/promgen/tests/examples/silence.range.json +++ b/promgen/tests/examples/silence.range.json @@ -1,5 +1,5 @@ { - "createdBy": "demo@example.com", + "createdBy": "admin@example.com", "matchers": [{ "value": "example.com:[0-9]*", "isRegex": true, diff --git a/promgen/tests/test_host_add.py b/promgen/tests/test_host_add.py index 41c52d0e..992c79f0 100644 --- a/promgen/tests/test_host_add.py +++ b/promgen/tests/test_host_add.py @@ -1,10 +1,11 @@ # Copyright (c) 2017 LINE Corporation # These sources are released under the terms of the MIT license: see LICENSE - - +from django.shortcuts import get_object_or_404 from django.urls import reverse +from guardian.shortcuts import assign_perm from promgen import models, validators +from promgen.middleware import get_current_user from promgen.tests import PromgenTest @@ -16,6 +17,9 @@ def setUp(self): # separated and comma separated work, but are not necessarily testing # valid/invalid hostnames def test_newline(self): + assign_perm( + "promgen.project_editor", get_current_user(), get_object_or_404(models.Project, pk=1) + ) self.client.post( reverse("hosts-add", args=[1]), {"hosts": "\naaa.example.com\nbbb.example.com\nccc.example.com \n"}, @@ -24,6 +28,9 @@ def test_newline(self): self.assertCount(models.Host, 3, "Expected 3 hosts") def test_comma(self): + assign_perm( + "promgen.project_editor", get_current_user(), get_object_or_404(models.Project, pk=1) + ) self.client.post( reverse("hosts-add", args=[1]), {"hosts": ",,aaa.example.com, bbb.example.com,ccc.example.com,"}, diff --git a/promgen/tests/test_mixins.py b/promgen/tests/test_mixins.py new file mode 100644 index 00000000..7bdf2388 --- /dev/null +++ b/promgen/tests/test_mixins.py @@ -0,0 +1,84 @@ +# Copyright (c) 2025 LINE Corporation +# These sources are released under the terms of the MIT license: see LICENSE +from unittest.mock import patch + +from django.contrib.auth.models import Permission +from django.shortcuts import get_object_or_404 +from django.test import RequestFactory +from guardian.shortcuts import assign_perm + +from promgen import models, tests +from promgen.mixins import PromgenGuardianPermissionMixin + + +class MockView(PromgenGuardianPermissionMixin): + def get_object(self): + return self.object + + def dispatch(self, request, *args, **kwargs): + self.request = request + response = self.check_permissions(request) + if response: + return "Permission Denied" + return "Permission Granted" + + +class PromgenGuardianPermissionMixinTest(tests.PromgenTest): + def setUp(self): + self.view = MockView() + factory = RequestFactory() + self.request = factory.get("/") + + def test_permission_granted(self): + user = self.force_login(username="demo") + object = get_object_or_404(models.Project, pk=1) + permission_required = Permission.objects.get( + codename="project_admin", content_type__model="project" + ) + assign_perm(permission_required, user, object) + self.view.permission_required = permission_required.codename + self.view.object = object + self.request.user = user + response = self.view.dispatch(self.request) + self.assertEqual(response, "Permission Granted") + + @patch("django.contrib.messages.api.add_message") + def test_permission_not_granted(self, mock_add_message): + user = self.force_login(username="demo") + object = get_object_or_404(models.Project, pk=1) + permission_required = Permission.objects.get( + codename="project_admin", content_type__model="project" + ) + self.view.permission_required = permission_required.codename + self.view.object = object + self.request.user = user + response = self.view.dispatch(self.request) + self.assertEqual(response, "Permission Denied") + + def test_permission_granted_on_parent_object(self): + user = self.force_login(username="demo") + object = get_object_or_404(models.Service, pk=1) + permission_required = Permission.objects.get( + codename="service_admin", content_type__model="service" + ) + assign_perm(permission_required, user, object) + self.view.permission_required = permission_required.codename + self.view.object = object + self.request.user = user + response = self.view.dispatch(self.request) + self.assertEqual(response, "Permission Granted") + + @patch("django.contrib.messages.api.add_message") + def test_permission_granted_on_another_object(self, mock_add_message): + user = self.force_login(username="demo") + object = get_object_or_404(models.Service, pk=1) + another_object = models.Service.objects.create(name="Another Service", owner=user) + permission_required = Permission.objects.get( + codename="service_admin", content_type__model="service" + ) + assign_perm(permission_required, user, another_object) + self.view.permission_required = permission_required.codename + self.view.object = object + self.request.user = user + response = self.view.dispatch(self.request) + self.assertEqual(response, "Permission Denied") diff --git a/promgen/tests/test_renderers.py b/promgen/tests/test_renderers.py index 8d725bad..e35b9da2 100644 --- a/promgen/tests/test_renderers.py +++ b/promgen/tests/test_renderers.py @@ -10,6 +10,9 @@ class RendererTests(tests.PromgenTest): fixtures = ["testcases.yaml", "extras.yaml"] + def setUp(self): + self.user = self.force_login(username="admin") + def test_global_rule(self): expected = tests.Data("examples", "export.rule.yml").yaml() response = self.client.get(reverse("api:all-rules")) diff --git a/promgen/tests/test_rest.py b/promgen/tests/test_rest.py index 9335bae9..65605198 100644 --- a/promgen/tests/test_rest.py +++ b/promgen/tests/test_rest.py @@ -5,6 +5,7 @@ from django.contrib.auth.models import Permission from django.test import override_settings from django.urls import reverse +from guardian.shortcuts import assign_perm from promgen import models, rest, tests @@ -37,6 +38,18 @@ def test_alert(self): def test_retrieve_farm(self): expected = tests.Data("examples", "rest.farm.json").json() + # Check retrieving all farms without assigning permissions return empty list + response = self.client.get(reverse("api:farm-list")) + self.assertEqual(response.status_code, 200) + self.assertEqual(response.json(), []) + + # Check retrieving a specific farm without assigning permissions return 404 Not Found + response = self.client.get(reverse("api:farm-detail", args=[1])) + self.assertEqual(response.status_code, 404) + + # Assigning permissions to the user + assign_perm("project_viewer", self.user, models.Project.objects.get(id=1)) + # Check retrieving all farms response = self.client.get(reverse("api:farm-list")) self.assertEqual(response.status_code, 200) diff --git a/promgen/tests/test_routes.py b/promgen/tests/test_routes.py index 01cfea3f..c14b85be 100644 --- a/promgen/tests/test_routes.py +++ b/promgen/tests/test_routes.py @@ -7,6 +7,7 @@ from django.urls import reverse from promgen import models, tests, views +from promgen.middleware import get_current_user TEST_SETTINGS = tests.Data("examples", "promgen.yml").yaml() TEST_IMPORT = tests.Data("examples", "import.json").raw() @@ -104,7 +105,11 @@ def test_failed_permission(self): self.assertTrue(response.url.startswith("/login")) def test_other_routes(self): - self.add_user_permissions("promgen.add_rule", "promgen.change_site") + user = get_current_user() + user.is_superuser = True + user.save() for request in [{"viewname": "rule-new", "args": ("site", 1)}]: response = self.client.get(reverse(**request)) self.assertRoute(response, views.AlertRuleRegister, 200) + user.is_superuser = False + user.save() diff --git a/promgen/tests/test_silence.py b/promgen/tests/test_silence.py index fa94769a..7ae8aa1b 100644 --- a/promgen/tests/test_silence.py +++ b/promgen/tests/test_silence.py @@ -21,7 +21,7 @@ class SilenceTest(tests.PromgenTest): fixtures = ["testcases.yaml", "extras.yaml"] def setUp(self): - self.user = self.force_login(username="demo") + self.user = self.force_login(username="admin") @override_settings(PROMGEN=TEST_SETTINGS) @mock.patch("promgen.util.post") diff --git a/promgen/tests/test_web.py b/promgen/tests/test_web.py index b4487248..37e85647 100644 --- a/promgen/tests/test_web.py +++ b/promgen/tests/test_web.py @@ -1,11 +1,13 @@ # Copyright (c) 2022 LINE Corporation # These sources are released under the terms of the MIT license: see LICENSE from django.urls import reverse +from guardian.shortcuts import assign_perm, remove_perm -from promgen import tests, views +from promgen import models, views +from promgen.tests import PromgenTest -class WebTests(tests.PromgenTest): +class WebTests(PromgenTest): fixtures = ["testcases.yaml", "extras.yaml"] route_map = [ @@ -13,15 +15,69 @@ class WebTests(tests.PromgenTest): ("datasource-list", views.DatasourceList, {}), ("datasource-detail", views.DatasourceDetail, {"pk": 1}), ("service-list", views.ServiceList, {}), - ("service-detail", views.ServiceDetail, {"pk": 1}), - ("project-detail", views.ProjectDetail, {"pk": 1}), - ("project-exporter", views.ExporterRegister, {"pk": 1}), - ("project-notifier", views.ProjectNotifierRegister, {"pk": 1}), + ( + "service-detail", + views.ServiceDetail, + { + "pk": 1, + "permission": "service_viewer", + "model": models.Service, + "permission_object_pk": 1, + }, + ), + ( + "project-detail", + views.ProjectDetail, + { + "pk": 1, + "permission": "project_viewer", + "model": models.Project, + "permission_object_pk": 1, + }, + ), + ( + "project-exporter", + views.ExporterRegister, + { + "pk": 1, + "permission": "project_editor", + "model": models.Project, + "permission_object_pk": 1, + }, + ), + ( + "project-notifier", + views.ProjectNotifierRegister, + { + "pk": 1, + "permission": "project_editor", + "model": models.Project, + "permission_object_pk": 1, + }, + ), ("url-list", views.URLList, {}), ("farm-list", views.FarmList, {}), - ("farm-detail", views.FarmDetail, {"pk": 1}), + ( + "farm-detail", + views.FarmDetail, + { + "pk": 1, + "permission": "project_viewer", + "model": models.Project, + "permission_object_pk": 1, + }, + ), ("host-list", views.HostList, {}), - ("host-detail", views.HostDetail, {"slug": "example.com"}), + ( + "host-detail", + views.HostDetail, + { + "slug": "example.com", + "permission": "project_viewer", + "model": models.Project, + "permission_object_pk": 1, + }, + ), ("rules-list", views.RulesList, {}), ("rule-detail", views.RuleDetail, {"pk": 1}), ("audit-list", views.AuditList, {}), @@ -39,6 +95,13 @@ def setUp(self): def test_routes(self): for viewname, viewclass, params in self.route_map: + permission = params.pop("permission", None) + permission_model = params.pop("model", None) + permission_object_pk = params.pop("permission_object_pk", None) + if permission and permission_model and permission_object_pk: + permission_object = permission_model.objects.get(pk=permission_object_pk) + assign_perm(permission, self.user, permission_object) + # By default we'll pass all params as-is to our reverse() # method, but we may have a few special ones (like status_code) # that we want to pop and handle separately @@ -48,3 +111,7 @@ def test_routes(self): with self.subTest(viewname=viewname, params=params): response = self.client.get(reverse(viewname, kwargs=params)) self.assertRoute(response, viewclass, status_code) + + if permission and permission_model and permission_object_pk: + permission_object = permission_model.objects.get(pk=permission_object_pk) + remove_perm(permission, self.user, permission_object) diff --git a/promgen/views.py b/promgen/views.py index 00090d42..0f660d6c 100644 --- a/promgen/views.py +++ b/promgen/views.py @@ -18,7 +18,7 @@ from django.contrib.auth.models import User from django.contrib.contenttypes.models import ContentType from django.core.paginator import EmptyPage, Paginator -from django.db.models import Count, Q +from django.db.models import Count, Prefetch, Q from django.db.utils import IntegrityError from django.http import HttpResponse, HttpResponseRedirect, JsonResponse from django.shortcuts import get_object_or_404, redirect, render @@ -41,6 +41,7 @@ forms, mixins, models, + permissions, plugins, prometheus, signals, @@ -48,64 +49,82 @@ util, ) from promgen.forms import GroupMemberForm, UserPermissionForm +from promgen.mixins import PromgenGuardianPermissionMixin from promgen.shortcuts import resolve_domain logger = logging.getLogger(__name__) class DatasourceList(LoginRequiredMixin, ListView): - queryset = models.Shard.objects.prefetch_related( - "project_set__service", - "project_set__service__owner", - "project_set__service__notifiers", - "project_set__service__notifiers__owner", - "project_set__service__rule_set", - "project_set", - "project_set__owner", - "project_set__farm", - "project_set__exporter_set", - "project_set__notifiers", - "project_set__notifiers__owner", - "prometheus_set", - ).annotate(num_projects=Count("project")) + def get_queryset(self): + projects = models.Project.objects.all() + # If the user is not a superuser, we need to filter the shards by the user's permissions + if not self.request.user.is_superuser: + projects = permissions.get_accessible_projects_for_user(self.request.user) + + return models.Shard.objects.prefetch_related( + Prefetch("project_set", queryset=projects), + "project_set__service", + "project_set__service__owner", + "project_set__service__notifiers", + "project_set__service__notifiers__owner", + "project_set__service__rule_set", + "project_set__owner", + "project_set__farm", + "project_set__exporter_set", + "project_set__notifiers", + "project_set__notifiers__owner", + "prometheus_set", + ).annotate(num_projects=Count("project")) class DatasourceDetail(LoginRequiredMixin, DetailView): - queryset = models.Shard.objects.prefetch_related( - "project_set__service", - "project_set__service__owner", - "project_set__service__notifiers", - "project_set__service__notifiers__owner", - "project_set__service__notifiers__filter_set", - "project_set__service__rule_set", - "project_set", - "project_set__owner", - "project_set__farm", - "project_set__exporter_set", - "project_set__notifiers", - "project_set__notifiers__owner", - "project_set__notifiers__filter_set", - ) + def get_queryset(self): + projects = models.Project.objects.all() + # If the user is not a superuser, we need to filter the shards by the user's permissions + if not self.request.user.is_superuser: + projects = permissions.get_accessible_projects_for_user(self.request.user) + + return models.Shard.objects.prefetch_related( + Prefetch("project_set", queryset=projects), + "project_set__service", + "project_set__service__owner", + "project_set__service__notifiers", + "project_set__service__notifiers__owner", + "project_set__service__notifiers__filter_set", + "project_set__service__rule_set", + "project_set__owner", + "project_set__farm", + "project_set__exporter_set", + "project_set__notifiers", + "project_set__notifiers__owner", + "project_set__notifiers__filter_set", + ) class ServiceList(LoginRequiredMixin, ListView): paginate_by = 20 - queryset = models.Service.objects.prefetch_related( - "rule_set", - "rule_set__parent", - "project_set", - "project_set__owner", - "project_set__shard", - "project_set__notifiers", - "project_set__notifiers__owner", - "project_set__notifiers__filter_set", - "project_set__farm", - "project_set__exporter_set", - "owner", - "notifiers", - "notifiers__owner", - "notifiers__filter_set", - ) + + def get_queryset(self): + query_set = models.Service.objects.prefetch_related( + "rule_set", + "rule_set__parent", + "project_set", + "project_set__owner", + "project_set__shard", + "project_set__notifiers", + "project_set__notifiers__owner", + "project_set__notifiers__filter_set", + "project_set__farm", + "project_set__exporter_set", + "owner", + "notifiers", + "notifiers__owner", + "notifiers__filter_set", + ) + + services = permissions.get_accessible_services_for_user(self.request.user) + return query_set.filter(pk__in=services) class HomeList(LoginRequiredMixin, ListView): @@ -121,7 +140,7 @@ def get_queryset(self): ).values_list("object_id") # and return just our list of services - return models.Service.objects.filter(pk__in=senders).prefetch_related( + query_set = models.Service.objects.filter(pk__in=senders).prefetch_related( "notifiers", "notifiers__owner", "owner", @@ -136,13 +155,25 @@ def get_queryset(self): "project_set__notifiers__owner", ) + services = permissions.get_accessible_services_for_user(self.request.user) + return query_set.filter(pk__in=services) + class HostList(LoginRequiredMixin, ListView): - queryset = models.Host.objects.prefetch_related( - "farm", - "farm__project", - "farm__project__service", - ) + def get_queryset(self): + query_set = models.Host.objects.prefetch_related( + "farm", + "farm__project", + "farm__project__service", + ) + + # If the user is not a superuser, we need to filter the hosts by the user's permissions + if not self.request.user.is_superuser: + projects = permissions.get_accessible_projects_for_user(self.request.user) + farms = models.Farm.objects.filter(project__in=projects) + query_set = query_set.filter(farm__in=farms) + + return query_set def get_context_data(self, **kwargs): context = super().get_context_data(**kwargs) @@ -164,48 +195,81 @@ def get(self, request, slug): context = {} context["slug"] = self.kwargs["slug"] - context["host_list"] = models.Host.objects.filter( - name__icontains=self.kwargs["slug"] - ).prefetch_related("farm") + hosts = models.Host.objects.filter(name__icontains=self.kwargs["slug"]).prefetch_related( + "farm" + ) + + # If the user is not a superuser, we need to filter the hosts by the user's permissions + if not self.request.user.is_superuser: + projects = permissions.get_accessible_projects_for_user(self.request.user) + farms = models.Farm.objects.filter(project__in=projects) + hosts = hosts.filter(farm__in=farms) + + context["host_list"] = hosts if not context["host_list"]: return render(request, "promgen/host_404.html", context, status=404) - context["farm_list"] = models.Farm.objects.filter( + farms = models.Farm.objects.filter( id__in=context["host_list"].values_list("farm_id", flat=True) ) - context["project_list"] = models.Project.objects.filter( - id__in=context["farm_list"].values_list("project__id", flat=True) + projects = models.Project.objects.filter( + id__in=farms.values_list("project__id", flat=True) ).prefetch_related("notifiers", "rule_set") - context["exporter_list"] = models.Exporter.objects.filter( - project_id__in=context["project_list"].values_list("id", flat=True) + exporters = models.Exporter.objects.filter( + project_id__in=projects.values_list("id", flat=True) ).prefetch_related("project", "project__service") - context["service_list"] = models.Service.objects.filter( - id__in=context["project_list"].values_list("service__id", flat=True) + services = models.Service.objects.filter( + id__in=projects.values_list("service__id", flat=True) ).prefetch_related("notifiers", "rule_set") - context["rule_list"] = ( + rules = ( models.Rule.objects.filter( - Q(id__in=context["project_list"].values_list("rule_set__id")) - | Q(id__in=context["service_list"].values_list("rule_set__id")) + Q(id__in=projects.values_list("rule_set__id")) + | Q(id__in=services.values_list("rule_set__id")) | Q(id__in=models.Site.objects.get_current().rule_set.values_list("id")) ) .select_related("content_type") .prefetch_related("content_object") ) - context["notifier_list"] = ( + notifiers = ( models.Sender.objects.filter( - Q(id__in=context["project_list"].values_list("notifiers__id")) - | Q(id__in=context["service_list"].values_list("notifiers__id")) + Q(id__in=projects.values_list("notifiers__id")) + | Q(id__in=services.values_list("notifiers__id")) ) .select_related("content_type") .prefetch_related("content_object") ) + # If the user is not a superuser, we need to filter other objects by the user's permissions + if not self.request.user.is_superuser: + accessible_services = permissions.get_accessible_services_for_user(self.request.user) + accessible_projects = permissions.get_accessible_projects_for_user(self.request.user) + + projects = projects.filter(pk__in=accessible_projects) + exporters = exporters.filter(project__in=accessible_projects) + services = services.filter(pk__in=accessible_services) + rules = rules.filter( + Q(content_type__model="service", object_id__in=accessible_services) + | Q(content_type__model="project", object_id__in=accessible_projects) + | Q(id__in=models.Site.objects.get_current().rule_set.values_list("id")) + ) + notifiers = notifiers.filter( + Q(content_type__model="service", object_id__in=accessible_services) + | Q(content_type__model="project", object_id__in=accessible_projects) + ) + + context["farm_list"] = farms + context["project_list"] = projects + context["exporter_list"] = exporters + context["service_list"] = services + context["rule_list"] = rules + context["notifier_list"] = notifiers + return render(request, "promgen/host_detail.html", context) @@ -244,12 +308,63 @@ def get_queryset(self): if "user" in self.request.GET: queryset = queryset.filter(user_id=self.request.GET["user"]) + # If the user is not a superuser, we need to filter the audits by the user's permissions + if not self.request.user.is_superuser: + # Get all the services that the user has access to + services = permissions.get_accessible_services_for_user(self.request.user) + + # Get all the projects that the user has access to + projects = permissions.get_accessible_projects_for_user(self.request.user) + + # Get all the farm that the user has access to + farms = models.Farm.objects.filter(project__in=projects) + + # Get all the groups that the user has access to + groups = permissions.get_accessible_groups_for_user(self.request.user) + + # Filter the queryset by the user's permissions + queryset = queryset.filter( + Q( + content_type__model="service", + content_type__app_label="promgen", + object_id__in=services, + ) + | Q( + content_type__model="project", + content_type__app_label="promgen", + object_id__in=projects, + ) + | Q( + content_type__model="farm", + content_type__app_label="promgen", + object_id__in=farms, + ) + | Q( + parent_content_type_id=ContentType.objects.get_for_model(models.Service).id, + parent_object_id__in=services, + ) + | Q( + parent_content_type_id=ContentType.objects.get_for_model(models.Project).id, + parent_object_id__in=projects, + ) + | Q( + parent_content_type_id=ContentType.objects.get_for_model(models.Farm).id, + parent_object_id__in=farms, + ) + | Q( + content_type__model="group", + content_type__app_label="promgen", + object_id__in=groups, + ) + ) + return queryset paginate_by = 50 -class ServiceDetail(LoginRequiredMixin, DetailView): +class ServiceDetail(PromgenGuardianPermissionMixin, DetailView): + permission_required = ["service_admin", "service_editor", "service_viewer"] queryset = models.Service.objects.prefetch_related( "rule_set", "notifiers", @@ -269,21 +384,24 @@ def get_context_data(self, **kwargs): return context -class ServiceDelete(LoginRequiredMixin, DeleteView): +class ServiceDelete(PromgenGuardianPermissionMixin, DeleteView): + permission_required = ["service_admin"] model = models.Service def get_success_url(self): return reverse("service-list") -class ProjectDelete(LoginRequiredMixin, DeleteView): +class ProjectDelete(PromgenGuardianPermissionMixin, DeleteView): + permission_required = ["service_admin", "project_admin"] model = models.Project def get_success_url(self): return reverse("service-detail", args=[self.object.service_id]) -class NotifierUpdate(LoginRequiredMixin, UpdateView): +class NotifierUpdate(PromgenGuardianPermissionMixin, UpdateView): + permission_required = ["service_admin", "service_editor", "project_admin", "project_editor"] model = models.Sender form_class = forms.NotifierUpdate @@ -334,7 +452,8 @@ def post(self, request, pk): return self.get(self, request, pk) -class NotifierDelete(LoginRequiredMixin, DeleteView): +class NotifierDelete(PromgenGuardianPermissionMixin, DeleteView): + permission_required = ["service_admin", "service_editor", "project_admin", "project_editor"] model = models.Sender def get_success_url(self): @@ -345,7 +464,9 @@ def get_success_url(self): return reverse("profile") -class NotifierTest(LoginRequiredMixin, View): +class NotifierTest(PromgenGuardianPermissionMixin, View): + permission_required = ["service_admin", "service_editor", "project_admin", "project_editor"] + def post(self, request, pk): sender = get_object_or_404(models.Sender, id=pk) try: @@ -361,15 +482,21 @@ def post(self, request, pk): return redirect(sender.content_object) return redirect("profile") + def get_check_permission_object(self): + return get_object_or_404(models.Sender, id=self.kwargs["pk"]) + -class ExporterDelete(LoginRequiredMixin, DeleteView): +class ExporterDelete(PromgenGuardianPermissionMixin, DeleteView): + permission_required = ["service_admin", "service_editor", "project_admin", "project_editor"] model = models.Exporter def get_success_url(self): return reverse("project-detail", args=[self.object.project_id]) + "#exporters" -class ExporterToggle(LoginRequiredMixin, View): +class ExporterToggle(PromgenGuardianPermissionMixin, View): + permission_required = ["service_admin", "service_editor", "project_admin", "project_editor"] + def post(self, request, pk): exporter = get_object_or_404(models.Exporter, id=pk) exporter.enabled = not exporter.enabled @@ -377,8 +504,17 @@ def post(self, request, pk): signals.trigger_write_config.send(request) return JsonResponse({"redirect": exporter.project.get_absolute_url() + "#exporters"}) + def get_check_permission_object(self): + return get_object_or_404(models.Exporter, id=self.kwargs["pk"]) + + def on_permission_check_fail(self, request, response, obj=None): + messages.warning(request, "You do not have permission to perform this action.") + return JsonResponse({"redirect": "#exporters"}) + + +class NotifierToggle(PromgenGuardianPermissionMixin, View): + permission_required = ["service_admin", "service_editor", "project_admin", "project_editor"] -class NotifierToggle(LoginRequiredMixin, View): def post(self, request, pk): sender = get_object_or_404(models.Sender, id=pk) sender.enabled = not sender.enabled @@ -386,8 +522,16 @@ def post(self, request, pk): # Redirect to current page return JsonResponse({"redirect": "#notifiers"}) + def get_check_permission_object(self): + return get_object_or_404(models.Sender, id=self.kwargs["pk"]) + + def on_permission_check_fail(self, request, response, obj=None): + messages.warning(request, "You do not have permission to perform this action.") + return JsonResponse({"redirect": "#notifiers"}) + -class RuleDelete(mixins.PromgenPermissionMixin, DeleteView): +class RuleDelete(PromgenGuardianPermissionMixin, DeleteView): + permission_required = ["service_admin", "service_editor", "project_admin", "project_editor"] model = models.Rule def get_permission_denied_message(self): @@ -407,7 +551,8 @@ def get_success_url(self): return self.object.content_object.get_absolute_url() + "#rules" -class RuleToggle(mixins.PromgenPermissionMixin, SingleObjectMixin, View): +class RuleToggle(PromgenGuardianPermissionMixin, SingleObjectMixin, View): + permission_required = ["service_admin", "service_editor", "project_admin", "project_editor"] model = models.Rule def get_permission_denied_message(self): @@ -428,15 +573,28 @@ def post(self, request, pk): self.object.save() return JsonResponse({"redirect": self.object.content_object.get_absolute_url() + "#rules"}) + def on_permission_check_fail(self, request, response, obj=None): + messages.warning(request, "You do not have permission to perform this action.") + return JsonResponse({"redirect": "#rules"}) -class HostDelete(LoginRequiredMixin, DeleteView): + +class HostDelete(PromgenGuardianPermissionMixin, DeleteView): + permission_required = ["project_admin", "service_admin", "project_editor", "service_editor"] model = models.Host def get_success_url(self): return self.object.farm.get_absolute_url() -class ProjectDetail(LoginRequiredMixin, DetailView): +class ProjectDetail(PromgenGuardianPermissionMixin, DetailView): + permission_required = [ + "service_admin", + "service_editor", + "service_viewer", + "project_admin", + "project_editor", + "project_viewer", + ] queryset = models.Project.objects.prefetch_related( "rule_set", "rule_set__parent", @@ -465,13 +623,30 @@ def get_context_data(self, **kwargs): class FarmList(LoginRequiredMixin, ListView): paginate_by = 50 - queryset = models.Farm.objects.prefetch_related( - "project", - "host_set", - ) + + def get_queryset(self): + query_set = models.Farm.objects.prefetch_related( + "project", + "host_set", + ) + + # If the user is not a superuser, we need to filter the farms by the user's permissions + if not self.request.user.is_superuser: + projects = permissions.get_accessible_projects_for_user(self.request.user) + query_set = query_set.filter(project__in=projects) + + return query_set -class FarmDetail(LoginRequiredMixin, DetailView): +class FarmDetail(PromgenGuardianPermissionMixin, DetailView): + permission_required = [ + "project_admin", + "service_admin", + "project_editor", + "service_editor", + "project_viewer", + "service_viewer", + ] model = models.Farm def get_context_data(self, **kwargs): @@ -480,7 +655,8 @@ def get_context_data(self, **kwargs): return context -class FarmUpdate(LoginRequiredMixin, UpdateView): +class FarmUpdate(PromgenGuardianPermissionMixin, UpdateView): + permission_required = ["project_admin", "service_admin", "project_editor", "service_editor"] model = models.Farm button_label = _("Update Farm") template_name = "promgen/farm_update.html" @@ -500,7 +676,8 @@ def form_valid(self, form): return redirect("farm-detail", pk=farm.id) -class FarmDelete(LoginRequiredMixin, RedirectView): +class FarmDelete(PromgenGuardianPermissionMixin, RedirectView): + permission_required = ["service_admin", "service_editor", "project_admin", "project_editor"] pattern_name = "farm-detail" def post(self, request, pk): @@ -509,8 +686,13 @@ def post(self, request, pk): return HttpResponseRedirect(request.POST.get("next", reverse("farm-list"))) + def get_check_permission_object(self): + return get_object_or_404(models.Farm, id=self.kwargs["pk"]) + + +class UnlinkFarm(PromgenGuardianPermissionMixin, View): + permission_required = ["service_admin", "service_editor", "project_admin", "project_editor"] -class UnlinkFarm(LoginRequiredMixin, View): def post(self, request, pk): project = get_object_or_404(models.Project, id=pk) oldfarm, project.farm = project.farm, None @@ -521,6 +703,9 @@ def post(self, request, pk): return HttpResponseRedirect(reverse("project-detail", args=[project.id]) + "#hosts") + def get_check_permission_object(self): + return get_object_or_404(models.Project, id=self.kwargs["pk"]) + class RulesList(LoginRequiredMixin, ListView, mixins.ServiceMixin): paginate_by = 50 @@ -551,14 +736,25 @@ def get_context_data(self, **kwargs): "parent", ) + # If the user is not a superuser, we need to filter the rules by the user's permissions + if not self.request.user.is_superuser: + services = permissions.get_accessible_services_for_user(self.request.user) + service_rules = service_rules.filter(object_id__in=services) + + projects = permissions.get_accessible_projects_for_user(self.request.user) + project_rules = project_rules.filter(object_id__in=projects) + rule_list = list(chain(site_rules, service_rules, project_rules)) page_number = self.request.GET.get("page", 1) context["rule_list"] = Paginator(rule_list, self.paginate_by).page(page_number) + context["page_obj"] = context["rule_list"] return context -class RulesCopy(LoginRequiredMixin, View): +class RulesCopy(PromgenGuardianPermissionMixin, View): + permission_required = ["service_admin", "service_editor", "project_admin", "project_editor"] + def post(self, request, pk): original = get_object_or_404(models.Rule, id=pk) form = forms.RuleCopyForm(request.POST) @@ -569,6 +765,13 @@ def post(self, request, pk): else: return HttpResponseRedirect(reverse("service-detail", args=[pk]) + "#rules") + def get_check_permission_object(self): + content_type = ContentType.objects.get( + app_label="promgen", model=self.request.POST["content_type"] + ) + model_class = content_type.model_class() + return model_class.objects.get(pk=self.request.POST["object_id"]) + class FarmRefresh(LoginRequiredMixin, RedirectView): pattern_name = "farm-detail" @@ -587,7 +790,8 @@ def post(self, request, pk): return redirect(farm) -class FarmConvert(LoginRequiredMixin, RedirectView): +class FarmConvert(PromgenGuardianPermissionMixin, RedirectView): + permission_required = ["project_admin", "service_admin", "project_editor", "service_editor"] pattern_name = "farm-detail" def post(self, request, pk): @@ -611,8 +815,13 @@ def post(self, request, pk): request.POST.get("next", reverse("farm-detail", args=[farm.pk])) ) + def get_check_permission_object(self): + return get_object_or_404(models.Farm, id=self.kwargs["pk"]) + + +class FarmLink(PromgenGuardianPermissionMixin, View): + permission_required = ["service_admin", "service_editor", "project_admin", "project_editor"] -class FarmLink(LoginRequiredMixin, View): def get(self, request, pk, source): if source == discovery.FARM_DEFAULT: messages.error(request, "Cannot link to local farm") @@ -643,8 +852,12 @@ def post(self, request, pk, source): messages.info(request, "Refreshed hosts") return HttpResponseRedirect(reverse("project-detail", args=[project.id]) + "#hosts") + def get_check_permission_object(self): + return get_object_or_404(models.Project, id=self.kwargs["pk"]) + -class ExporterRegister(LoginRequiredMixin, FormView, mixins.ProjectMixin): +class ExporterRegister(PromgenGuardianPermissionMixin, FormView, mixins.ProjectMixin): + permission_required = ["service_admin", "service_editor", "project_admin", "project_editor"] model = models.Exporter template_name = "promgen/exporter_form.html" form_class = forms.ExporterForm @@ -654,6 +867,9 @@ def form_valid(self, form): exporter, _ = models.Exporter.objects.get_or_create(project=project, **form.clean()) return HttpResponseRedirect(reverse("project-detail", args=[project.id]) + "#exporters") + def get_check_permission_object(self): + return get_object_or_404(models.Project, id=self.kwargs["pk"]) + class ExporterScrape(LoginRequiredMixin, View): # TODO: Move to /rest/project//scrape @@ -710,7 +926,8 @@ def query(): return JsonResponse({"error": "Error with query %s" % e}) -class URLRegister(LoginRequiredMixin, FormView, mixins.ProjectMixin): +class URLRegister(PromgenGuardianPermissionMixin, FormView, mixins.ProjectMixin): + permission_required = ["service_admin", "service_editor", "project_admin", "project_editor"] model = models.URL form_class = forms.URLForm @@ -719,8 +936,12 @@ def form_valid(self, form): url, _ = models.URL.objects.get_or_create(project=project, **form.clean()) return HttpResponseRedirect(reverse("project-detail", args=[project.id]) + "#http-checks") + def get_check_permission_object(self): + return get_object_or_404(models.Project, id=self.kwargs["pk"]) + -class URLDelete(LoginRequiredMixin, DeleteView): +class URLDelete(PromgenGuardianPermissionMixin, DeleteView): + permission_required = ["service_admin", "service_editor", "project_admin", "project_editor"] model = models.URL def get_success_url(self): @@ -728,15 +949,24 @@ def get_success_url(self): class URLList(LoginRequiredMixin, ListView): - queryset = models.URL.objects.prefetch_related( - "project", - "project__service", - "project__shard", - "probe", - ) + def get_queryset(self): + query_set = models.URL.objects.prefetch_related( + "project", + "project__service", + "project__shard", + "probe", + ) + + # If the user is not a superuser, we need to filter the URLs by the user's permissions + if not self.request.user.is_superuser: + projects = permissions.get_accessible_projects_for_user(self.request.user) + query_set = query_set.filter(project__in=projects) + + return query_set -class ProjectRegister(LoginRequiredMixin, CreateView): +class ProjectRegister(PromgenGuardianPermissionMixin, CreateView): + permission_required = ["service_admin", "service_editor"] button_label = _("Register Project") model = models.Project fields = ["name", "description", "owner", "shard"] @@ -765,8 +995,12 @@ def form_valid(self, form): form.instance.service_id = self.kwargs["pk"] return super().form_valid(form) + def get_check_permission_object(self): + return get_object_or_404(models.Service, id=self.kwargs["pk"]) -class ProjectUpdate(LoginRequiredMixin, UpdateView): + +class ProjectUpdate(PromgenGuardianPermissionMixin, UpdateView): + permission_required = ["service_admin", "service_editor", "project_admin", "project_editor"] model = models.Project button_label = _("Project Update") template_name = "promgen/project_form.html" @@ -789,7 +1023,8 @@ def form_valid(self, form): return super().form_valid(form) -class ServiceUpdate(LoginRequiredMixin, UpdateView): +class ServiceUpdate(PromgenGuardianPermissionMixin, UpdateView): + permission_required = ["service_admin", "service_editor"] button_label = _("Update Service") form_class = forms.ServiceUpdate model = models.Service @@ -805,7 +1040,15 @@ def form_valid(self, form): return super().form_valid(form) -class RuleDetail(LoginRequiredMixin, DetailView): +class RuleDetail(PromgenGuardianPermissionMixin, DetailView): + permission_required = [ + "service_admin", + "service_editor", + "service_viewer", + "project_admin", + "project_editor", + "project_viewer", + ] queryset = models.Rule.objects.prefetch_related( "content_object", "content_type", @@ -815,7 +1058,9 @@ class RuleDetail(LoginRequiredMixin, DetailView): ) -class RuleUpdate(mixins.PromgenPermissionMixin, UpdateView): +class RuleUpdate(PromgenGuardianPermissionMixin, UpdateView): + permission_required = ["service_admin", "service_editor", "project_admin", "project_editor"] + def get_permission_denied_message(self): return "Unable to edit rule %s. User lacks permission" % self.object @@ -880,7 +1125,8 @@ def post(self, request, *args, **kwargs): return self.form_valid(context["form"]) -class AlertRuleRegister(mixins.PromgenPermissionMixin, mixins.RuleFormMixin, FormView): +class AlertRuleRegister(PromgenGuardianPermissionMixin, mixins.RuleFormMixin, FormView): + permission_required = ["service_admin", "service_editor", "project_admin", "project_editor"] model = models.Rule template_name = "promgen/rule_register.html" form_class = forms.AlertRuleForm @@ -912,6 +1158,13 @@ def form_import(self, form, content_object): messages.info(self.request, "Imported %s" % counters) return HttpResponseRedirect(content_object.get_absolute_url()) + def get_check_permission_object(self): + id = self.kwargs["object_id"] + model = self.kwargs["content_type"] + models = ContentType.objects.get(app_label="promgen", model=model) + obj = models.get_object_for_this_type(pk=id) + return obj + class ServiceRegister(LoginRequiredMixin, CreateView): button_label = _("Register Service") @@ -931,7 +1184,8 @@ def post(self, request, *args, **kwargs): return super().post(request, *args, **kwargs) -class FarmRegister(LoginRequiredMixin, FormView, mixins.ProjectMixin): +class FarmRegister(PromgenGuardianPermissionMixin, FormView, mixins.ProjectMixin): + permission_required = ["service_admin", "service_editor", "project_admin", "project_editor"] model = models.Farm button_label = _("Register Farm") template_name = "promgen/farm_register.html" @@ -961,8 +1215,12 @@ def form_valid(self, form): return HttpResponseRedirect(project.get_absolute_url() + "#hosts") + def get_check_permission_object(self): + return get_object_or_404(models.Project, id=self.kwargs["pk"]) + -class ProjectNotifierRegister(LoginRequiredMixin, FormView, mixins.ProjectMixin): +class ProjectNotifierRegister(PromgenGuardianPermissionMixin, FormView, mixins.ProjectMixin): + permission_required = ["service_admin", "service_editor", "project_admin", "project_editor"] model = models.Sender template_name = "promgen/notifier_form.html" form_class = forms.SenderForm @@ -977,8 +1235,12 @@ def form_valid(self, form): signals.check_user_subscription(models.Sender, sender, created, self.request) return HttpResponseRedirect(project.get_absolute_url() + "#notifiers") + def get_check_permission_object(self): + return get_object_or_404(models.Project, id=self.kwargs["pk"]) -class ServiceNotifierRegister(LoginRequiredMixin, FormView, mixins.ServiceMixin): + +class ServiceNotifierRegister(PromgenGuardianPermissionMixin, FormView, mixins.ServiceMixin): + permission_required = ["service_admin", "service_editor"] model = models.Sender template_name = "promgen/notifier_form.html" form_class = forms.SenderForm @@ -993,6 +1255,9 @@ def form_valid(self, form): signals.check_user_subscription(models.Sender, sender, created, self.request) return HttpResponseRedirect(service.get_absolute_url() + "#notifiers") + def get_check_permission_object(self): + return get_object_or_404(models.Service, id=self.kwargs["pk"]) + class SiteDetail(LoginRequiredMixin, TemplateView): template_name = "promgen/site_detail.html" @@ -1028,7 +1293,8 @@ def form_valid(self, form): return redirect("profile") -class HostRegister(LoginRequiredMixin, FormView): +class HostRegister(PromgenGuardianPermissionMixin, FormView): + permission_required = ["project_admin", "service_admin", "project_editor", "service_editor"] model = models.Host template_name = "promgen/host_form.html" form_class = forms.HostForm @@ -1047,6 +1313,9 @@ def form_valid(self, form): return redirect("farm-detail", pk=farm.id) + def get_check_permission_object(self): + return get_object_or_404(models.Farm, id=self.kwargs["pk"]) + class ApiConfig(View): def get(self, request): @@ -1108,24 +1377,35 @@ class AlertList(LoginRequiredMixin, ListView): queryset = models.Alert.objects.order_by("-created") def get_queryset(self): + qs = self.queryset search = self.request.GET.get("search") if search: - return self.queryset.filter( + qs = self.queryset.filter( Q(alertlabel__name="Service", alertlabel__value__icontains=search) | Q(alertlabel__name="Project", alertlabel__value__icontains=search) | Q(alertlabel__name="Job", alertlabel__value__icontains=search) ) + else: + for key, value in self.request.GET.items(): + if key in ["page", "search"]: + continue + elif key == "noSent": + qs = qs.filter(sent_count=0) + elif key == "sentError": + qs = qs.exclude(error_count=0) + else: + qs = qs.filter(alertlabel__name=key, alertlabel__value=value) + + # If the user is not a superuser, we need to filter the alerts by the user's permissions + if not self.request.user.is_superuser: + services = permissions.get_accessible_services_for_user(self.request.user) + projects = permissions.get_accessible_projects_for_user(self.request.user) + + qs = qs.filter( + Q(alertlabel__name="Service", alertlabel__value__in=services.values_list("name")) + | Q(alertlabel__name="Project", alertlabel__value__in=projects.values_list("name")) + ) - qs = self.queryset - for key, value in self.request.GET.items(): - if key in ["page", "search"]: - continue - elif key == "noSent": - qs = qs.filter(sent_count=0) - elif key == "sentError": - qs = qs.exclude(error_count=0) - else: - qs = qs.filter(alertlabel__name=key, alertlabel__value=value) return qs @@ -1296,16 +1576,41 @@ def get(self, request): else: filters = Q(**{field: query_dict[var]}) - # For groups, we want to exclude the default group from search results - if obj["model"] == models.Group: - if filters: - filters &= ~Q(name=settings.PROMGEN_DEFAULT_GROUP) - else: - filters = ~Q(name=settings.PROMGEN_DEFAULT_GROUP) - logger.info("filtering %s by %s", target, filters) qs = qs.filter(filters) + + # If the user is not a superuser, we need to filter the result by the user's permissions + if not self.request.user.is_superuser: + services = permissions.get_accessible_services_for_user(self.request.user) + projects = permissions.get_accessible_projects_for_user(self.request.user) + farms = models.Farm.objects.filter(project__in=projects) + groups = permissions.get_accessible_groups_for_user(self.request.user) + + if obj["model"] == models.Service: + qs = qs.filter(pk__in=services) + elif obj["model"] == models.Project: + qs = qs.filter(pk__in=projects) + elif obj["model"] == models.Farm: + qs = qs.filter(pk__in=farms) + elif obj["model"] == models.Host: + qs = qs.filter(farm__in=farms) + elif obj["model"] == models.Group: + qs = qs.filter(pk__in=groups) + elif obj["model"] == models.Rule: + qs = qs.filter( + Q( + content_type__model="service", + content_type__app_label="promgen", + object_id__in=services, + ) + | Q( + content_type__model="project", + content_type__app_label="promgen", + object_id__in=projects, + ) + ) + try: page_number = query_dict.get("page", 1) page_target = Paginator(qs, self.paginate_by).page(page_number) @@ -1516,7 +1821,7 @@ def get(self, request): return redirect("profile") -class PermissionAssign(LoginRequiredMixin, View): +class PermissionAssign(PromgenGuardianPermissionMixin, View): permission_required = ["service_admin", "project_admin"] def post(self, request): @@ -1576,7 +1881,9 @@ def get_object(self): return obj -class PermissionDelete(LoginRequiredMixin, View): +class PermissionDelete(PromgenGuardianPermissionMixin, View): + permission_required = ["service_admin", "project_admin"] + def post(self, request): obj = self.get_object() permission_type = request.POST["perm-type"] @@ -1650,11 +1957,14 @@ def build_success_message(self, message): class GroupList(LoginRequiredMixin, ListView): paginate_by = 20 - queryset = models.Group.objects.exclude(name=settings.PROMGEN_DEFAULT_GROUP).order_by("name") + def get_queryset(self): + return permissions.get_accessible_groups_for_user(self.request.user).order_by("name") -class GroupDetail(LoginRequiredMixin, DetailView): - queryset = models.Group.objects.exclude(name=settings.PROMGEN_DEFAULT_GROUP) + +class GroupDetail(PromgenGuardianPermissionMixin, DetailView): + permission_required = ["group_admin", "group_member"] + model = models.Group def get_context_data(self, **kwargs): context = super().get_context_data(**kwargs) @@ -1663,8 +1973,9 @@ def get_context_data(self, **kwargs): return context -class GroupAddMember(LoginRequiredMixin, SingleObjectMixin, View): - queryset = models.Group.objects.exclude(name=settings.PROMGEN_DEFAULT_GROUP) +class GroupAddMember(PromgenGuardianPermissionMixin, SingleObjectMixin, View): + permission_required = ["group_admin"] + model = models.Group def post(self, request, *args, **kwargs): group = self.get_object() @@ -1701,8 +2012,9 @@ def post(self, request, *args, **kwargs): return redirect("group-detail", pk=group.pk) -class GroupUpdateMember(LoginRequiredMixin, SingleObjectMixin, View): - queryset = models.Group.objects.exclude(name=settings.PROMGEN_DEFAULT_GROUP) +class GroupUpdateMember(PromgenGuardianPermissionMixin, SingleObjectMixin, View): + permission_required = ["group_admin"] + model = models.Group def post(self, request, *args, **kwargs): group = self.get_object() @@ -1730,8 +2042,9 @@ def post(self, request, *args, **kwargs): return redirect("group-detail", pk=group.pk) -class GroupRemoveMember(LoginRequiredMixin, SingleObjectMixin, View): - queryset = models.Group.objects.exclude(name=settings.PROMGEN_DEFAULT_GROUP) +class GroupRemoveMember(PromgenGuardianPermissionMixin, SingleObjectMixin, View): + permission_required = ["group_admin"] + model = models.Group def post(self, request, *args, **kwargs): group = self.get_object() @@ -1779,15 +2092,17 @@ def get_success_url(self): return super().get_success_url() -class GroupUpdate(LoginRequiredMixin, UpdateView): +class GroupUpdate(PromgenGuardianPermissionMixin, UpdateView): + permission_required = ["group_admin"] button_label = _("Update Group") - queryset = models.Group.objects.exclude(name=settings.PROMGEN_DEFAULT_GROUP) + model = models.Group fields = ["name"] -class GroupDelete(LoginRequiredMixin, DeleteView): +class GroupDelete(PromgenGuardianPermissionMixin, DeleteView): + permission_required = ["group_admin"] button_label = _("Delete Group") - queryset = models.Group.objects.exclude(name=settings.PROMGEN_DEFAULT_GROUP) + model = models.Group def get_success_url(self): return reverse("group-list")