diff --git a/src/sentry/dynamic_sampling/per_org/tasks/configuration.py b/src/sentry/dynamic_sampling/per_org/tasks/configuration.py index 9bc52dfcf4614d..2a05bd53f0041b 100644 --- a/src/sentry/dynamic_sampling/per_org/tasks/configuration.py +++ b/src/sentry/dynamic_sampling/per_org/tasks/configuration.py @@ -45,6 +45,7 @@ def get_configuration(organization_id: int) -> BaseDynamicSamplingConfiguration: class BaseDynamicSamplingConfiguration(ABC): measure: SamplingMeasure + should_balance_projects: bool = True def __init__(self, organization: Organization) -> None: self.organization = organization @@ -115,6 +116,7 @@ def is_enabled(self) -> bool: class CustomDynamicSamplingProjectConfiguration(BaseDynamicSamplingConfiguration): project_target_sample_rates: ProjectTargetSampleRates + should_balance_projects: bool = False def __init__(self, organization: Organization) -> None: super().__init__(organization) diff --git a/src/sentry/dynamic_sampling/per_org/tasks/queries.py b/src/sentry/dynamic_sampling/per_org/tasks/queries.py index a5a69d684f8a4c..d5b36a8f765d78 100644 --- a/src/sentry/dynamic_sampling/per_org/tasks/queries.py +++ b/src/sentry/dynamic_sampling/per_org/tasks/queries.py @@ -1,6 +1,7 @@ from __future__ import annotations from collections.abc import Iterator, Mapping +from dataclasses import dataclass from datetime import UTC, datetime, timedelta from typing import Any @@ -8,6 +9,7 @@ from sentry.constants import ObjectStatus from sentry.dynamic_sampling.per_org.tasks.configuration import BaseDynamicSamplingConfiguration +from sentry.dynamic_sampling.rules.utils import ProjectId from sentry.dynamic_sampling.tasks.common import ( ACTIVE_ORGS_VOLUMES_DEFAULT_TIME_INTERVAL, OrganizationDataVolume, @@ -20,8 +22,17 @@ from sentry.snuba.spans_rpc import Spans +@dataclass(order=True) +class ProjectVolume: + project_id: ProjectId + total: int + keep: int + drop: int + + def _get_aggregate_int(row: Mapping[str, Any], column: str) -> int: - return int(row.get(column, 0)) + value = row.get(column) + return int(value) if value is not None else 0 def run_eap_spans_table_query_in_chunks( @@ -51,8 +62,9 @@ def get_eap_organization_volume( config: BaseDynamicSamplingConfiguration, time_interval: timedelta = ACTIVE_ORGS_VOLUMES_DEFAULT_TIME_INTERVAL, ) -> OrganizationDataVolume | None: + organization = config.organization projects = list( - Project.objects.filter(organization_id=config.organization.id, status=ObjectStatus.ACTIVE) + Project.objects.filter(organization_id=organization.id, status=ObjectStatus.ACTIVE) ) if not projects: return None @@ -64,7 +76,7 @@ def get_eap_organization_volume( start=start_time, end=end_time, projects=projects, - organization=config.organization, + organization=organization, ), query_string="is_transaction:true", selected_columns=["count()", "count_sample()"], @@ -89,4 +101,53 @@ def get_eap_organization_volume( return None indexed = _get_aggregate_int(row, "count_sample()") - return OrganizationDataVolume(org_id=config.organization.id, total=total, indexed=indexed) + return OrganizationDataVolume(org_id=organization.id, total=total, indexed=indexed) + + +def get_eap_project_volumes( + config: BaseDynamicSamplingConfiguration, + time_interval: timedelta = timedelta(hours=1), +) -> list[ProjectVolume]: + organization = config.organization + projects = list( + Project.objects.filter(organization_id=organization.id, status=ObjectStatus.ACTIVE) + ) + if not projects: + return [] + + end_time = datetime.now(UTC) + start_time = end_time - time_interval + project_volumes: list[ProjectVolume] = [] + + for data in run_eap_spans_table_query_in_chunks( + { + "params": SnubaParams( + start=start_time, + end=end_time, + projects=projects, + organization=organization, + ), + "query_string": "is_segment:true", + "selected_columns": ["sentry.dsc.root_project", "count()", "count_sample()"], + "orderby": ["sentry.dsc.root_project"], + "referrer": Referrer.DYNAMIC_SAMPLING_PER_ORG_GET_EAP_PROJECT_VOLUMES.value, + "config": SearchResolverConfig( + auto_fields=True, + extrapolation_mode=ExtrapolationMode.EXTRAPOLATION_MODE_SERVER_ONLY, + ), + "sampling_mode": SAMPLING_MODE_HIGHEST_ACCURACY, + } + ): + for row in data: + total = _get_aggregate_int(row, "count()") + keep = _get_aggregate_int(row, "count_sample()") + project_volumes.append( + ProjectVolume( + project_id=ProjectId(int(row["sentry.dsc.root_project"])), + total=total, + keep=keep, + drop=max(total - keep, 0), + ) + ) + + return project_volumes diff --git a/src/sentry/dynamic_sampling/per_org/tasks/scheduler.py b/src/sentry/dynamic_sampling/per_org/tasks/scheduler.py index 0c6e77c563ba23..e91d55ed86b3ef 100644 --- a/src/sentry/dynamic_sampling/per_org/tasks/scheduler.py +++ b/src/sentry/dynamic_sampling/per_org/tasks/scheduler.py @@ -10,7 +10,10 @@ from sentry.dynamic_sampling.per_org.tasks.configuration import get_configuration from sentry.dynamic_sampling.per_org.tasks.gate import is_org_in_rollout -from sentry.dynamic_sampling.per_org.tasks.queries import get_eap_organization_volume +from sentry.dynamic_sampling.per_org.tasks.queries import ( + get_eap_organization_volume, + get_eap_project_volumes, +) from sentry.dynamic_sampling.per_org.tasks.telemetry import ( SCHEDULER_BUCKET_ORG_STATUS_METRIC, TelemetryStatus, @@ -103,6 +106,11 @@ def run_calculations_per_org_task(org_id: OrganizationId) -> TelemetryStatus | N org_volume = get_eap_organization_volume(config) if org_volume is None: - return TelemetryStatus.NO_VOLUME + return TelemetryStatus.NO_ORG_VOLUME + + if config.should_balance_projects: + project_volumes = get_eap_project_volumes(config) + if not project_volumes: + return TelemetryStatus.NO_PROJECT_VOLUMES return None diff --git a/src/sentry/dynamic_sampling/per_org/tasks/telemetry.py b/src/sentry/dynamic_sampling/per_org/tasks/telemetry.py index f5d2b377970bbd..722daf8db00c57 100644 --- a/src/sentry/dynamic_sampling/per_org/tasks/telemetry.py +++ b/src/sentry/dynamic_sampling/per_org/tasks/telemetry.py @@ -32,7 +32,8 @@ class TelemetryStatus(StrEnum): FAILED = "failed" KILLSWITCHED = "killswitched" NO_SUBSCRIPTION = "no_subscription" - NO_VOLUME = "no_volume" + NO_ORG_VOLUME = "no_org_volume" + NO_PROJECT_VOLUMES = "no_project_volumes" NOT_IN_ROLLOUT = "not_in_rollout" ORG_HAS_NO_DYNAMIC_SAMPLING = "org_has_no_dynamic_sampling" ORG_NOT_FOUND = "org_not_found" diff --git a/src/sentry/snuba/referrer.py b/src/sentry/snuba/referrer.py index 5da654dc682bb6..f1b04fc292ee9b 100644 --- a/src/sentry/snuba/referrer.py +++ b/src/sentry/snuba/referrer.py @@ -639,6 +639,9 @@ class Referrer(StrEnum): "dynamic_sampling.distribution.fetch_projects_with_count_per_root_total_volumes" ) DYNAMIC_SAMPLING_PER_ORG_GET_EAP_ORG_VOLUME = "dynamic_sampling.per_org.get_eap_org_volume" + DYNAMIC_SAMPLING_PER_ORG_GET_EAP_PROJECT_VOLUMES = ( + "dynamic_sampling.per_org.get_eap_project_volumes" + ) DYNAMIC_SAMPLING_COUNTERS_FETCH_PROJECTS_WITH_COUNT_PER_TRANSACTION = ( "dynamic_sampling.counters.fetch_projects_with_count_per_transaction_volumes" ) diff --git a/tests/sentry/dynamic_sampling/per_org/tasks/test_queries.py b/tests/sentry/dynamic_sampling/per_org/tasks/test_queries.py index 893db57d782eb9..470b77a251fe4e 100644 --- a/tests/sentry/dynamic_sampling/per_org/tasks/test_queries.py +++ b/tests/sentry/dynamic_sampling/per_org/tasks/test_queries.py @@ -10,7 +10,9 @@ get_configuration, ) from sentry.dynamic_sampling.per_org.tasks.queries import ( + ProjectVolume, get_eap_organization_volume, + get_eap_project_volumes, run_eap_spans_table_query_in_chunks, ) from sentry.dynamic_sampling.tasks.common import OrganizationDataVolume @@ -79,7 +81,10 @@ def test_iterates_query_data_in_offset_chunks(self) -> None: class EAPOrganizationVolumeTest(TestCase, SnubaTestCase, SpanTestCase): - def get_config(self, organization: Organization) -> BaseDynamicSamplingConfiguration: + def get_config( + self, + organization: Organization, + ) -> BaseDynamicSamplingConfiguration: with patch( "sentry.dynamic_sampling.per_org.tasks.configuration.quotas.backend.get_blended_sample_rate", return_value=1.0, @@ -166,8 +171,108 @@ def test_get_eap_organization_volume_without_traffic(self) -> None: def test_get_eap_organization_volume_without_projects(self) -> None: organization = self.create_organization() - org_volume = get_eap_organization_volume( - self.get_config(organization), time_interval=timedelta(hours=1) - ) + with patch( + "sentry.dynamic_sampling.per_org.tasks.queries.Spans.run_table_query" + ) as run_table_query: + org_volume = get_eap_organization_volume( + self.get_config(organization), time_interval=timedelta(hours=1) + ) assert org_volume is None + run_table_query.assert_not_called() + + def test_get_eap_project_volumes_existing_org(self) -> None: + organization = self.create_organization() + project = self.create_project(organization=organization) + other_project = self.create_project(organization=organization) + other_organization = self.create_organization() + self.create_project(organization=other_organization) + + with patch( + "sentry.dynamic_sampling.per_org.tasks.queries.Spans.run_table_query", + return_value={ + "data": [ + { + "sentry.dsc.root_project": project.id, + "count()": 2, + "count_sample()": 2, + }, + { + "sentry.dsc.root_project": other_project.id, + "count()": 1, + "count_sample()": 1, + }, + ] + }, + ) as run_table_query: + project_volumes = get_eap_project_volumes( + self.get_config(organization), time_interval=timedelta(hours=1) + ) + + assert sorted(project_volumes) == [ + ProjectVolume(project_id=project.id, total=2, keep=2, drop=0), + ProjectVolume(project_id=other_project.id, total=1, keep=1, drop=0), + ] + run_table_query.assert_called_once() + assert sorted(run_table_query.call_args.kwargs["params"].projects, key=lambda p: p.id) == [ + project, + other_project, + ] + assert run_table_query.call_args.kwargs["query_string"] == "is_segment:true" + assert run_table_query.call_args.kwargs["selected_columns"] == [ + "sentry.dsc.root_project", + "count()", + "count_sample()", + ] + assert run_table_query.call_args.kwargs["orderby"] == ["sentry.dsc.root_project"] + assert ( + run_table_query.call_args.kwargs["referrer"] + == Referrer.DYNAMIC_SAMPLING_PER_ORG_GET_EAP_PROJECT_VOLUMES.value + ) + + def test_get_eap_project_volumes_without_traffic(self) -> None: + organization = self.create_organization() + self.create_project(organization=organization) + + with patch( + "sentry.dynamic_sampling.per_org.tasks.queries.Spans.run_table_query", + return_value={"data": []}, + ): + project_volumes = get_eap_project_volumes( + self.get_config(organization), time_interval=timedelta(hours=1) + ) + + assert project_volumes == [] + + def test_get_eap_project_volumes_handles_none_aggregate_values(self) -> None: + organization = self.create_organization() + project = self.create_project(organization=organization) + + with patch( + "sentry.dynamic_sampling.per_org.tasks.queries.Spans.run_table_query", + return_value={ + "data": [ + { + "sentry.dsc.root_project": project.id, + "count()": None, + "count_sample()": None, + } + ] + }, + ): + project_volumes = get_eap_project_volumes(self.get_config(organization)) + + assert project_volumes == [ProjectVolume(project_id=project.id, total=0, keep=0, drop=0)] + + def test_get_eap_project_volumes_without_projects(self) -> None: + organization = self.create_organization() + + with patch( + "sentry.dynamic_sampling.per_org.tasks.queries.Spans.run_table_query" + ) as run_table_query: + project_volumes = get_eap_project_volumes( + self.get_config(organization), time_interval=timedelta(hours=1) + ) + + assert project_volumes == [] + run_table_query.assert_not_called() diff --git a/tests/sentry/dynamic_sampling/per_org/tasks/test_scheduler.py b/tests/sentry/dynamic_sampling/per_org/tasks/test_scheduler.py index 79aeede4264e65..ed58d41e5aa8e9 100644 --- a/tests/sentry/dynamic_sampling/per_org/tasks/test_scheduler.py +++ b/tests/sentry/dynamic_sampling/per_org/tasks/test_scheduler.py @@ -1,6 +1,6 @@ from __future__ import annotations -from unittest.mock import patch +from unittest.mock import Mock, patch from django.core.exceptions import ObjectDoesNotExist @@ -15,17 +15,22 @@ from sentry.dynamic_sampling.per_org.tasks.telemetry import TelemetryStatus from sentry.dynamic_sampling.rules.utils import get_redis_client_for_ds from sentry.dynamic_sampling.tasks.common import OrganizationDataVolume +from sentry.dynamic_sampling.types import DynamicSamplingMode from sentry.models.organization import OrganizationStatus from sentry.testutils.cases import TestCase from sentry.testutils.helpers.options import override_options from sentry.testutils.helpers.task_runner import BurstTaskRunner -def _assert_called_once_with_config(mock, organization_id: int) -> None: +def _assert_called_once_with_config( + mock: Mock, + organization_id: int, +) -> BaseDynamicSamplingConfiguration: mock.assert_called_once() config = mock.call_args.args[0] assert isinstance(config, BaseDynamicSamplingConfiguration) assert config.organization.id == organization_id + return config def _drain_dispatched_org_ids(burst) -> list[int]: @@ -131,16 +136,21 @@ def test_run_calculations_per_org_returns_no_volume_without_traffic(self) -> Non patch( "sentry.dynamic_sampling.per_org.tasks.configuration.quotas.backend.get_blended_sample_rate", return_value=1.0, - ), + ) as get_blended_sample_rate, patch( "sentry.dynamic_sampling.per_org.tasks.scheduler.get_eap_organization_volume", return_value=None, ) as get_volume, + patch( + "sentry.dynamic_sampling.per_org.tasks.scheduler.get_eap_project_volumes" + ) as get_project_volumes, ): result = run_calculations_per_org_task(org.id) - assert result == TelemetryStatus.NO_VOLUME + assert result == TelemetryStatus.NO_ORG_VOLUME _assert_called_once_with_config(get_volume, org.id) + get_blended_sample_rate.assert_called_once_with(organization_id=org.id) + get_project_volumes.assert_not_called() @override_options({"dynamic-sampling.per_org.rollout-rate": 1.0}) def test_run_calculations_per_org_continues_with_traffic(self) -> None: @@ -151,26 +161,164 @@ def test_run_calculations_per_org_continues_with_traffic(self) -> None: patch( "sentry.dynamic_sampling.per_org.tasks.configuration.quotas.backend.get_blended_sample_rate", return_value=1.0, - ), + ) as get_blended_sample_rate, patch( "sentry.dynamic_sampling.per_org.tasks.scheduler.get_eap_organization_volume", return_value=org_volume, ) as get_volume, + patch( + "sentry.dynamic_sampling.per_org.tasks.scheduler.get_eap_project_volumes", + return_value=[(1, 100, 25, 75)], + ) as get_project_volumes, ): result = run_calculations_per_org_task(org.id) assert result is None _assert_called_once_with_config(get_volume, org.id) + get_blended_sample_rate.assert_called_once_with(organization_id=org.id) + _assert_called_once_with_config(get_project_volumes, org.id) @override_options({"dynamic-sampling.per_org.rollout-rate": 1.0}) - def test_run_calculations_per_org_skips_org_without_dynamic_sampling(self) -> None: + def test_run_calculations_per_org_returns_no_volume_without_project_volumes(self) -> None: + org = self.create_organization() + org_volume = OrganizationDataVolume(org_id=org.id, total=100, indexed=25) + + with ( + patch( + "sentry.dynamic_sampling.per_org.tasks.configuration.quotas.backend.get_blended_sample_rate", + return_value=1.0, + ) as get_blended_sample_rate, + patch( + "sentry.dynamic_sampling.per_org.tasks.scheduler.get_eap_organization_volume", + return_value=org_volume, + ) as get_volume, + patch( + "sentry.dynamic_sampling.per_org.tasks.scheduler.get_eap_project_volumes", + return_value=[], + ) as get_project_volumes, + ): + result = run_calculations_per_org_task(org.id) + + assert result == TelemetryStatus.NO_PROJECT_VOLUMES + get_blended_sample_rate.assert_called_once_with(organization_id=org.id) + _assert_called_once_with_config(get_volume, org.id) + _assert_called_once_with_config(get_project_volumes, org.id) + + @override_options({"dynamic-sampling.per_org.rollout-rate": 1.0}) + def test_run_calculations_per_org_skips_project_balancing_for_project_mode(self) -> None: + org = self.create_organization() + project = self.create_project(organization=org) + org.update_option("sentry:sampling_mode", DynamicSamplingMode.PROJECT) + project.update_option("sentry:target_sample_rate", 0.2) + org_volume = OrganizationDataVolume(org_id=org.id, total=100, indexed=25) + + with ( + self.feature("organizations:dynamic-sampling-custom"), + patch( + "sentry.dynamic_sampling.per_org.tasks.configuration.quotas.backend.get_blended_sample_rate" + ) as get_blended_sample_rate, + patch( + "sentry.dynamic_sampling.per_org.tasks.scheduler.get_eap_organization_volume", + return_value=org_volume, + ) as get_volume, + patch( + "sentry.dynamic_sampling.per_org.tasks.scheduler.get_eap_project_volumes", + ) as get_project_volumes, + ): + result = run_calculations_per_org_task(org.id) + + assert result is None + get_blended_sample_rate.assert_not_called() + _assert_called_once_with_config(get_volume, org.id) + get_project_volumes.assert_not_called() + + @override_options({"dynamic-sampling.per_org.rollout-rate": 1.0}) + def test_run_calculations_per_org_queries_projects_for_am3_org_mode(self) -> None: + org = self.create_organization() + org.update_option("sentry:sampling_mode", DynamicSamplingMode.ORGANIZATION) + org_volume = OrganizationDataVolume(org_id=org.id, total=100, indexed=25) + + with ( + self.feature("organizations:dynamic-sampling-custom"), + patch( + "sentry.dynamic_sampling.per_org.tasks.configuration.quotas.backend.get_blended_sample_rate" + ) as get_blended_sample_rate, + patch( + "sentry.dynamic_sampling.per_org.tasks.scheduler.get_eap_organization_volume", + return_value=org_volume, + ) as get_volume, + patch( + "sentry.dynamic_sampling.per_org.tasks.scheduler.get_eap_project_volumes", + return_value=[(1, 100, 25, 75)], + ) as get_project_volumes, + ): + result = run_calculations_per_org_task(org.id) + + assert result is None + get_blended_sample_rate.assert_not_called() + _assert_called_once_with_config(get_volume, org.id) + _assert_called_once_with_config(get_project_volumes, org.id) + + @override_options({"dynamic-sampling.per_org.rollout-rate": 1.0}) + def test_run_calculations_per_org_skips_project_mode_without_project_rates(self) -> None: + org = self.create_organization() + self.create_project(organization=org) + org.update_option("sentry:sampling_mode", DynamicSamplingMode.PROJECT) + + with ( + self.feature("organizations:dynamic-sampling-custom"), + patch( + "sentry.dynamic_sampling.per_org.tasks.configuration.quotas.backend.get_blended_sample_rate" + ) as get_blended_sample_rate, + patch( + "sentry.dynamic_sampling.per_org.tasks.scheduler.get_eap_organization_volume" + ) as get_volume, + patch( + "sentry.dynamic_sampling.per_org.tasks.scheduler.get_eap_project_volumes" + ) as get_project_volumes, + ): + result = run_calculations_per_org_task(org.id) + + assert result == TelemetryStatus.ORG_HAS_NO_DYNAMIC_SAMPLING + get_blended_sample_rate.assert_not_called() + get_volume.assert_not_called() + get_project_volumes.assert_not_called() + + @override_options({"dynamic-sampling.per_org.rollout-rate": 1.0}) + def test_run_calculations_per_org_queries_projects_for_am2(self) -> None: + org = self.create_organization() + org_volume = OrganizationDataVolume(org_id=org.id, total=100, indexed=25) + + with ( + patch( + "sentry.dynamic_sampling.per_org.tasks.configuration.quotas.backend.get_blended_sample_rate", + return_value=1.0, + ) as get_blended_sample_rate, + patch( + "sentry.dynamic_sampling.per_org.tasks.scheduler.get_eap_organization_volume", + return_value=org_volume, + ) as get_volume, + patch( + "sentry.dynamic_sampling.per_org.tasks.scheduler.get_eap_project_volumes", + return_value=[(1, 100, 25, 75)], + ) as get_project_volumes, + ): + result = run_calculations_per_org_task(org.id) + + assert result is None + get_blended_sample_rate.assert_called_once_with(organization_id=org.id) + _assert_called_once_with_config(get_volume, org.id) + _assert_called_once_with_config(get_project_volumes, org.id) + + @override_options({"dynamic-sampling.per_org.rollout-rate": 1.0}) + def test_run_calculations_per_org_skips_org_without_transaction_sample_rate(self) -> None: org = self.create_organization() with ( patch( "sentry.dynamic_sampling.per_org.tasks.configuration.quotas.backend.get_blended_sample_rate", return_value=None, - ), + ) as get_blended_sample_rate, patch( "sentry.dynamic_sampling.per_org.tasks.scheduler.get_eap_organization_volume" ) as get_volume, @@ -178,6 +326,7 @@ def test_run_calculations_per_org_skips_org_without_dynamic_sampling(self) -> No result = run_calculations_per_org_task(org.id) assert result == TelemetryStatus.ORG_HAS_NO_DYNAMIC_SAMPLING + get_blended_sample_rate.assert_called_once_with(organization_id=org.id) get_volume.assert_not_called() @override_options({"dynamic-sampling.per_org.rollout-rate": 1.0})