From 267ef0755afec60359c22dfa4cbb20adc0cf0503 Mon Sep 17 00:00:00 2001 From: Simon Hellmayr Date: Tue, 5 May 2026 10:21:40 +0200 Subject: [PATCH 01/15] feat(dynamic-sampling): add per-org project volume queries --- .../dynamic_sampling/per_org/tasks/queries.py | 75 +++++++- .../per_org/tasks/scheduler.py | 9 +- src/sentry/snuba/referrer.py | 3 + .../per_org/tasks/test_queries.py | 126 +++++++++++- .../per_org/tasks/test_scheduler.py | 181 +++++++++++++++++- 5 files changed, 376 insertions(+), 18 deletions(-) diff --git a/src/sentry/dynamic_sampling/per_org/tasks/queries.py b/src/sentry/dynamic_sampling/per_org/tasks/queries.py index a5a69d684f8a..e1bdec1d1e18 100644 --- a/src/sentry/dynamic_sampling/per_org/tasks/queries.py +++ b/src/sentry/dynamic_sampling/per_org/tasks/queries.py @@ -8,10 +8,13 @@ from sentry.constants import ObjectStatus from sentry.dynamic_sampling.per_org.tasks.configuration import BaseDynamicSamplingConfiguration +from sentry.dynamic_sampling.rules.utils import DecisionDropCount, DecisionKeepCount, ProjectId from sentry.dynamic_sampling.tasks.common import ( ACTIVE_ORGS_VOLUMES_DEFAULT_TIME_INTERVAL, OrganizationDataVolume, ) +from sentry.dynamic_sampling.tasks.constants import CHUNK_SIZE +from sentry.dynamic_sampling.types import SamplingMeasure from sentry.models.project import Project from sentry.search.eap.constants import SAMPLING_MODE_HIGHEST_ACCURACY from sentry.search.eap.types import SearchResolverConfig @@ -19,9 +22,17 @@ from sentry.snuba.referrer import Referrer from sentry.snuba.spans_rpc import Spans +ProjectVolumes = tuple[ProjectId, int, DecisionKeepCount, DecisionDropCount] + +EAP_ORGANIZATION_VOLUME_QUERY_STRINGS = { + SamplingMeasure.SEGMENTS: "is_transaction:true", + SamplingMeasure.SPANS: "", +} + def _get_aggregate_int(row: Mapping[str, Any], column: str) -> int: - return int(row.get(column, 0)) + value = row.get(column) + return int(value) if value is not None else 0 def run_eap_spans_table_query_in_chunks( @@ -51,8 +62,9 @@ def get_eap_organization_volume( config: BaseDynamicSamplingConfiguration, time_interval: timedelta = ACTIVE_ORGS_VOLUMES_DEFAULT_TIME_INTERVAL, ) -> OrganizationDataVolume | None: + organization = config.organization projects = list( - Project.objects.filter(organization_id=config.organization.id, status=ObjectStatus.ACTIVE) + Project.objects.filter(organization_id=organization.id, status=ObjectStatus.ACTIVE) ) if not projects: return None @@ -64,9 +76,9 @@ def get_eap_organization_volume( start=start_time, end=end_time, projects=projects, - organization=config.organization, + organization=organization, ), - query_string="is_transaction:true", + query_string=EAP_ORGANIZATION_VOLUME_QUERY_STRINGS[config.measure], selected_columns=["count()", "count_sample()"], orderby=None, offset=0, @@ -89,4 +101,57 @@ def get_eap_organization_volume( return None indexed = _get_aggregate_int(row, "count_sample()") - return OrganizationDataVolume(org_id=config.organization.id, total=total, indexed=indexed) + return OrganizationDataVolume(org_id=organization.id, total=total, indexed=indexed) + + +def get_eap_project_volumes( + config: BaseDynamicSamplingConfiguration, + time_interval: timedelta = ACTIVE_ORGS_VOLUMES_DEFAULT_TIME_INTERVAL, +) -> list[ProjectVolumes]: + organization = config.organization + projects = list( + Project.objects.filter(organization_id=organization.id, status=ObjectStatus.ACTIVE) + ) + if not projects: + return [] + + end_time = datetime.now(UTC) + start_time = end_time - time_interval + offset = 0 + project_volumes: list[ProjectVolumes] = [] + more_results = True + + while more_results: + result = Spans.run_table_query( + params=SnubaParams( + start=start_time, + end=end_time, + projects=projects, + organization=organization, + ), + query_string=EAP_ORGANIZATION_VOLUME_QUERY_STRINGS[config.measure], + selected_columns=["project.id", "count()", "count_sample()"], + orderby=["project.id"], + offset=offset, + limit=CHUNK_SIZE + 1, + referrer=Referrer.DYNAMIC_SAMPLING_PER_ORG_GET_EAP_PROJECT_VOLUMES.value, + config=SearchResolverConfig( + auto_fields=True, + extrapolation_mode=ExtrapolationMode.EXTRAPOLATION_MODE_SERVER_ONLY, + ), + sampling_mode=SAMPLING_MODE_HIGHEST_ACCURACY, + ) + + data = result.get("data", []) + more_results = len(data) > CHUNK_SIZE + offset += CHUNK_SIZE + if more_results: + data = data[:-1] + + for row in data: + total = int(row["count()"]) + keep = int(row["count_sample()"]) + drop = max(total - keep, 0) + project_volumes.append((ProjectId(row["project.id"]), total, keep, drop)) + + return project_volumes diff --git a/src/sentry/dynamic_sampling/per_org/tasks/scheduler.py b/src/sentry/dynamic_sampling/per_org/tasks/scheduler.py index 0c6e77c563ba..c2b8d7048559 100644 --- a/src/sentry/dynamic_sampling/per_org/tasks/scheduler.py +++ b/src/sentry/dynamic_sampling/per_org/tasks/scheduler.py @@ -10,7 +10,10 @@ from sentry.dynamic_sampling.per_org.tasks.configuration import get_configuration from sentry.dynamic_sampling.per_org.tasks.gate import is_org_in_rollout -from sentry.dynamic_sampling.per_org.tasks.queries import get_eap_organization_volume +from sentry.dynamic_sampling.per_org.tasks.queries import ( + get_eap_organization_volume, + get_eap_project_volumes, +) from sentry.dynamic_sampling.per_org.tasks.telemetry import ( SCHEDULER_BUCKET_ORG_STATUS_METRIC, TelemetryStatus, @@ -105,4 +108,8 @@ def run_calculations_per_org_task(org_id: OrganizationId) -> TelemetryStatus | N if org_volume is None: return TelemetryStatus.NO_VOLUME + project_volumes = get_eap_project_volumes(config) + if not project_volumes: + return TelemetryStatus.NO_VOLUME + return None diff --git a/src/sentry/snuba/referrer.py b/src/sentry/snuba/referrer.py index 5da654dc682b..f1b04fc292ee 100644 --- a/src/sentry/snuba/referrer.py +++ b/src/sentry/snuba/referrer.py @@ -639,6 +639,9 @@ class Referrer(StrEnum): "dynamic_sampling.distribution.fetch_projects_with_count_per_root_total_volumes" ) DYNAMIC_SAMPLING_PER_ORG_GET_EAP_ORG_VOLUME = "dynamic_sampling.per_org.get_eap_org_volume" + DYNAMIC_SAMPLING_PER_ORG_GET_EAP_PROJECT_VOLUMES = ( + "dynamic_sampling.per_org.get_eap_project_volumes" + ) DYNAMIC_SAMPLING_COUNTERS_FETCH_PROJECTS_WITH_COUNT_PER_TRANSACTION = ( "dynamic_sampling.counters.fetch_projects_with_count_per_transaction_volumes" ) diff --git a/tests/sentry/dynamic_sampling/per_org/tasks/test_queries.py b/tests/sentry/dynamic_sampling/per_org/tasks/test_queries.py index 893db57d782e..3e0b9ae8a993 100644 --- a/tests/sentry/dynamic_sampling/per_org/tasks/test_queries.py +++ b/tests/sentry/dynamic_sampling/per_org/tasks/test_queries.py @@ -11,9 +11,11 @@ ) from sentry.dynamic_sampling.per_org.tasks.queries import ( get_eap_organization_volume, + get_eap_project_volumes, run_eap_spans_table_query_in_chunks, ) from sentry.dynamic_sampling.tasks.common import OrganizationDataVolume +from sentry.dynamic_sampling.types import SamplingMeasure from sentry.models.organization import Organization from sentry.search.eap.constants import SAMPLING_MODE_HIGHEST_ACCURACY from sentry.search.eap.types import SearchResolverConfig @@ -79,10 +81,24 @@ def test_iterates_query_data_in_offset_chunks(self) -> None: class EAPOrganizationVolumeTest(TestCase, SnubaTestCase, SpanTestCase): - def get_config(self, organization: Organization) -> BaseDynamicSamplingConfiguration: - with patch( - "sentry.dynamic_sampling.per_org.tasks.configuration.quotas.backend.get_blended_sample_rate", - return_value=1.0, + def get_config( + self, + organization: Organization, + measure: SamplingMeasure = SamplingMeasure.SEGMENTS, + ) -> BaseDynamicSamplingConfiguration: + option_values: dict[str, object] = {"dynamic-sampling.check_span_feature_flag": False} + if measure == SamplingMeasure.SPANS: + option_values = { + "dynamic-sampling.check_span_feature_flag": True, + "dynamic-sampling.measure.spans": [organization.id], + } + + with ( + self.options(option_values), + patch( + "sentry.dynamic_sampling.per_org.tasks.configuration.quotas.backend.get_blended_sample_rate", + return_value=1.0, + ), ): return get_configuration(organization.id) @@ -153,6 +169,23 @@ def test_get_eap_organization_volume_returns_raw_and_extrapolated_counts(self) - assert org_volume == OrganizationDataVolume(org_id=organization.id, total=10, indexed=1) + def test_get_eap_organization_volume_with_spans_measure(self) -> None: + organization = self.create_organization() + self.create_project(organization=organization) + + with patch( + "sentry.dynamic_sampling.per_org.tasks.queries.Spans.run_table_query", + return_value={"data": [{"count()": 2, "count_sample()": 2}]}, + ) as run_table_query: + org_volume = get_eap_organization_volume( + self.get_config(organization, SamplingMeasure.SPANS), + time_interval=timedelta(hours=1), + ) + + assert org_volume == OrganizationDataVolume(org_id=organization.id, total=2, indexed=2) + run_table_query.assert_called_once() + assert run_table_query.call_args.kwargs["query_string"] == "" + def test_get_eap_organization_volume_without_traffic(self) -> None: organization = self.create_organization() self.create_project(organization=organization) @@ -166,8 +199,87 @@ def test_get_eap_organization_volume_without_traffic(self) -> None: def test_get_eap_organization_volume_without_projects(self) -> None: organization = self.create_organization() - org_volume = get_eap_organization_volume( - self.get_config(organization), time_interval=timedelta(hours=1) - ) + with patch( + "sentry.dynamic_sampling.per_org.tasks.queries.Spans.run_table_query" + ) as run_table_query: + org_volume = get_eap_organization_volume( + self.get_config(organization), time_interval=timedelta(hours=1) + ) assert org_volume is None + run_table_query.assert_not_called() + + def test_get_eap_project_volumes_existing_org(self) -> None: + organization = self.create_organization() + project = self.create_project(organization=organization) + other_project = self.create_project(organization=organization) + other_organization = self.create_organization() + self.create_project(organization=other_organization) + + with patch( + "sentry.dynamic_sampling.per_org.tasks.queries.Spans.run_table_query", + return_value={ + "data": [ + {"project.id": project.id, "count()": 2, "count_sample()": 2}, + {"project.id": other_project.id, "count()": 1, "count_sample()": 1}, + ] + }, + ) as run_table_query: + project_volumes = get_eap_project_volumes( + self.get_config(organization), time_interval=timedelta(hours=1) + ) + + assert sorted(project_volumes) == [ + (project.id, 2, 2, 0), + (other_project.id, 1, 1, 0), + ] + run_table_query.assert_called_once() + assert sorted(run_table_query.call_args.kwargs["params"].projects, key=lambda p: p.id) == [ + project, + other_project, + ] + assert run_table_query.call_args.kwargs["query_string"] == "is_transaction:true" + + def test_get_eap_project_volumes_with_spans_measure(self) -> None: + organization = self.create_organization() + self.create_project(organization=organization) + + with patch( + "sentry.dynamic_sampling.per_org.tasks.queries.Spans.run_table_query", + return_value={"data": []}, + ) as run_table_query: + project_volumes = get_eap_project_volumes( + self.get_config(organization, SamplingMeasure.SPANS), + time_interval=timedelta(hours=1), + ) + + assert project_volumes == [] + run_table_query.assert_called_once() + assert run_table_query.call_args.kwargs["query_string"] == "" + + def test_get_eap_project_volumes_without_traffic(self) -> None: + organization = self.create_organization() + self.create_project(organization=organization) + + with patch( + "sentry.dynamic_sampling.per_org.tasks.queries.Spans.run_table_query", + return_value={"data": []}, + ): + project_volumes = get_eap_project_volumes( + self.get_config(organization), time_interval=timedelta(hours=1) + ) + + assert project_volumes == [] + + def test_get_eap_project_volumes_without_projects(self) -> None: + organization = self.create_organization() + + with patch( + "sentry.dynamic_sampling.per_org.tasks.queries.Spans.run_table_query" + ) as run_table_query: + project_volumes = get_eap_project_volumes( + self.get_config(organization), time_interval=timedelta(hours=1) + ) + + assert project_volumes == [] + run_table_query.assert_not_called() diff --git a/tests/sentry/dynamic_sampling/per_org/tasks/test_scheduler.py b/tests/sentry/dynamic_sampling/per_org/tasks/test_scheduler.py index 79aeede4264e..67181b72a071 100644 --- a/tests/sentry/dynamic_sampling/per_org/tasks/test_scheduler.py +++ b/tests/sentry/dynamic_sampling/per_org/tasks/test_scheduler.py @@ -1,5 +1,7 @@ from __future__ import annotations +from collections.abc import Callable +from typing import NamedTuple from unittest.mock import patch from django.core.exceptions import ObjectDoesNotExist @@ -15,17 +17,48 @@ from sentry.dynamic_sampling.per_org.tasks.telemetry import TelemetryStatus from sentry.dynamic_sampling.rules.utils import get_redis_client_for_ds from sentry.dynamic_sampling.tasks.common import OrganizationDataVolume +from sentry.dynamic_sampling.types import DynamicSamplingMode, SamplingMeasure from sentry.models.organization import OrganizationStatus from sentry.testutils.cases import TestCase from sentry.testutils.helpers.options import override_options from sentry.testutils.helpers.task_runner import BurstTaskRunner +SpanOrgIds = Callable[[int], list[int]] -def _assert_called_once_with_config(mock, organization_id: int) -> None: + +class MeasureOptionCase(NamedTuple): + name: str + check_span_feature_flag: bool + span_org_ids: SpanOrgIds + expected_measure: SamplingMeasure + + +def _include_org_id(org_id: int) -> list[int]: + return [org_id] + + +def _exclude_org_id(org_id: int) -> list[int]: + return [] + + +MEASURE_OPTION_CASES = ( + MeasureOptionCase("span-option-disabled", False, _include_org_id, SamplingMeasure.SEGMENTS), + MeasureOptionCase("org-not-in-span-option", True, _exclude_org_id, SamplingMeasure.SEGMENTS), + MeasureOptionCase("org-in-span-option", True, _include_org_id, SamplingMeasure.SPANS), +) + + +def _assert_called_once_with_config( + mock, + organization_id: int, + measure: SamplingMeasure = SamplingMeasure.SEGMENTS, +) -> BaseDynamicSamplingConfiguration: mock.assert_called_once() config = mock.call_args.args[0] assert isinstance(config, BaseDynamicSamplingConfiguration) assert config.organization.id == organization_id + assert config.measure == measure + return config def _drain_dispatched_org_ids(burst) -> list[int]: @@ -131,16 +164,21 @@ def test_run_calculations_per_org_returns_no_volume_without_traffic(self) -> Non patch( "sentry.dynamic_sampling.per_org.tasks.configuration.quotas.backend.get_blended_sample_rate", return_value=1.0, - ), + ) as get_blended_sample_rate, patch( "sentry.dynamic_sampling.per_org.tasks.scheduler.get_eap_organization_volume", return_value=None, ) as get_volume, + patch( + "sentry.dynamic_sampling.per_org.tasks.scheduler.get_eap_project_volumes" + ) as get_project_volumes, ): result = run_calculations_per_org_task(org.id) assert result == TelemetryStatus.NO_VOLUME _assert_called_once_with_config(get_volume, org.id) + get_blended_sample_rate.assert_called_once_with(organization_id=org.id) + get_project_volumes.assert_not_called() @override_options({"dynamic-sampling.per_org.rollout-rate": 1.0}) def test_run_calculations_per_org_continues_with_traffic(self) -> None: @@ -151,26 +189,158 @@ def test_run_calculations_per_org_continues_with_traffic(self) -> None: patch( "sentry.dynamic_sampling.per_org.tasks.configuration.quotas.backend.get_blended_sample_rate", return_value=1.0, - ), + ) as get_blended_sample_rate, patch( "sentry.dynamic_sampling.per_org.tasks.scheduler.get_eap_organization_volume", return_value=org_volume, ) as get_volume, + patch( + "sentry.dynamic_sampling.per_org.tasks.scheduler.get_eap_project_volumes", + return_value=[(1, 100, 25, 75)], + ) as get_project_volumes, ): result = run_calculations_per_org_task(org.id) assert result is None _assert_called_once_with_config(get_volume, org.id) + get_blended_sample_rate.assert_called_once_with(organization_id=org.id) + _assert_called_once_with_config(get_project_volumes, org.id) @override_options({"dynamic-sampling.per_org.rollout-rate": 1.0}) - def test_run_calculations_per_org_skips_org_without_dynamic_sampling(self) -> None: + def test_run_calculations_per_org_returns_no_volume_without_project_volumes(self) -> None: + org = self.create_organization() + org_volume = OrganizationDataVolume(org_id=org.id, total=100, indexed=25) + + with ( + patch( + "sentry.dynamic_sampling.per_org.tasks.configuration.quotas.backend.get_blended_sample_rate", + return_value=1.0, + ) as get_blended_sample_rate, + patch( + "sentry.dynamic_sampling.per_org.tasks.scheduler.get_eap_organization_volume", + return_value=org_volume, + ) as get_volume, + patch( + "sentry.dynamic_sampling.per_org.tasks.scheduler.get_eap_project_volumes", + return_value=[], + ) as get_project_volumes, + ): + result = run_calculations_per_org_task(org.id) + + assert result == TelemetryStatus.NO_VOLUME + get_blended_sample_rate.assert_called_once_with(organization_id=org.id) + _assert_called_once_with_config(get_volume, org.id) + _assert_called_once_with_config(get_project_volumes, org.id) + + @override_options({"dynamic-sampling.per_org.rollout-rate": 1.0}) + def test_run_calculations_per_org_uses_measure_options_for_project_mode(self) -> None: + for measure_case in MEASURE_OPTION_CASES: + with self.subTest(measure_case=measure_case.name): + org = self.create_organization() + project = self.create_project(organization=org) + org.update_option("sentry:sampling_mode", DynamicSamplingMode.PROJECT) + project.update_option("sentry:target_sample_rate", 0.2) + org_volume = OrganizationDataVolume(org_id=org.id, total=100, indexed=25) + + with ( + self.feature("organizations:dynamic-sampling-custom"), + self.options( + { + "dynamic-sampling.check_span_feature_flag": measure_case.check_span_feature_flag, + "dynamic-sampling.measure.spans": measure_case.span_org_ids(org.id), + } + ), + patch( + "sentry.dynamic_sampling.per_org.tasks.configuration.quotas.backend.get_blended_sample_rate" + ) as get_blended_sample_rate, + patch( + "sentry.dynamic_sampling.per_org.tasks.scheduler.get_eap_organization_volume", + return_value=org_volume, + ) as get_volume, + patch( + "sentry.dynamic_sampling.per_org.tasks.scheduler.get_eap_project_volumes", + return_value=[(project.id, 100, 25, 75)], + ) as get_project_volumes, + ): + result = run_calculations_per_org_task(org.id) + + assert result is None + get_blended_sample_rate.assert_not_called() + _assert_called_once_with_config(get_volume, org.id, measure_case.expected_measure) + _assert_called_once_with_config( + get_project_volumes, org.id, measure_case.expected_measure + ) + + @override_options({"dynamic-sampling.per_org.rollout-rate": 1.0}) + def test_run_calculations_per_org_skips_project_mode_without_project_rates(self) -> None: + org = self.create_organization() + self.create_project(organization=org) + org.update_option("sentry:sampling_mode", DynamicSamplingMode.PROJECT) + + with ( + self.feature("organizations:dynamic-sampling-custom"), + patch( + "sentry.dynamic_sampling.per_org.tasks.configuration.quotas.backend.get_blended_sample_rate" + ) as get_blended_sample_rate, + patch( + "sentry.dynamic_sampling.per_org.tasks.scheduler.get_eap_organization_volume" + ) as get_volume, + patch( + "sentry.dynamic_sampling.per_org.tasks.scheduler.get_eap_project_volumes" + ) as get_project_volumes, + ): + result = run_calculations_per_org_task(org.id) + + assert result == TelemetryStatus.ORG_HAS_NO_DYNAMIC_SAMPLING + get_blended_sample_rate.assert_not_called() + get_volume.assert_not_called() + get_project_volumes.assert_not_called() + + @override_options({"dynamic-sampling.per_org.rollout-rate": 1.0}) + def test_run_calculations_per_org_uses_measure_options_for_am2(self) -> None: + for measure_case in MEASURE_OPTION_CASES: + with self.subTest(measure_case=measure_case.name): + org = self.create_organization() + org_volume = OrganizationDataVolume(org_id=org.id, total=100, indexed=25) + + with ( + self.options( + { + "dynamic-sampling.check_span_feature_flag": measure_case.check_span_feature_flag, + "dynamic-sampling.measure.spans": measure_case.span_org_ids(org.id), + } + ), + patch( + "sentry.dynamic_sampling.per_org.tasks.configuration.quotas.backend.get_blended_sample_rate", + return_value=1.0, + ) as get_blended_sample_rate, + patch( + "sentry.dynamic_sampling.per_org.tasks.scheduler.get_eap_organization_volume", + return_value=org_volume, + ) as get_volume, + patch( + "sentry.dynamic_sampling.per_org.tasks.scheduler.get_eap_project_volumes", + return_value=[(1, 100, 25, 75)], + ) as get_project_volumes, + ): + result = run_calculations_per_org_task(org.id) + + assert result is None + get_blended_sample_rate.assert_called_once_with(organization_id=org.id) + _assert_called_once_with_config(get_volume, org.id, measure_case.expected_measure) + _assert_called_once_with_config( + get_project_volumes, org.id, measure_case.expected_measure + ) + + @override_options({"dynamic-sampling.per_org.rollout-rate": 1.0}) + def test_run_calculations_per_org_skips_org_without_transaction_sample_rate(self) -> None: org = self.create_organization() with ( patch( "sentry.dynamic_sampling.per_org.tasks.configuration.quotas.backend.get_blended_sample_rate", return_value=None, - ), + ) as get_blended_sample_rate, patch( "sentry.dynamic_sampling.per_org.tasks.scheduler.get_eap_organization_volume" ) as get_volume, @@ -178,6 +348,7 @@ def test_run_calculations_per_org_skips_org_without_dynamic_sampling(self) -> No result = run_calculations_per_org_task(org.id) assert result == TelemetryStatus.ORG_HAS_NO_DYNAMIC_SAMPLING + get_blended_sample_rate.assert_called_once_with(organization_id=org.id) get_volume.assert_not_called() @override_options({"dynamic-sampling.per_org.rollout-rate": 1.0}) From cf66a331da739a1940504e945ec233ff9b9b6a1e Mon Sep 17 00:00:00 2001 From: Simon Hellmayr Date: Tue, 5 May 2026 11:08:13 +0200 Subject: [PATCH 02/15] use config in queriers --- src/sentry/dynamic_sampling/per_org/tasks/queries.py | 10 +++++----- .../dynamic_sampling/per_org/tasks/test_scheduler.py | 4 ++-- 2 files changed, 7 insertions(+), 7 deletions(-) diff --git a/src/sentry/dynamic_sampling/per_org/tasks/queries.py b/src/sentry/dynamic_sampling/per_org/tasks/queries.py index e1bdec1d1e18..002b1a09cdc2 100644 --- a/src/sentry/dynamic_sampling/per_org/tasks/queries.py +++ b/src/sentry/dynamic_sampling/per_org/tasks/queries.py @@ -64,7 +64,7 @@ def get_eap_organization_volume( ) -> OrganizationDataVolume | None: organization = config.organization projects = list( - Project.objects.filter(organization_id=organization.id, status=ObjectStatus.ACTIVE) + Project.objects.filter(organization_id=config.organization.id, status=ObjectStatus.ACTIVE) ) if not projects: return None @@ -76,7 +76,7 @@ def get_eap_organization_volume( start=start_time, end=end_time, projects=projects, - organization=organization, + organization=config.organization, ), query_string=EAP_ORGANIZATION_VOLUME_QUERY_STRINGS[config.measure], selected_columns=["count()", "count_sample()"], @@ -101,7 +101,7 @@ def get_eap_organization_volume( return None indexed = _get_aggregate_int(row, "count_sample()") - return OrganizationDataVolume(org_id=organization.id, total=total, indexed=indexed) + return OrganizationDataVolume(org_id=config.organization.id, total=total, indexed=indexed) def get_eap_project_volumes( @@ -110,7 +110,7 @@ def get_eap_project_volumes( ) -> list[ProjectVolumes]: organization = config.organization projects = list( - Project.objects.filter(organization_id=organization.id, status=ObjectStatus.ACTIVE) + Project.objects.filter(organization_id=config.organization.id, status=ObjectStatus.ACTIVE) ) if not projects: return [] @@ -127,7 +127,7 @@ def get_eap_project_volumes( start=start_time, end=end_time, projects=projects, - organization=organization, + organization=config.organization, ), query_string=EAP_ORGANIZATION_VOLUME_QUERY_STRINGS[config.measure], selected_columns=["project.id", "count()", "count_sample()"], diff --git a/tests/sentry/dynamic_sampling/per_org/tasks/test_scheduler.py b/tests/sentry/dynamic_sampling/per_org/tasks/test_scheduler.py index 67181b72a071..9a3349da93d3 100644 --- a/tests/sentry/dynamic_sampling/per_org/tasks/test_scheduler.py +++ b/tests/sentry/dynamic_sampling/per_org/tasks/test_scheduler.py @@ -2,7 +2,7 @@ from collections.abc import Callable from typing import NamedTuple -from unittest.mock import patch +from unittest.mock import Mock, patch from django.core.exceptions import ObjectDoesNotExist @@ -49,7 +49,7 @@ def _exclude_org_id(org_id: int) -> list[int]: def _assert_called_once_with_config( - mock, + mock: Mock, organization_id: int, measure: SamplingMeasure = SamplingMeasure.SEGMENTS, ) -> BaseDynamicSamplingConfiguration: From 659b9f9ef183cf727971739919b53d0474f3c5e9 Mon Sep 17 00:00:00 2001 From: Simon Hellmayr Date: Tue, 5 May 2026 11:11:44 +0200 Subject: [PATCH 03/15] differentiate between org and project volumes for status --- src/sentry/dynamic_sampling/per_org/tasks/queries.py | 8 ++++++-- src/sentry/dynamic_sampling/per_org/tasks/scheduler.py | 4 ++-- src/sentry/dynamic_sampling/per_org/tasks/telemetry.py | 3 ++- 3 files changed, 10 insertions(+), 5 deletions(-) diff --git a/src/sentry/dynamic_sampling/per_org/tasks/queries.py b/src/sentry/dynamic_sampling/per_org/tasks/queries.py index 002b1a09cdc2..035c753f321d 100644 --- a/src/sentry/dynamic_sampling/per_org/tasks/queries.py +++ b/src/sentry/dynamic_sampling/per_org/tasks/queries.py @@ -7,7 +7,11 @@ from sentry_protos.snuba.v1.trace_item_attribute_pb2 import ExtrapolationMode from sentry.constants import ObjectStatus -from sentry.dynamic_sampling.per_org.tasks.configuration import BaseDynamicSamplingConfiguration +from sentry.dynamic_sampling.per_org.tasks.configuration import ( + AutomaticDynamicSamplingConfiguration, + BaseDynamicSamplingConfiguration, + CustomDynamicSamplingProjectConfiguration, +) from sentry.dynamic_sampling.rules.utils import DecisionDropCount, DecisionKeepCount, ProjectId from sentry.dynamic_sampling.tasks.common import ( ACTIVE_ORGS_VOLUMES_DEFAULT_TIME_INTERVAL, @@ -105,7 +109,7 @@ def get_eap_organization_volume( def get_eap_project_volumes( - config: BaseDynamicSamplingConfiguration, + config: AutomaticDynamicSamplingConfiguration | CustomDynamicSamplingProjectConfiguration, time_interval: timedelta = ACTIVE_ORGS_VOLUMES_DEFAULT_TIME_INTERVAL, ) -> list[ProjectVolumes]: organization = config.organization diff --git a/src/sentry/dynamic_sampling/per_org/tasks/scheduler.py b/src/sentry/dynamic_sampling/per_org/tasks/scheduler.py index c2b8d7048559..beca20adde43 100644 --- a/src/sentry/dynamic_sampling/per_org/tasks/scheduler.py +++ b/src/sentry/dynamic_sampling/per_org/tasks/scheduler.py @@ -106,10 +106,10 @@ def run_calculations_per_org_task(org_id: OrganizationId) -> TelemetryStatus | N org_volume = get_eap_organization_volume(config) if org_volume is None: - return TelemetryStatus.NO_VOLUME + return TelemetryStatus.NO_ORG_VOLUME project_volumes = get_eap_project_volumes(config) if not project_volumes: - return TelemetryStatus.NO_VOLUME + return TelemetryStatus.NO_PROJECT_VOLUMES return None diff --git a/src/sentry/dynamic_sampling/per_org/tasks/telemetry.py b/src/sentry/dynamic_sampling/per_org/tasks/telemetry.py index f5d2b377970b..722daf8db00c 100644 --- a/src/sentry/dynamic_sampling/per_org/tasks/telemetry.py +++ b/src/sentry/dynamic_sampling/per_org/tasks/telemetry.py @@ -32,7 +32,8 @@ class TelemetryStatus(StrEnum): FAILED = "failed" KILLSWITCHED = "killswitched" NO_SUBSCRIPTION = "no_subscription" - NO_VOLUME = "no_volume" + NO_ORG_VOLUME = "no_org_volume" + NO_PROJECT_VOLUMES = "no_project_volumes" NOT_IN_ROLLOUT = "not_in_rollout" ORG_HAS_NO_DYNAMIC_SAMPLING = "org_has_no_dynamic_sampling" ORG_NOT_FOUND = "org_not_found" From ebe9b3a614ad46c8b49f5ffaae95a55581ca77af Mon Sep 17 00:00:00 2001 From: Simon Hellmayr Date: Tue, 5 May 2026 14:07:36 +0200 Subject: [PATCH 04/15] ref --- .../dynamic_sampling/per_org/tasks/queries.py | 22 ++++++++----------- .../per_org/tasks/test_queries.py | 14 ++++++++++++ .../per_org/tasks/test_scheduler.py | 4 ++-- 3 files changed, 25 insertions(+), 15 deletions(-) diff --git a/src/sentry/dynamic_sampling/per_org/tasks/queries.py b/src/sentry/dynamic_sampling/per_org/tasks/queries.py index 035c753f321d..9da8532c4d5e 100644 --- a/src/sentry/dynamic_sampling/per_org/tasks/queries.py +++ b/src/sentry/dynamic_sampling/per_org/tasks/queries.py @@ -7,11 +7,7 @@ from sentry_protos.snuba.v1.trace_item_attribute_pb2 import ExtrapolationMode from sentry.constants import ObjectStatus -from sentry.dynamic_sampling.per_org.tasks.configuration import ( - AutomaticDynamicSamplingConfiguration, - BaseDynamicSamplingConfiguration, - CustomDynamicSamplingProjectConfiguration, -) +from sentry.dynamic_sampling.per_org.tasks.configuration import BaseDynamicSamplingConfiguration from sentry.dynamic_sampling.rules.utils import DecisionDropCount, DecisionKeepCount, ProjectId from sentry.dynamic_sampling.tasks.common import ( ACTIVE_ORGS_VOLUMES_DEFAULT_TIME_INTERVAL, @@ -68,7 +64,7 @@ def get_eap_organization_volume( ) -> OrganizationDataVolume | None: organization = config.organization projects = list( - Project.objects.filter(organization_id=config.organization.id, status=ObjectStatus.ACTIVE) + Project.objects.filter(organization_id=organization.id, status=ObjectStatus.ACTIVE) ) if not projects: return None @@ -80,7 +76,7 @@ def get_eap_organization_volume( start=start_time, end=end_time, projects=projects, - organization=config.organization, + organization=organization, ), query_string=EAP_ORGANIZATION_VOLUME_QUERY_STRINGS[config.measure], selected_columns=["count()", "count_sample()"], @@ -105,16 +101,16 @@ def get_eap_organization_volume( return None indexed = _get_aggregate_int(row, "count_sample()") - return OrganizationDataVolume(org_id=config.organization.id, total=total, indexed=indexed) + return OrganizationDataVolume(org_id=organization.id, total=total, indexed=indexed) def get_eap_project_volumes( - config: AutomaticDynamicSamplingConfiguration | CustomDynamicSamplingProjectConfiguration, + config: BaseDynamicSamplingConfiguration, time_interval: timedelta = ACTIVE_ORGS_VOLUMES_DEFAULT_TIME_INTERVAL, ) -> list[ProjectVolumes]: organization = config.organization projects = list( - Project.objects.filter(organization_id=config.organization.id, status=ObjectStatus.ACTIVE) + Project.objects.filter(organization_id=organization.id, status=ObjectStatus.ACTIVE) ) if not projects: return [] @@ -131,7 +127,7 @@ def get_eap_project_volumes( start=start_time, end=end_time, projects=projects, - organization=config.organization, + organization=organization, ), query_string=EAP_ORGANIZATION_VOLUME_QUERY_STRINGS[config.measure], selected_columns=["project.id", "count()", "count_sample()"], @@ -153,8 +149,8 @@ def get_eap_project_volumes( data = data[:-1] for row in data: - total = int(row["count()"]) - keep = int(row["count_sample()"]) + total = _get_aggregate_int(row, "count()") + keep = _get_aggregate_int(row, "count_sample()") drop = max(total - keep, 0) project_volumes.append((ProjectId(row["project.id"]), total, keep, drop)) diff --git a/tests/sentry/dynamic_sampling/per_org/tasks/test_queries.py b/tests/sentry/dynamic_sampling/per_org/tasks/test_queries.py index 3e0b9ae8a993..dba5bc1c9974 100644 --- a/tests/sentry/dynamic_sampling/per_org/tasks/test_queries.py +++ b/tests/sentry/dynamic_sampling/per_org/tasks/test_queries.py @@ -271,6 +271,20 @@ def test_get_eap_project_volumes_without_traffic(self) -> None: assert project_volumes == [] + def test_get_eap_project_volumes_handles_none_aggregate_values(self) -> None: + organization = self.create_organization() + project = self.create_project(organization=organization) + + with patch( + "sentry.dynamic_sampling.per_org.tasks.queries.Spans.run_table_query", + return_value={ + "data": [{"project.id": project.id, "count()": None, "count_sample()": None}] + }, + ): + project_volumes = get_eap_project_volumes(self.get_config(organization)) + + assert project_volumes == [(project.id, 0, 0, 0)] + def test_get_eap_project_volumes_without_projects(self) -> None: organization = self.create_organization() diff --git a/tests/sentry/dynamic_sampling/per_org/tasks/test_scheduler.py b/tests/sentry/dynamic_sampling/per_org/tasks/test_scheduler.py index 9a3349da93d3..00c85bff85d0 100644 --- a/tests/sentry/dynamic_sampling/per_org/tasks/test_scheduler.py +++ b/tests/sentry/dynamic_sampling/per_org/tasks/test_scheduler.py @@ -175,7 +175,7 @@ def test_run_calculations_per_org_returns_no_volume_without_traffic(self) -> Non ): result = run_calculations_per_org_task(org.id) - assert result == TelemetryStatus.NO_VOLUME + assert result == TelemetryStatus.NO_ORG_VOLUME _assert_called_once_with_config(get_volume, org.id) get_blended_sample_rate.assert_called_once_with(organization_id=org.id) get_project_volumes.assert_not_called() @@ -227,7 +227,7 @@ def test_run_calculations_per_org_returns_no_volume_without_project_volumes(self ): result = run_calculations_per_org_task(org.id) - assert result == TelemetryStatus.NO_VOLUME + assert result == TelemetryStatus.NO_PROJECT_VOLUMES get_blended_sample_rate.assert_called_once_with(organization_id=org.id) _assert_called_once_with_config(get_volume, org.id) _assert_called_once_with_config(get_project_volumes, org.id) From fa4b96cc98f879fd048d96926fbaec111a935d49 Mon Sep 17 00:00:00 2001 From: Simon Hellmayr Date: Tue, 5 May 2026 14:18:59 +0200 Subject: [PATCH 05/15] fix interval --- src/sentry/dynamic_sampling/per_org/tasks/queries.py | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/src/sentry/dynamic_sampling/per_org/tasks/queries.py b/src/sentry/dynamic_sampling/per_org/tasks/queries.py index 9da8532c4d5e..82dc70067ead 100644 --- a/src/sentry/dynamic_sampling/per_org/tasks/queries.py +++ b/src/sentry/dynamic_sampling/per_org/tasks/queries.py @@ -13,7 +13,9 @@ ACTIVE_ORGS_VOLUMES_DEFAULT_TIME_INTERVAL, OrganizationDataVolume, ) -from sentry.dynamic_sampling.tasks.constants import CHUNK_SIZE +from sentry.dynamic_sampling.tasks.constants import ( + CHUNK_SIZE, +) from sentry.dynamic_sampling.types import SamplingMeasure from sentry.models.project import Project from sentry.search.eap.constants import SAMPLING_MODE_HIGHEST_ACCURACY @@ -106,7 +108,7 @@ def get_eap_organization_volume( def get_eap_project_volumes( config: BaseDynamicSamplingConfiguration, - time_interval: timedelta = ACTIVE_ORGS_VOLUMES_DEFAULT_TIME_INTERVAL, + time_interval: timedelta = timedelta(hours=1), ) -> list[ProjectVolumes]: organization = config.organization projects = list( From 3a66d3d0311b0211225990c873274eed49e57bde Mon Sep 17 00:00:00 2001 From: Simon Hellmayr Date: Thu, 7 May 2026 14:46:16 +0200 Subject: [PATCH 06/15] don't do that --- .../dynamic_sampling/per_org/tasks/queries.py | 3 +- tools/eap_org_volume_rpc_body.py | 236 ++++++++++++++++++ tools/eap_rpc_specs/get_eap_org_volume.json | 9 + .../get_eap_project_volumes.json | 10 + .../get_eap_transaction_volumes.json | 10 + 5 files changed, 266 insertions(+), 2 deletions(-) create mode 100644 tools/eap_org_volume_rpc_body.py create mode 100644 tools/eap_rpc_specs/get_eap_org_volume.json create mode 100644 tools/eap_rpc_specs/get_eap_project_volumes.json create mode 100644 tools/eap_rpc_specs/get_eap_transaction_volumes.json diff --git a/src/sentry/dynamic_sampling/per_org/tasks/queries.py b/src/sentry/dynamic_sampling/per_org/tasks/queries.py index 82dc70067ead..e58350254d70 100644 --- a/src/sentry/dynamic_sampling/per_org/tasks/queries.py +++ b/src/sentry/dynamic_sampling/per_org/tasks/queries.py @@ -33,8 +33,7 @@ def _get_aggregate_int(row: Mapping[str, Any], column: str) -> int: - value = row.get(column) - return int(value) if value is not None else 0 + return int(row.get(column, 0)) def run_eap_spans_table_query_in_chunks( diff --git a/tools/eap_org_volume_rpc_body.py b/tools/eap_org_volume_rpc_body.py new file mode 100644 index 000000000000..7ff89d2651c2 --- /dev/null +++ b/tools/eap_org_volume_rpc_body.py @@ -0,0 +1,236 @@ +from __future__ import annotations + +import argparse +import json +from collections.abc import Sequence +from datetime import UTC, datetime, timedelta +from pathlib import Path +from typing import Any + +from google.protobuf.json_format import MessageToJson, ParseDict +from google.protobuf.timestamp_pb2 import Timestamp +from sentry_protos.snuba.v1.downsampled_storage_pb2 import DownsampledStorageConfig +from sentry_protos.snuba.v1.endpoint_trace_item_table_pb2 import Column, TraceItemTableRequest +from sentry_protos.snuba.v1.request_common_pb2 import PageToken, RequestMeta, TraceItemType +from sentry_protos.snuba.v1.trace_item_attribute_pb2 import ( + AttributeAggregation, + AttributeKey, + AttributeValue, + ExtrapolationMode, + Function, +) +from sentry_protos.snuba.v1.trace_item_filter_pb2 import ComparisonFilter, TraceItemFilter + +DEFAULT_REFERRER = "dynamic_sampling.per_org.get_eap_org_volume" +DEFAULT_SPEC_PATH = Path(__file__).with_name("eap_rpc_specs") / "get_eap_org_volume.json" + + +def get_attribute_key(column: str) -> AttributeKey: + if column in {"project.id", "project_id"}: + return AttributeKey(type=AttributeKey.TYPE_INT, name="sentry.project_id") + if column == "transaction": + return AttributeKey(type=AttributeKey.TYPE_STRING, name="sentry.transaction") + if column == "is_transaction": + return AttributeKey(type=AttributeKey.TYPE_BOOLEAN, name="sentry.is_segment") + raise ValueError(f"Unsupported shortcut column: {column}") + + +def get_count_key(column: str) -> AttributeKey: + if column == "count()": + return AttributeKey(type=AttributeKey.TYPE_INT, name="sentry.project_id") + if column == "count_sample()": + return AttributeKey(type=AttributeKey.TYPE_DOUBLE, name="sentry.duration_ms") + raise ValueError(f"Unsupported aggregate shortcut: {column}") + + +def get_extrapolation_mode(column: str) -> ExtrapolationMode.ValueType: + if column == "count_sample()": + return ExtrapolationMode.EXTRAPOLATION_MODE_NONE + return ExtrapolationMode.EXTRAPOLATION_MODE_SERVER_ONLY + + +def parse_datetime(value: str) -> datetime: + parsed = datetime.fromisoformat(value.replace("Z", "+00:00")) + if parsed.tzinfo is None: + return parsed.replace(tzinfo=UTC) + return parsed.astimezone(UTC) + + +def to_timestamp(value: datetime) -> Timestamp: + timestamp = Timestamp() + timestamp.FromDatetime(value) + return timestamp + + +def resolve_time_window( + *, + start: datetime | None, + end: datetime | None, + last_hours: float, +) -> tuple[datetime, datetime]: + resolved_end = end or datetime.now(UTC) + resolved_start = start or resolved_end - timedelta(hours=last_hours) + return resolved_start, resolved_end + + +def load_spec(path: str) -> dict[str, Any]: + with open(path) as spec_file: + return json.load(spec_file) + + +def get_query_string(spec: dict[str, Any], measure: str) -> str: + if "query_string" in spec: + return spec["query_string"] + query_strings = spec.get("query_string_by_measure", {}) + return query_strings.get(measure, "") + + +def build_filter(spec: dict[str, Any], measure: str) -> TraceItemFilter | None: + if "filter" in spec: + return ParseDict(spec["filter"], TraceItemFilter()) + + query_string = get_query_string(spec, measure) + if not query_string: + return None + if query_string in {"is_transaction:true", "is_transaction:1"}: + return TraceItemFilter( + comparison_filter=ComparisonFilter( + key=get_attribute_key("is_transaction"), + op=ComparisonFilter.OP_EQUALS, + value=AttributeValue(val_bool=True), + ) + ) + raise ValueError(f"Unsupported query string shortcut: {query_string}") + + +def build_column(column: str) -> Column: + if column in {"count()", "count_sample()"}: + return Column( + aggregation=AttributeAggregation( + aggregate=Function.FUNCTION_COUNT, + key=get_count_key(column), + label=column, + extrapolation_mode=get_extrapolation_mode(column), + ) + ) + return Column(key=get_attribute_key(column), label=column) + + +def build_columns(spec: dict[str, Any]) -> list[Column]: + if "columns" in spec: + return [ParseDict(column, Column()) for column in spec["columns"]] + return [build_column(column) for column in spec.get("selected_columns", [])] + + +def build_order_by(spec: dict[str, Any]) -> list[TraceItemTableRequest.OrderBy]: + orderby = spec.get("orderby", spec.get("order_by")) + if not orderby: + return [] + if "order_by" in spec: + return [ + ParseDict(order_by, TraceItemTableRequest.OrderBy()) for order_by in spec["order_by"] + ] + return [ + TraceItemTableRequest.OrderBy( + column=build_column(column.lstrip("-")), + descending=column.startswith("-"), + ) + for column in orderby + ] + + +def build_group_by(spec: dict[str, Any]) -> list[AttributeKey]: + if "group_by" in spec: + return [ParseDict(group_by, AttributeKey()) for group_by in spec["group_by"]] + selected_columns = spec.get("selected_columns", []) + if not any(column in {"count()", "count_sample()"} for column in selected_columns): + return [] + return [ + get_attribute_key(column) + for column in selected_columns + if column not in {"count()", "count_sample()"} + ] + + +def build_request( + *, + spec: dict[str, Any], + measure: str, + organization_id: int, + project_ids: Sequence[int], + start: datetime, + end: datetime, + cogs_category: str, + referrer: str, + limit: int, + offset: int, +) -> TraceItemTableRequest: + meta = RequestMeta( + organization_id=organization_id, + project_ids=project_ids, + cogs_category=cogs_category, + referrer=referrer, + start_timestamp=to_timestamp(start), + end_timestamp=to_timestamp(end), + trace_item_type=TraceItemType.TRACE_ITEM_TYPE_SPAN, + downsampled_storage_config=DownsampledStorageConfig( + mode=DownsampledStorageConfig.MODE_HIGHEST_ACCURACY + ), + ) + request_filter = build_filter(spec, measure) + request = TraceItemTableRequest( + meta=meta, + columns=build_columns(spec), + order_by=build_order_by(spec), + group_by=build_group_by(spec), + limit=limit, + ) + if request_filter is not None: + request.filter.CopyFrom(request_filter) + if offset: + request.page_token.CopyFrom(PageToken(offset=offset)) + return request + + +def main(argv: Sequence[str] | None = None) -> int: + parser = argparse.ArgumentParser( + description="Print a Snuba EAP TraceItemTableRequest body from a small query spec.", + formatter_class=argparse.ArgumentDefaultsHelpFormatter, + ) + parser.add_argument("spec", nargs="?", default=str(DEFAULT_SPEC_PATH)) + parser.add_argument("--organization-id", type=int, default=1) + parser.add_argument("--project-id", dest="project_ids", action="append", type=int) + parser.add_argument("--measure", choices=["segments", "spans"], default="segments") + parser.add_argument("--start", type=parse_datetime) + parser.add_argument("--end", type=parse_datetime) + parser.add_argument("--last-hours", type=float, default=1.0) + parser.add_argument("--cogs-category", default="snuba-admin") + parser.add_argument("--referrer") + parser.add_argument("--limit", type=int) + parser.add_argument("--offset", type=int, default=0) + args = parser.parse_args(argv) + if args.last_hours <= 0: + parser.error("--last-hours must be greater than 0") + start, end = resolve_time_window(start=args.start, end=args.end, last_hours=args.last_hours) + if start >= end: + parser.error("--start must be earlier than --end") + spec = load_spec(args.spec) + + request = build_request( + spec=spec, + measure=args.measure, + organization_id=args.organization_id, + project_ids=args.project_ids or [1], + start=start, + end=end, + cogs_category=args.cogs_category, + referrer=args.referrer or spec.get("referrer", DEFAULT_REFERRER), + limit=args.limit if args.limit is not None else spec.get("limit", 1), + offset=args.offset, + ) + print(MessageToJson(request, preserving_proto_field_name=True)) + return 0 + + +if __name__ == "__main__": + raise SystemExit(main()) diff --git a/tools/eap_rpc_specs/get_eap_org_volume.json b/tools/eap_rpc_specs/get_eap_org_volume.json new file mode 100644 index 000000000000..878871291973 --- /dev/null +++ b/tools/eap_rpc_specs/get_eap_org_volume.json @@ -0,0 +1,9 @@ +{ + "referrer": "dynamic_sampling.per_org.get_eap_org_volume", + "query_string_by_measure": { + "segments": "is_transaction:true", + "spans": "" + }, + "selected_columns": ["count()", "count_sample()"], + "limit": 1 +} diff --git a/tools/eap_rpc_specs/get_eap_project_volumes.json b/tools/eap_rpc_specs/get_eap_project_volumes.json new file mode 100644 index 000000000000..3b6a0a76d508 --- /dev/null +++ b/tools/eap_rpc_specs/get_eap_project_volumes.json @@ -0,0 +1,10 @@ +{ + "referrer": "dynamic_sampling.per_org.get_eap_project_volumes", + "query_string_by_measure": { + "segments": "is_transaction:true", + "spans": "" + }, + "selected_columns": ["project.id", "count()", "count_sample()"], + "orderby": ["project.id"], + "limit": 9999 +} diff --git a/tools/eap_rpc_specs/get_eap_transaction_volumes.json b/tools/eap_rpc_specs/get_eap_transaction_volumes.json new file mode 100644 index 000000000000..8e0dbe157799 --- /dev/null +++ b/tools/eap_rpc_specs/get_eap_transaction_volumes.json @@ -0,0 +1,10 @@ +{ + "referrer": "dynamic_sampling.per_org.get_eap_transaction_volumes", + "query_string_by_measure": { + "segments": "is_transaction:true", + "spans": "" + }, + "selected_columns": ["project_id", "transaction", "count()", "count_sample()"], + "orderby": ["project_id", "transaction"], + "limit": 9998 +} From 661c99466247fe570cdcc1beff1929bd8db29e6d Mon Sep 17 00:00:00 2001 From: Simon Hellmayr Date: Thu, 7 May 2026 15:08:12 +0200 Subject: [PATCH 07/15] se utility for queries --- .../dynamic_sampling/per_org/tasks/queries.py | 63 ++++++++++--------- 1 file changed, 32 insertions(+), 31 deletions(-) diff --git a/src/sentry/dynamic_sampling/per_org/tasks/queries.py b/src/sentry/dynamic_sampling/per_org/tasks/queries.py index e58350254d70..a8edd4c18672 100644 --- a/src/sentry/dynamic_sampling/per_org/tasks/queries.py +++ b/src/sentry/dynamic_sampling/per_org/tasks/queries.py @@ -1,6 +1,7 @@ from __future__ import annotations from collections.abc import Iterator, Mapping +from dataclasses import dataclass from datetime import UTC, datetime, timedelta from typing import Any @@ -8,14 +9,11 @@ from sentry.constants import ObjectStatus from sentry.dynamic_sampling.per_org.tasks.configuration import BaseDynamicSamplingConfiguration -from sentry.dynamic_sampling.rules.utils import DecisionDropCount, DecisionKeepCount, ProjectId +from sentry.dynamic_sampling.rules.utils import ProjectId from sentry.dynamic_sampling.tasks.common import ( ACTIVE_ORGS_VOLUMES_DEFAULT_TIME_INTERVAL, OrganizationDataVolume, ) -from sentry.dynamic_sampling.tasks.constants import ( - CHUNK_SIZE, -) from sentry.dynamic_sampling.types import SamplingMeasure from sentry.models.project import Project from sentry.search.eap.constants import SAMPLING_MODE_HIGHEST_ACCURACY @@ -24,7 +22,14 @@ from sentry.snuba.referrer import Referrer from sentry.snuba.spans_rpc import Spans -ProjectVolumes = tuple[ProjectId, int, DecisionKeepCount, DecisionDropCount] + +@dataclass +class ProjectVolume: + project_id: ProjectId + total: int + keep: int + drop: int + EAP_ORGANIZATION_VOLUME_QUERY_STRINGS = { SamplingMeasure.SEGMENTS: "is_transaction:true", @@ -33,7 +38,7 @@ def _get_aggregate_int(row: Mapping[str, Any], column: str) -> int: - return int(row.get(column, 0)) + return int(row.get(column) or 0) def run_eap_spans_table_query_in_chunks( @@ -108,7 +113,7 @@ def get_eap_organization_volume( def get_eap_project_volumes( config: BaseDynamicSamplingConfiguration, time_interval: timedelta = timedelta(hours=1), -) -> list[ProjectVolumes]: +) -> list[ProjectVolume]: organization = config.organization projects = list( Project.objects.filter(organization_id=organization.id, status=ObjectStatus.ACTIVE) @@ -118,41 +123,37 @@ def get_eap_project_volumes( end_time = datetime.now(UTC) start_time = end_time - time_interval - offset = 0 - project_volumes: list[ProjectVolumes] = [] - more_results = True + project_volumes: list[ProjectVolume] = [] - while more_results: - result = Spans.run_table_query( - params=SnubaParams( + for data in run_eap_spans_table_query_in_chunks( + { + "params": SnubaParams( start=start_time, end=end_time, projects=projects, organization=organization, ), - query_string=EAP_ORGANIZATION_VOLUME_QUERY_STRINGS[config.measure], - selected_columns=["project.id", "count()", "count_sample()"], - orderby=["project.id"], - offset=offset, - limit=CHUNK_SIZE + 1, - referrer=Referrer.DYNAMIC_SAMPLING_PER_ORG_GET_EAP_PROJECT_VOLUMES.value, - config=SearchResolverConfig( + "query_string": EAP_ORGANIZATION_VOLUME_QUERY_STRINGS[config.measure], + "selected_columns": ["project.id", "count()", "count_sample()"], + "orderby": ["project.id"], + "referrer": Referrer.DYNAMIC_SAMPLING_PER_ORG_GET_EAP_PROJECT_VOLUMES.value, + "config": SearchResolverConfig( auto_fields=True, extrapolation_mode=ExtrapolationMode.EXTRAPOLATION_MODE_SERVER_ONLY, ), - sampling_mode=SAMPLING_MODE_HIGHEST_ACCURACY, - ) - - data = result.get("data", []) - more_results = len(data) > CHUNK_SIZE - offset += CHUNK_SIZE - if more_results: - data = data[:-1] - + "sampling_mode": SAMPLING_MODE_HIGHEST_ACCURACY, + } + ): for row in data: total = _get_aggregate_int(row, "count()") keep = _get_aggregate_int(row, "count_sample()") - drop = max(total - keep, 0) - project_volumes.append((ProjectId(row["project.id"]), total, keep, drop)) + project_volumes.append( + ProjectVolume( + project_id=ProjectId(int(row["project.id"])), + total=total, + keep=keep, + drop=max(total - keep, 0), + ) + ) return project_volumes From b852a96bf8de69f5958cad4c6366b7a203fa07a0 Mon Sep 17 00:00:00 2001 From: Simon Hellmayr Date: Thu, 7 May 2026 15:54:58 +0200 Subject: [PATCH 08/15] remove tool --- tools/eap_org_volume_rpc_body.py | 236 ------------------ tools/eap_rpc_specs/get_eap_org_volume.json | 9 - .../get_eap_project_volumes.json | 10 - .../get_eap_transaction_volumes.json | 10 - 4 files changed, 265 deletions(-) delete mode 100644 tools/eap_org_volume_rpc_body.py delete mode 100644 tools/eap_rpc_specs/get_eap_org_volume.json delete mode 100644 tools/eap_rpc_specs/get_eap_project_volumes.json delete mode 100644 tools/eap_rpc_specs/get_eap_transaction_volumes.json diff --git a/tools/eap_org_volume_rpc_body.py b/tools/eap_org_volume_rpc_body.py deleted file mode 100644 index 7ff89d2651c2..000000000000 --- a/tools/eap_org_volume_rpc_body.py +++ /dev/null @@ -1,236 +0,0 @@ -from __future__ import annotations - -import argparse -import json -from collections.abc import Sequence -from datetime import UTC, datetime, timedelta -from pathlib import Path -from typing import Any - -from google.protobuf.json_format import MessageToJson, ParseDict -from google.protobuf.timestamp_pb2 import Timestamp -from sentry_protos.snuba.v1.downsampled_storage_pb2 import DownsampledStorageConfig -from sentry_protos.snuba.v1.endpoint_trace_item_table_pb2 import Column, TraceItemTableRequest -from sentry_protos.snuba.v1.request_common_pb2 import PageToken, RequestMeta, TraceItemType -from sentry_protos.snuba.v1.trace_item_attribute_pb2 import ( - AttributeAggregation, - AttributeKey, - AttributeValue, - ExtrapolationMode, - Function, -) -from sentry_protos.snuba.v1.trace_item_filter_pb2 import ComparisonFilter, TraceItemFilter - -DEFAULT_REFERRER = "dynamic_sampling.per_org.get_eap_org_volume" -DEFAULT_SPEC_PATH = Path(__file__).with_name("eap_rpc_specs") / "get_eap_org_volume.json" - - -def get_attribute_key(column: str) -> AttributeKey: - if column in {"project.id", "project_id"}: - return AttributeKey(type=AttributeKey.TYPE_INT, name="sentry.project_id") - if column == "transaction": - return AttributeKey(type=AttributeKey.TYPE_STRING, name="sentry.transaction") - if column == "is_transaction": - return AttributeKey(type=AttributeKey.TYPE_BOOLEAN, name="sentry.is_segment") - raise ValueError(f"Unsupported shortcut column: {column}") - - -def get_count_key(column: str) -> AttributeKey: - if column == "count()": - return AttributeKey(type=AttributeKey.TYPE_INT, name="sentry.project_id") - if column == "count_sample()": - return AttributeKey(type=AttributeKey.TYPE_DOUBLE, name="sentry.duration_ms") - raise ValueError(f"Unsupported aggregate shortcut: {column}") - - -def get_extrapolation_mode(column: str) -> ExtrapolationMode.ValueType: - if column == "count_sample()": - return ExtrapolationMode.EXTRAPOLATION_MODE_NONE - return ExtrapolationMode.EXTRAPOLATION_MODE_SERVER_ONLY - - -def parse_datetime(value: str) -> datetime: - parsed = datetime.fromisoformat(value.replace("Z", "+00:00")) - if parsed.tzinfo is None: - return parsed.replace(tzinfo=UTC) - return parsed.astimezone(UTC) - - -def to_timestamp(value: datetime) -> Timestamp: - timestamp = Timestamp() - timestamp.FromDatetime(value) - return timestamp - - -def resolve_time_window( - *, - start: datetime | None, - end: datetime | None, - last_hours: float, -) -> tuple[datetime, datetime]: - resolved_end = end or datetime.now(UTC) - resolved_start = start or resolved_end - timedelta(hours=last_hours) - return resolved_start, resolved_end - - -def load_spec(path: str) -> dict[str, Any]: - with open(path) as spec_file: - return json.load(spec_file) - - -def get_query_string(spec: dict[str, Any], measure: str) -> str: - if "query_string" in spec: - return spec["query_string"] - query_strings = spec.get("query_string_by_measure", {}) - return query_strings.get(measure, "") - - -def build_filter(spec: dict[str, Any], measure: str) -> TraceItemFilter | None: - if "filter" in spec: - return ParseDict(spec["filter"], TraceItemFilter()) - - query_string = get_query_string(spec, measure) - if not query_string: - return None - if query_string in {"is_transaction:true", "is_transaction:1"}: - return TraceItemFilter( - comparison_filter=ComparisonFilter( - key=get_attribute_key("is_transaction"), - op=ComparisonFilter.OP_EQUALS, - value=AttributeValue(val_bool=True), - ) - ) - raise ValueError(f"Unsupported query string shortcut: {query_string}") - - -def build_column(column: str) -> Column: - if column in {"count()", "count_sample()"}: - return Column( - aggregation=AttributeAggregation( - aggregate=Function.FUNCTION_COUNT, - key=get_count_key(column), - label=column, - extrapolation_mode=get_extrapolation_mode(column), - ) - ) - return Column(key=get_attribute_key(column), label=column) - - -def build_columns(spec: dict[str, Any]) -> list[Column]: - if "columns" in spec: - return [ParseDict(column, Column()) for column in spec["columns"]] - return [build_column(column) for column in spec.get("selected_columns", [])] - - -def build_order_by(spec: dict[str, Any]) -> list[TraceItemTableRequest.OrderBy]: - orderby = spec.get("orderby", spec.get("order_by")) - if not orderby: - return [] - if "order_by" in spec: - return [ - ParseDict(order_by, TraceItemTableRequest.OrderBy()) for order_by in spec["order_by"] - ] - return [ - TraceItemTableRequest.OrderBy( - column=build_column(column.lstrip("-")), - descending=column.startswith("-"), - ) - for column in orderby - ] - - -def build_group_by(spec: dict[str, Any]) -> list[AttributeKey]: - if "group_by" in spec: - return [ParseDict(group_by, AttributeKey()) for group_by in spec["group_by"]] - selected_columns = spec.get("selected_columns", []) - if not any(column in {"count()", "count_sample()"} for column in selected_columns): - return [] - return [ - get_attribute_key(column) - for column in selected_columns - if column not in {"count()", "count_sample()"} - ] - - -def build_request( - *, - spec: dict[str, Any], - measure: str, - organization_id: int, - project_ids: Sequence[int], - start: datetime, - end: datetime, - cogs_category: str, - referrer: str, - limit: int, - offset: int, -) -> TraceItemTableRequest: - meta = RequestMeta( - organization_id=organization_id, - project_ids=project_ids, - cogs_category=cogs_category, - referrer=referrer, - start_timestamp=to_timestamp(start), - end_timestamp=to_timestamp(end), - trace_item_type=TraceItemType.TRACE_ITEM_TYPE_SPAN, - downsampled_storage_config=DownsampledStorageConfig( - mode=DownsampledStorageConfig.MODE_HIGHEST_ACCURACY - ), - ) - request_filter = build_filter(spec, measure) - request = TraceItemTableRequest( - meta=meta, - columns=build_columns(spec), - order_by=build_order_by(spec), - group_by=build_group_by(spec), - limit=limit, - ) - if request_filter is not None: - request.filter.CopyFrom(request_filter) - if offset: - request.page_token.CopyFrom(PageToken(offset=offset)) - return request - - -def main(argv: Sequence[str] | None = None) -> int: - parser = argparse.ArgumentParser( - description="Print a Snuba EAP TraceItemTableRequest body from a small query spec.", - formatter_class=argparse.ArgumentDefaultsHelpFormatter, - ) - parser.add_argument("spec", nargs="?", default=str(DEFAULT_SPEC_PATH)) - parser.add_argument("--organization-id", type=int, default=1) - parser.add_argument("--project-id", dest="project_ids", action="append", type=int) - parser.add_argument("--measure", choices=["segments", "spans"], default="segments") - parser.add_argument("--start", type=parse_datetime) - parser.add_argument("--end", type=parse_datetime) - parser.add_argument("--last-hours", type=float, default=1.0) - parser.add_argument("--cogs-category", default="snuba-admin") - parser.add_argument("--referrer") - parser.add_argument("--limit", type=int) - parser.add_argument("--offset", type=int, default=0) - args = parser.parse_args(argv) - if args.last_hours <= 0: - parser.error("--last-hours must be greater than 0") - start, end = resolve_time_window(start=args.start, end=args.end, last_hours=args.last_hours) - if start >= end: - parser.error("--start must be earlier than --end") - spec = load_spec(args.spec) - - request = build_request( - spec=spec, - measure=args.measure, - organization_id=args.organization_id, - project_ids=args.project_ids or [1], - start=start, - end=end, - cogs_category=args.cogs_category, - referrer=args.referrer or spec.get("referrer", DEFAULT_REFERRER), - limit=args.limit if args.limit is not None else spec.get("limit", 1), - offset=args.offset, - ) - print(MessageToJson(request, preserving_proto_field_name=True)) - return 0 - - -if __name__ == "__main__": - raise SystemExit(main()) diff --git a/tools/eap_rpc_specs/get_eap_org_volume.json b/tools/eap_rpc_specs/get_eap_org_volume.json deleted file mode 100644 index 878871291973..000000000000 --- a/tools/eap_rpc_specs/get_eap_org_volume.json +++ /dev/null @@ -1,9 +0,0 @@ -{ - "referrer": "dynamic_sampling.per_org.get_eap_org_volume", - "query_string_by_measure": { - "segments": "is_transaction:true", - "spans": "" - }, - "selected_columns": ["count()", "count_sample()"], - "limit": 1 -} diff --git a/tools/eap_rpc_specs/get_eap_project_volumes.json b/tools/eap_rpc_specs/get_eap_project_volumes.json deleted file mode 100644 index 3b6a0a76d508..000000000000 --- a/tools/eap_rpc_specs/get_eap_project_volumes.json +++ /dev/null @@ -1,10 +0,0 @@ -{ - "referrer": "dynamic_sampling.per_org.get_eap_project_volumes", - "query_string_by_measure": { - "segments": "is_transaction:true", - "spans": "" - }, - "selected_columns": ["project.id", "count()", "count_sample()"], - "orderby": ["project.id"], - "limit": 9999 -} diff --git a/tools/eap_rpc_specs/get_eap_transaction_volumes.json b/tools/eap_rpc_specs/get_eap_transaction_volumes.json deleted file mode 100644 index 8e0dbe157799..000000000000 --- a/tools/eap_rpc_specs/get_eap_transaction_volumes.json +++ /dev/null @@ -1,10 +0,0 @@ -{ - "referrer": "dynamic_sampling.per_org.get_eap_transaction_volumes", - "query_string_by_measure": { - "segments": "is_transaction:true", - "spans": "" - }, - "selected_columns": ["project_id", "transaction", "count()", "count_sample()"], - "orderby": ["project_id", "transaction"], - "limit": 9998 -} From bef58703858fb344e5a5706c35a168175538ddcb Mon Sep 17 00:00:00 2001 From: Simon Hellmayr Date: Thu, 7 May 2026 16:14:37 +0200 Subject: [PATCH 09/15] allow ordering --- src/sentry/dynamic_sampling/per_org/tasks/queries.py | 2 +- .../sentry/dynamic_sampling/per_org/tasks/test_queries.py | 7 ++++--- 2 files changed, 5 insertions(+), 4 deletions(-) diff --git a/src/sentry/dynamic_sampling/per_org/tasks/queries.py b/src/sentry/dynamic_sampling/per_org/tasks/queries.py index a8edd4c18672..558db4371d50 100644 --- a/src/sentry/dynamic_sampling/per_org/tasks/queries.py +++ b/src/sentry/dynamic_sampling/per_org/tasks/queries.py @@ -23,7 +23,7 @@ from sentry.snuba.spans_rpc import Spans -@dataclass +@dataclass(order=True) class ProjectVolume: project_id: ProjectId total: int diff --git a/tests/sentry/dynamic_sampling/per_org/tasks/test_queries.py b/tests/sentry/dynamic_sampling/per_org/tasks/test_queries.py index dba5bc1c9974..d8bfc9403762 100644 --- a/tests/sentry/dynamic_sampling/per_org/tasks/test_queries.py +++ b/tests/sentry/dynamic_sampling/per_org/tasks/test_queries.py @@ -10,6 +10,7 @@ get_configuration, ) from sentry.dynamic_sampling.per_org.tasks.queries import ( + ProjectVolume, get_eap_organization_volume, get_eap_project_volumes, run_eap_spans_table_query_in_chunks, @@ -230,8 +231,8 @@ def test_get_eap_project_volumes_existing_org(self) -> None: ) assert sorted(project_volumes) == [ - (project.id, 2, 2, 0), - (other_project.id, 1, 1, 0), + ProjectVolume(project_id=project.id, total=2, keep=2, drop=0), + ProjectVolume(project_id=other_project.id, total=1, keep=1, drop=0), ] run_table_query.assert_called_once() assert sorted(run_table_query.call_args.kwargs["params"].projects, key=lambda p: p.id) == [ @@ -283,7 +284,7 @@ def test_get_eap_project_volumes_handles_none_aggregate_values(self) -> None: ): project_volumes = get_eap_project_volumes(self.get_config(organization)) - assert project_volumes == [(project.id, 0, 0, 0)] + assert project_volumes == [ProjectVolume(project_id=project.id, total=0, keep=0, drop=0)] def test_get_eap_project_volumes_without_projects(self) -> None: organization = self.create_organization() From 1368d19b2abc12ef3f897dffe04617d27acf5cd3 Mon Sep 17 00:00:00 2001 From: Simon Hellmayr Date: Thu, 7 May 2026 16:21:00 +0200 Subject: [PATCH 10/15] differentiate plans --- .../per_org/tasks/configuration.py | 12 +++++++ .../per_org/tasks/scheduler.py | 7 ++-- .../per_org/tasks/test_scheduler.py | 36 ++++++++++++++++++- 3 files changed, 51 insertions(+), 4 deletions(-) diff --git a/src/sentry/dynamic_sampling/per_org/tasks/configuration.py b/src/sentry/dynamic_sampling/per_org/tasks/configuration.py index 9bc52dfcf461..6b2275d1de89 100644 --- a/src/sentry/dynamic_sampling/per_org/tasks/configuration.py +++ b/src/sentry/dynamic_sampling/per_org/tasks/configuration.py @@ -62,6 +62,10 @@ def is_span_based(self) -> bool: def is_segment_based(self) -> bool: return self.measure == SamplingMeasure.SEGMENTS + @property + def should_balance_projects(self) -> bool: + return False + def _get_sampling_measure(self) -> SamplingMeasure: if options.get("dynamic-sampling.check_span_feature_flag") and self.organization.id in ( options.get("dynamic-sampling.measure.spans") or [] @@ -96,6 +100,10 @@ def __init__(self, organization: Organization) -> None: def is_enabled(self) -> bool: return self.sample_rate is not None + @property + def should_balance_projects(self) -> bool: + return True + class CustomDynamicSamplingOrganizationConfiguration(BaseDynamicSamplingConfiguration): sample_rate: TargetSampleRate @@ -112,6 +120,10 @@ def __init__(self, organization: Organization) -> None: def is_enabled(self) -> bool: return True + @property + def should_balance_projects(self) -> bool: + return True + class CustomDynamicSamplingProjectConfiguration(BaseDynamicSamplingConfiguration): project_target_sample_rates: ProjectTargetSampleRates diff --git a/src/sentry/dynamic_sampling/per_org/tasks/scheduler.py b/src/sentry/dynamic_sampling/per_org/tasks/scheduler.py index beca20adde43..e91d55ed86b3 100644 --- a/src/sentry/dynamic_sampling/per_org/tasks/scheduler.py +++ b/src/sentry/dynamic_sampling/per_org/tasks/scheduler.py @@ -108,8 +108,9 @@ def run_calculations_per_org_task(org_id: OrganizationId) -> TelemetryStatus | N if org_volume is None: return TelemetryStatus.NO_ORG_VOLUME - project_volumes = get_eap_project_volumes(config) - if not project_volumes: - return TelemetryStatus.NO_PROJECT_VOLUMES + if config.should_balance_projects: + project_volumes = get_eap_project_volumes(config) + if not project_volumes: + return TelemetryStatus.NO_PROJECT_VOLUMES return None diff --git a/tests/sentry/dynamic_sampling/per_org/tasks/test_scheduler.py b/tests/sentry/dynamic_sampling/per_org/tasks/test_scheduler.py index 00c85bff85d0..ada7f2228771 100644 --- a/tests/sentry/dynamic_sampling/per_org/tasks/test_scheduler.py +++ b/tests/sentry/dynamic_sampling/per_org/tasks/test_scheduler.py @@ -259,7 +259,41 @@ def test_run_calculations_per_org_uses_measure_options_for_project_mode(self) -> ) as get_volume, patch( "sentry.dynamic_sampling.per_org.tasks.scheduler.get_eap_project_volumes", - return_value=[(project.id, 100, 25, 75)], + ) as get_project_volumes, + ): + result = run_calculations_per_org_task(org.id) + + assert result is None + get_blended_sample_rate.assert_not_called() + _assert_called_once_with_config(get_volume, org.id, measure_case.expected_measure) + get_project_volumes.assert_not_called() + + @override_options({"dynamic-sampling.per_org.rollout-rate": 1.0}) + def test_run_calculations_per_org_queries_projects_for_am3_org_mode(self) -> None: + for measure_case in MEASURE_OPTION_CASES: + with self.subTest(measure_case=measure_case.name): + org = self.create_organization() + org.update_option("sentry:sampling_mode", DynamicSamplingMode.ORGANIZATION) + org_volume = OrganizationDataVolume(org_id=org.id, total=100, indexed=25) + + with ( + self.feature("organizations:dynamic-sampling-custom"), + self.options( + { + "dynamic-sampling.check_span_feature_flag": measure_case.check_span_feature_flag, + "dynamic-sampling.measure.spans": measure_case.span_org_ids(org.id), + } + ), + patch( + "sentry.dynamic_sampling.per_org.tasks.configuration.quotas.backend.get_blended_sample_rate" + ) as get_blended_sample_rate, + patch( + "sentry.dynamic_sampling.per_org.tasks.scheduler.get_eap_organization_volume", + return_value=org_volume, + ) as get_volume, + patch( + "sentry.dynamic_sampling.per_org.tasks.scheduler.get_eap_project_volumes", + return_value=[(1, 100, 25, 75)], ) as get_project_volumes, ): result = run_calculations_per_org_task(org.id) From bcc27bf81dac5aea9fefb7ae8c53f77a92b20b63 Mon Sep 17 00:00:00 2001 From: Simon Hellmayr Date: Thu, 7 May 2026 16:31:44 +0200 Subject: [PATCH 11/15] property --- .../dynamic_sampling/per_org/tasks/configuration.py | 11 +++-------- 1 file changed, 3 insertions(+), 8 deletions(-) diff --git a/src/sentry/dynamic_sampling/per_org/tasks/configuration.py b/src/sentry/dynamic_sampling/per_org/tasks/configuration.py index 6b2275d1de89..95c0758503f1 100644 --- a/src/sentry/dynamic_sampling/per_org/tasks/configuration.py +++ b/src/sentry/dynamic_sampling/per_org/tasks/configuration.py @@ -85,6 +85,7 @@ def is_enabled(self) -> bool: class AutomaticDynamicSamplingConfiguration(BaseDynamicSamplingConfiguration): sample_rate: TargetSampleRate + should_balance_projects: bool = True def __init__(self, organization: Organization) -> None: super().__init__(organization) @@ -100,13 +101,10 @@ def __init__(self, organization: Organization) -> None: def is_enabled(self) -> bool: return self.sample_rate is not None - @property - def should_balance_projects(self) -> bool: - return True - class CustomDynamicSamplingOrganizationConfiguration(BaseDynamicSamplingConfiguration): sample_rate: TargetSampleRate + should_balance_projects: bool = True def __init__(self, organization: Organization) -> None: super().__init__(organization) @@ -120,13 +118,10 @@ def __init__(self, organization: Organization) -> None: def is_enabled(self) -> bool: return True - @property - def should_balance_projects(self) -> bool: - return True - class CustomDynamicSamplingProjectConfiguration(BaseDynamicSamplingConfiguration): project_target_sample_rates: ProjectTargetSampleRates + should_balance_projects: bool = False def __init__(self, organization: Organization) -> None: super().__init__(organization) From 8b37eac083a990dd55b5a4cedd48e3d35d27aaed Mon Sep 17 00:00:00 2001 From: Simon Hellmayr Date: Thu, 7 May 2026 16:33:03 +0200 Subject: [PATCH 12/15] only set where necessary --- src/sentry/dynamic_sampling/per_org/tasks/configuration.py | 7 +------ 1 file changed, 1 insertion(+), 6 deletions(-) diff --git a/src/sentry/dynamic_sampling/per_org/tasks/configuration.py b/src/sentry/dynamic_sampling/per_org/tasks/configuration.py index 95c0758503f1..2a05bd53f004 100644 --- a/src/sentry/dynamic_sampling/per_org/tasks/configuration.py +++ b/src/sentry/dynamic_sampling/per_org/tasks/configuration.py @@ -45,6 +45,7 @@ def get_configuration(organization_id: int) -> BaseDynamicSamplingConfiguration: class BaseDynamicSamplingConfiguration(ABC): measure: SamplingMeasure + should_balance_projects: bool = True def __init__(self, organization: Organization) -> None: self.organization = organization @@ -62,10 +63,6 @@ def is_span_based(self) -> bool: def is_segment_based(self) -> bool: return self.measure == SamplingMeasure.SEGMENTS - @property - def should_balance_projects(self) -> bool: - return False - def _get_sampling_measure(self) -> SamplingMeasure: if options.get("dynamic-sampling.check_span_feature_flag") and self.organization.id in ( options.get("dynamic-sampling.measure.spans") or [] @@ -85,7 +82,6 @@ def is_enabled(self) -> bool: class AutomaticDynamicSamplingConfiguration(BaseDynamicSamplingConfiguration): sample_rate: TargetSampleRate - should_balance_projects: bool = True def __init__(self, organization: Organization) -> None: super().__init__(organization) @@ -104,7 +100,6 @@ def is_enabled(self) -> bool: class CustomDynamicSamplingOrganizationConfiguration(BaseDynamicSamplingConfiguration): sample_rate: TargetSampleRate - should_balance_projects: bool = True def __init__(self, organization: Organization) -> None: super().__init__(organization) From b8f0123a4291e746e5e855df556e44ab3175b050 Mon Sep 17 00:00:00 2001 From: Simon Hellmayr Date: Fri, 8 May 2026 11:51:39 +0200 Subject: [PATCH 13/15] not this again --- src/sentry/dynamic_sampling/per_org/tasks/queries.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/sentry/dynamic_sampling/per_org/tasks/queries.py b/src/sentry/dynamic_sampling/per_org/tasks/queries.py index 558db4371d50..2c5316ee3f8f 100644 --- a/src/sentry/dynamic_sampling/per_org/tasks/queries.py +++ b/src/sentry/dynamic_sampling/per_org/tasks/queries.py @@ -38,7 +38,7 @@ class ProjectVolume: def _get_aggregate_int(row: Mapping[str, Any], column: str) -> int: - return int(row.get(column) or 0) + return int(row.get(column, 0)) def run_eap_spans_table_query_in_chunks( From f0bf4f0857fcca5776236acaf9db1b47f2b94127 Mon Sep 17 00:00:00 2001 From: Simon Hellmayr Date: Fri, 8 May 2026 13:39:59 +0200 Subject: [PATCH 14/15] ref(dynamic-sampling): Remove EAP measure switching Keep the per-org project volume work on transaction segments so measure-specific EAP query changes can live on a dependent branch. Co-authored-by: Cursor --- .../dynamic_sampling/per_org/tasks/queries.py | 14 +- .../per_org/tasks/test_queries.py | 52 +---- .../per_org/tasks/test_scheduler.py | 206 +++++++----------- 3 files changed, 82 insertions(+), 190 deletions(-) diff --git a/src/sentry/dynamic_sampling/per_org/tasks/queries.py b/src/sentry/dynamic_sampling/per_org/tasks/queries.py index 2c5316ee3f8f..2df1b353101a 100644 --- a/src/sentry/dynamic_sampling/per_org/tasks/queries.py +++ b/src/sentry/dynamic_sampling/per_org/tasks/queries.py @@ -14,7 +14,6 @@ ACTIVE_ORGS_VOLUMES_DEFAULT_TIME_INTERVAL, OrganizationDataVolume, ) -from sentry.dynamic_sampling.types import SamplingMeasure from sentry.models.project import Project from sentry.search.eap.constants import SAMPLING_MODE_HIGHEST_ACCURACY from sentry.search.eap.types import SearchResolverConfig @@ -31,14 +30,9 @@ class ProjectVolume: drop: int -EAP_ORGANIZATION_VOLUME_QUERY_STRINGS = { - SamplingMeasure.SEGMENTS: "is_transaction:true", - SamplingMeasure.SPANS: "", -} - - def _get_aggregate_int(row: Mapping[str, Any], column: str) -> int: - return int(row.get(column, 0)) + value = row.get(column) + return int(value) if value is not None else 0 def run_eap_spans_table_query_in_chunks( @@ -84,7 +78,7 @@ def get_eap_organization_volume( projects=projects, organization=organization, ), - query_string=EAP_ORGANIZATION_VOLUME_QUERY_STRINGS[config.measure], + query_string="is_transaction:true", selected_columns=["count()", "count_sample()"], orderby=None, offset=0, @@ -133,7 +127,7 @@ def get_eap_project_volumes( projects=projects, organization=organization, ), - "query_string": EAP_ORGANIZATION_VOLUME_QUERY_STRINGS[config.measure], + "query_string": "is_transaction:true", "selected_columns": ["project.id", "count()", "count_sample()"], "orderby": ["project.id"], "referrer": Referrer.DYNAMIC_SAMPLING_PER_ORG_GET_EAP_PROJECT_VOLUMES.value, diff --git a/tests/sentry/dynamic_sampling/per_org/tasks/test_queries.py b/tests/sentry/dynamic_sampling/per_org/tasks/test_queries.py index d8bfc9403762..4351681d8569 100644 --- a/tests/sentry/dynamic_sampling/per_org/tasks/test_queries.py +++ b/tests/sentry/dynamic_sampling/per_org/tasks/test_queries.py @@ -16,7 +16,6 @@ run_eap_spans_table_query_in_chunks, ) from sentry.dynamic_sampling.tasks.common import OrganizationDataVolume -from sentry.dynamic_sampling.types import SamplingMeasure from sentry.models.organization import Organization from sentry.search.eap.constants import SAMPLING_MODE_HIGHEST_ACCURACY from sentry.search.eap.types import SearchResolverConfig @@ -85,21 +84,10 @@ class EAPOrganizationVolumeTest(TestCase, SnubaTestCase, SpanTestCase): def get_config( self, organization: Organization, - measure: SamplingMeasure = SamplingMeasure.SEGMENTS, ) -> BaseDynamicSamplingConfiguration: - option_values: dict[str, object] = {"dynamic-sampling.check_span_feature_flag": False} - if measure == SamplingMeasure.SPANS: - option_values = { - "dynamic-sampling.check_span_feature_flag": True, - "dynamic-sampling.measure.spans": [organization.id], - } - - with ( - self.options(option_values), - patch( - "sentry.dynamic_sampling.per_org.tasks.configuration.quotas.backend.get_blended_sample_rate", - return_value=1.0, - ), + with patch( + "sentry.dynamic_sampling.per_org.tasks.configuration.quotas.backend.get_blended_sample_rate", + return_value=1.0, ): return get_configuration(organization.id) @@ -170,23 +158,6 @@ def test_get_eap_organization_volume_returns_raw_and_extrapolated_counts(self) - assert org_volume == OrganizationDataVolume(org_id=organization.id, total=10, indexed=1) - def test_get_eap_organization_volume_with_spans_measure(self) -> None: - organization = self.create_organization() - self.create_project(organization=organization) - - with patch( - "sentry.dynamic_sampling.per_org.tasks.queries.Spans.run_table_query", - return_value={"data": [{"count()": 2, "count_sample()": 2}]}, - ) as run_table_query: - org_volume = get_eap_organization_volume( - self.get_config(organization, SamplingMeasure.SPANS), - time_interval=timedelta(hours=1), - ) - - assert org_volume == OrganizationDataVolume(org_id=organization.id, total=2, indexed=2) - run_table_query.assert_called_once() - assert run_table_query.call_args.kwargs["query_string"] == "" - def test_get_eap_organization_volume_without_traffic(self) -> None: organization = self.create_organization() self.create_project(organization=organization) @@ -241,23 +212,6 @@ def test_get_eap_project_volumes_existing_org(self) -> None: ] assert run_table_query.call_args.kwargs["query_string"] == "is_transaction:true" - def test_get_eap_project_volumes_with_spans_measure(self) -> None: - organization = self.create_organization() - self.create_project(organization=organization) - - with patch( - "sentry.dynamic_sampling.per_org.tasks.queries.Spans.run_table_query", - return_value={"data": []}, - ) as run_table_query: - project_volumes = get_eap_project_volumes( - self.get_config(organization, SamplingMeasure.SPANS), - time_interval=timedelta(hours=1), - ) - - assert project_volumes == [] - run_table_query.assert_called_once() - assert run_table_query.call_args.kwargs["query_string"] == "" - def test_get_eap_project_volumes_without_traffic(self) -> None: organization = self.create_organization() self.create_project(organization=organization) diff --git a/tests/sentry/dynamic_sampling/per_org/tasks/test_scheduler.py b/tests/sentry/dynamic_sampling/per_org/tasks/test_scheduler.py index ada7f2228771..ed58d41e5aa8 100644 --- a/tests/sentry/dynamic_sampling/per_org/tasks/test_scheduler.py +++ b/tests/sentry/dynamic_sampling/per_org/tasks/test_scheduler.py @@ -1,7 +1,5 @@ from __future__ import annotations -from collections.abc import Callable -from typing import NamedTuple from unittest.mock import Mock, patch from django.core.exceptions import ObjectDoesNotExist @@ -17,47 +15,21 @@ from sentry.dynamic_sampling.per_org.tasks.telemetry import TelemetryStatus from sentry.dynamic_sampling.rules.utils import get_redis_client_for_ds from sentry.dynamic_sampling.tasks.common import OrganizationDataVolume -from sentry.dynamic_sampling.types import DynamicSamplingMode, SamplingMeasure +from sentry.dynamic_sampling.types import DynamicSamplingMode from sentry.models.organization import OrganizationStatus from sentry.testutils.cases import TestCase from sentry.testutils.helpers.options import override_options from sentry.testutils.helpers.task_runner import BurstTaskRunner -SpanOrgIds = Callable[[int], list[int]] - - -class MeasureOptionCase(NamedTuple): - name: str - check_span_feature_flag: bool - span_org_ids: SpanOrgIds - expected_measure: SamplingMeasure - - -def _include_org_id(org_id: int) -> list[int]: - return [org_id] - - -def _exclude_org_id(org_id: int) -> list[int]: - return [] - - -MEASURE_OPTION_CASES = ( - MeasureOptionCase("span-option-disabled", False, _include_org_id, SamplingMeasure.SEGMENTS), - MeasureOptionCase("org-not-in-span-option", True, _exclude_org_id, SamplingMeasure.SEGMENTS), - MeasureOptionCase("org-in-span-option", True, _include_org_id, SamplingMeasure.SPANS), -) - def _assert_called_once_with_config( mock: Mock, organization_id: int, - measure: SamplingMeasure = SamplingMeasure.SEGMENTS, ) -> BaseDynamicSamplingConfiguration: mock.assert_called_once() config = mock.call_args.args[0] assert isinstance(config, BaseDynamicSamplingConfiguration) assert config.organization.id == organization_id - assert config.measure == measure return config @@ -233,77 +205,59 @@ def test_run_calculations_per_org_returns_no_volume_without_project_volumes(self _assert_called_once_with_config(get_project_volumes, org.id) @override_options({"dynamic-sampling.per_org.rollout-rate": 1.0}) - def test_run_calculations_per_org_uses_measure_options_for_project_mode(self) -> None: - for measure_case in MEASURE_OPTION_CASES: - with self.subTest(measure_case=measure_case.name): - org = self.create_organization() - project = self.create_project(organization=org) - org.update_option("sentry:sampling_mode", DynamicSamplingMode.PROJECT) - project.update_option("sentry:target_sample_rate", 0.2) - org_volume = OrganizationDataVolume(org_id=org.id, total=100, indexed=25) - - with ( - self.feature("organizations:dynamic-sampling-custom"), - self.options( - { - "dynamic-sampling.check_span_feature_flag": measure_case.check_span_feature_flag, - "dynamic-sampling.measure.spans": measure_case.span_org_ids(org.id), - } - ), - patch( - "sentry.dynamic_sampling.per_org.tasks.configuration.quotas.backend.get_blended_sample_rate" - ) as get_blended_sample_rate, - patch( - "sentry.dynamic_sampling.per_org.tasks.scheduler.get_eap_organization_volume", - return_value=org_volume, - ) as get_volume, - patch( - "sentry.dynamic_sampling.per_org.tasks.scheduler.get_eap_project_volumes", - ) as get_project_volumes, - ): - result = run_calculations_per_org_task(org.id) - - assert result is None - get_blended_sample_rate.assert_not_called() - _assert_called_once_with_config(get_volume, org.id, measure_case.expected_measure) - get_project_volumes.assert_not_called() + def test_run_calculations_per_org_skips_project_balancing_for_project_mode(self) -> None: + org = self.create_organization() + project = self.create_project(organization=org) + org.update_option("sentry:sampling_mode", DynamicSamplingMode.PROJECT) + project.update_option("sentry:target_sample_rate", 0.2) + org_volume = OrganizationDataVolume(org_id=org.id, total=100, indexed=25) + + with ( + self.feature("organizations:dynamic-sampling-custom"), + patch( + "sentry.dynamic_sampling.per_org.tasks.configuration.quotas.backend.get_blended_sample_rate" + ) as get_blended_sample_rate, + patch( + "sentry.dynamic_sampling.per_org.tasks.scheduler.get_eap_organization_volume", + return_value=org_volume, + ) as get_volume, + patch( + "sentry.dynamic_sampling.per_org.tasks.scheduler.get_eap_project_volumes", + ) as get_project_volumes, + ): + result = run_calculations_per_org_task(org.id) + + assert result is None + get_blended_sample_rate.assert_not_called() + _assert_called_once_with_config(get_volume, org.id) + get_project_volumes.assert_not_called() @override_options({"dynamic-sampling.per_org.rollout-rate": 1.0}) def test_run_calculations_per_org_queries_projects_for_am3_org_mode(self) -> None: - for measure_case in MEASURE_OPTION_CASES: - with self.subTest(measure_case=measure_case.name): - org = self.create_organization() - org.update_option("sentry:sampling_mode", DynamicSamplingMode.ORGANIZATION) - org_volume = OrganizationDataVolume(org_id=org.id, total=100, indexed=25) - - with ( - self.feature("organizations:dynamic-sampling-custom"), - self.options( - { - "dynamic-sampling.check_span_feature_flag": measure_case.check_span_feature_flag, - "dynamic-sampling.measure.spans": measure_case.span_org_ids(org.id), - } - ), - patch( - "sentry.dynamic_sampling.per_org.tasks.configuration.quotas.backend.get_blended_sample_rate" - ) as get_blended_sample_rate, - patch( - "sentry.dynamic_sampling.per_org.tasks.scheduler.get_eap_organization_volume", - return_value=org_volume, - ) as get_volume, - patch( - "sentry.dynamic_sampling.per_org.tasks.scheduler.get_eap_project_volumes", - return_value=[(1, 100, 25, 75)], - ) as get_project_volumes, - ): - result = run_calculations_per_org_task(org.id) - - assert result is None - get_blended_sample_rate.assert_not_called() - _assert_called_once_with_config(get_volume, org.id, measure_case.expected_measure) - _assert_called_once_with_config( - get_project_volumes, org.id, measure_case.expected_measure - ) + org = self.create_organization() + org.update_option("sentry:sampling_mode", DynamicSamplingMode.ORGANIZATION) + org_volume = OrganizationDataVolume(org_id=org.id, total=100, indexed=25) + + with ( + self.feature("organizations:dynamic-sampling-custom"), + patch( + "sentry.dynamic_sampling.per_org.tasks.configuration.quotas.backend.get_blended_sample_rate" + ) as get_blended_sample_rate, + patch( + "sentry.dynamic_sampling.per_org.tasks.scheduler.get_eap_organization_volume", + return_value=org_volume, + ) as get_volume, + patch( + "sentry.dynamic_sampling.per_org.tasks.scheduler.get_eap_project_volumes", + return_value=[(1, 100, 25, 75)], + ) as get_project_volumes, + ): + result = run_calculations_per_org_task(org.id) + + assert result is None + get_blended_sample_rate.assert_not_called() + _assert_called_once_with_config(get_volume, org.id) + _assert_called_once_with_config(get_project_volumes, org.id) @override_options({"dynamic-sampling.per_org.rollout-rate": 1.0}) def test_run_calculations_per_org_skips_project_mode_without_project_rates(self) -> None: @@ -331,40 +285,30 @@ def test_run_calculations_per_org_skips_project_mode_without_project_rates(self) get_project_volumes.assert_not_called() @override_options({"dynamic-sampling.per_org.rollout-rate": 1.0}) - def test_run_calculations_per_org_uses_measure_options_for_am2(self) -> None: - for measure_case in MEASURE_OPTION_CASES: - with self.subTest(measure_case=measure_case.name): - org = self.create_organization() - org_volume = OrganizationDataVolume(org_id=org.id, total=100, indexed=25) - - with ( - self.options( - { - "dynamic-sampling.check_span_feature_flag": measure_case.check_span_feature_flag, - "dynamic-sampling.measure.spans": measure_case.span_org_ids(org.id), - } - ), - patch( - "sentry.dynamic_sampling.per_org.tasks.configuration.quotas.backend.get_blended_sample_rate", - return_value=1.0, - ) as get_blended_sample_rate, - patch( - "sentry.dynamic_sampling.per_org.tasks.scheduler.get_eap_organization_volume", - return_value=org_volume, - ) as get_volume, - patch( - "sentry.dynamic_sampling.per_org.tasks.scheduler.get_eap_project_volumes", - return_value=[(1, 100, 25, 75)], - ) as get_project_volumes, - ): - result = run_calculations_per_org_task(org.id) - - assert result is None - get_blended_sample_rate.assert_called_once_with(organization_id=org.id) - _assert_called_once_with_config(get_volume, org.id, measure_case.expected_measure) - _assert_called_once_with_config( - get_project_volumes, org.id, measure_case.expected_measure - ) + def test_run_calculations_per_org_queries_projects_for_am2(self) -> None: + org = self.create_organization() + org_volume = OrganizationDataVolume(org_id=org.id, total=100, indexed=25) + + with ( + patch( + "sentry.dynamic_sampling.per_org.tasks.configuration.quotas.backend.get_blended_sample_rate", + return_value=1.0, + ) as get_blended_sample_rate, + patch( + "sentry.dynamic_sampling.per_org.tasks.scheduler.get_eap_organization_volume", + return_value=org_volume, + ) as get_volume, + patch( + "sentry.dynamic_sampling.per_org.tasks.scheduler.get_eap_project_volumes", + return_value=[(1, 100, 25, 75)], + ) as get_project_volumes, + ): + result = run_calculations_per_org_task(org.id) + + assert result is None + get_blended_sample_rate.assert_called_once_with(organization_id=org.id) + _assert_called_once_with_config(get_volume, org.id) + _assert_called_once_with_config(get_project_volumes, org.id) @override_options({"dynamic-sampling.per_org.rollout-rate": 1.0}) def test_run_calculations_per_org_skips_org_without_transaction_sample_rate(self) -> None: From 904975a1ebb36b99893f6bcc7535d401dbde9e24 Mon Sep 17 00:00:00 2001 From: Simon Hellmayr Date: Tue, 12 May 2026 09:03:14 +0200 Subject: [PATCH 15/15] use is_segment and dsc.root.project --- .../dynamic_sampling/per_org/tasks/queries.py | 8 ++--- .../per_org/tasks/test_queries.py | 32 ++++++++++++++++--- 2 files changed, 32 insertions(+), 8 deletions(-) diff --git a/src/sentry/dynamic_sampling/per_org/tasks/queries.py b/src/sentry/dynamic_sampling/per_org/tasks/queries.py index 2df1b353101a..d5b36a8f765d 100644 --- a/src/sentry/dynamic_sampling/per_org/tasks/queries.py +++ b/src/sentry/dynamic_sampling/per_org/tasks/queries.py @@ -127,9 +127,9 @@ def get_eap_project_volumes( projects=projects, organization=organization, ), - "query_string": "is_transaction:true", - "selected_columns": ["project.id", "count()", "count_sample()"], - "orderby": ["project.id"], + "query_string": "is_segment:true", + "selected_columns": ["sentry.dsc.root_project", "count()", "count_sample()"], + "orderby": ["sentry.dsc.root_project"], "referrer": Referrer.DYNAMIC_SAMPLING_PER_ORG_GET_EAP_PROJECT_VOLUMES.value, "config": SearchResolverConfig( auto_fields=True, @@ -143,7 +143,7 @@ def get_eap_project_volumes( keep = _get_aggregate_int(row, "count_sample()") project_volumes.append( ProjectVolume( - project_id=ProjectId(int(row["project.id"])), + project_id=ProjectId(int(row["sentry.dsc.root_project"])), total=total, keep=keep, drop=max(total - keep, 0), diff --git a/tests/sentry/dynamic_sampling/per_org/tasks/test_queries.py b/tests/sentry/dynamic_sampling/per_org/tasks/test_queries.py index 4351681d8569..470b77a251fe 100644 --- a/tests/sentry/dynamic_sampling/per_org/tasks/test_queries.py +++ b/tests/sentry/dynamic_sampling/per_org/tasks/test_queries.py @@ -192,8 +192,16 @@ def test_get_eap_project_volumes_existing_org(self) -> None: "sentry.dynamic_sampling.per_org.tasks.queries.Spans.run_table_query", return_value={ "data": [ - {"project.id": project.id, "count()": 2, "count_sample()": 2}, - {"project.id": other_project.id, "count()": 1, "count_sample()": 1}, + { + "sentry.dsc.root_project": project.id, + "count()": 2, + "count_sample()": 2, + }, + { + "sentry.dsc.root_project": other_project.id, + "count()": 1, + "count_sample()": 1, + }, ] }, ) as run_table_query: @@ -210,7 +218,17 @@ def test_get_eap_project_volumes_existing_org(self) -> None: project, other_project, ] - assert run_table_query.call_args.kwargs["query_string"] == "is_transaction:true" + assert run_table_query.call_args.kwargs["query_string"] == "is_segment:true" + assert run_table_query.call_args.kwargs["selected_columns"] == [ + "sentry.dsc.root_project", + "count()", + "count_sample()", + ] + assert run_table_query.call_args.kwargs["orderby"] == ["sentry.dsc.root_project"] + assert ( + run_table_query.call_args.kwargs["referrer"] + == Referrer.DYNAMIC_SAMPLING_PER_ORG_GET_EAP_PROJECT_VOLUMES.value + ) def test_get_eap_project_volumes_without_traffic(self) -> None: organization = self.create_organization() @@ -233,7 +251,13 @@ def test_get_eap_project_volumes_handles_none_aggregate_values(self) -> None: with patch( "sentry.dynamic_sampling.per_org.tasks.queries.Spans.run_table_query", return_value={ - "data": [{"project.id": project.id, "count()": None, "count_sample()": None}] + "data": [ + { + "sentry.dsc.root_project": project.id, + "count()": None, + "count_sample()": None, + } + ] }, ): project_volumes = get_eap_project_volumes(self.get_config(organization))