Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions src/sentry/dynamic_sampling/per_org/tasks/configuration.py
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ def get_configuration(organization_id: int) -> BaseDynamicSamplingConfiguration:

class BaseDynamicSamplingConfiguration(ABC):
measure: SamplingMeasure
should_balance_projects: bool = True

def __init__(self, organization: Organization) -> None:
self.organization = organization
Expand Down Expand Up @@ -115,6 +116,7 @@ def is_enabled(self) -> bool:

class CustomDynamicSamplingProjectConfiguration(BaseDynamicSamplingConfiguration):
project_target_sample_rates: ProjectTargetSampleRates
should_balance_projects: bool = False

def __init__(self, organization: Organization) -> None:
super().__init__(organization)
Expand Down
69 changes: 65 additions & 4 deletions src/sentry/dynamic_sampling/per_org/tasks/queries.py
Original file line number Diff line number Diff line change
@@ -1,13 +1,15 @@
from __future__ import annotations

from collections.abc import Iterator, Mapping
from dataclasses import dataclass
from datetime import UTC, datetime, timedelta
from typing import Any

from sentry_protos.snuba.v1.trace_item_attribute_pb2 import ExtrapolationMode

from sentry.constants import ObjectStatus
from sentry.dynamic_sampling.per_org.tasks.configuration import BaseDynamicSamplingConfiguration
from sentry.dynamic_sampling.rules.utils import ProjectId
from sentry.dynamic_sampling.tasks.common import (
ACTIVE_ORGS_VOLUMES_DEFAULT_TIME_INTERVAL,
OrganizationDataVolume,
Expand All @@ -20,8 +22,17 @@
from sentry.snuba.spans_rpc import Spans


@dataclass(order=True)
class ProjectVolume:
    """Span volume counts for a single DSC root project.

    ``order=True`` generates comparison methods over the fields in
    declaration order (project_id first), so lists of instances can be
    sorted deterministically — tests rely on ``sorted(...)`` for this.
    """

    # ID of the root project (from the span's dynamic sampling context).
    project_id: ProjectId
    # Total span count reported by the query (count()).
    total: int
    # Count of spans that were kept/sampled (count_sample()).
    keep: int
    # Dropped spans; computed by the caller as max(total - keep, 0).
    drop: int


def _get_aggregate_int(row: Mapping[str, Any], column: str) -> int:
return int(row.get(column, 0))
value = row.get(column)
return int(value) if value is not None else 0


def run_eap_spans_table_query_in_chunks(
Expand Down Expand Up @@ -51,8 +62,9 @@
config: BaseDynamicSamplingConfiguration,
time_interval: timedelta = ACTIVE_ORGS_VOLUMES_DEFAULT_TIME_INTERVAL,
) -> OrganizationDataVolume | None:
organization = config.organization
projects = list(
Project.objects.filter(organization_id=config.organization.id, status=ObjectStatus.ACTIVE)
Project.objects.filter(organization_id=organization.id, status=ObjectStatus.ACTIVE)
)
if not projects:
return None
Expand All @@ -64,7 +76,7 @@
start=start_time,
end=end_time,
projects=projects,
organization=config.organization,
organization=organization,
),
query_string="is_transaction:true",
selected_columns=["count()", "count_sample()"],
Expand All @@ -89,4 +101,53 @@
return None
indexed = _get_aggregate_int(row, "count_sample()")

return OrganizationDataVolume(org_id=config.organization.id, total=total, indexed=indexed)
return OrganizationDataVolume(org_id=organization.id, total=total, indexed=indexed)


def get_eap_project_volumes(
    config: BaseDynamicSamplingConfiguration,
    time_interval: timedelta = timedelta(hours=1),
) -> list[ProjectVolume]:
    """Query EAP spans for per-root-project segment volumes of an organization.

    Groups segment spans (``is_segment:true``) by their DSC root project over
    the trailing ``time_interval`` window and returns one ``ProjectVolume``
    per root project, where ``total`` is ``count()``, ``keep`` is
    ``count_sample()`` and ``drop`` is ``total - keep`` floored at 0.

    Returns an empty list when the organization has no active projects
    (no query is issued) or the query yields no usable rows.
    """
    organization = config.organization
    projects = list(
        Project.objects.filter(organization_id=organization.id, status=ObjectStatus.ACTIVE)
    )
    if not projects:
        # No active projects: avoid issuing a query that cannot return data.
        return []

    end_time = datetime.now(UTC)
    start_time = end_time - time_interval
    project_volumes: list[ProjectVolume] = []

    for data in run_eap_spans_table_query_in_chunks(
        {
            "params": SnubaParams(
                start=start_time,
                end=end_time,
                projects=projects,
                organization=organization,
            ),
            "query_string": "is_segment:true",
            "selected_columns": ["sentry.dsc.root_project", "count()", "count_sample()"],
            "orderby": ["sentry.dsc.root_project"],
            "referrer": Referrer.DYNAMIC_SAMPLING_PER_ORG_GET_EAP_PROJECT_VOLUMES.value,
            "config": SearchResolverConfig(
                auto_fields=True,
                extrapolation_mode=ExtrapolationMode.EXTRAPOLATION_MODE_SERVER_ONLY,
            ),
            "sampling_mode": SAMPLING_MODE_HIGHEST_ACCURACY,
        }
    ):
        for row in data:
            # Spans without a dynamic sampling context (e.g. from older SDKs
            # or incomplete trace propagation) have no root project; skip them
            # instead of raising KeyError/TypeError on int(None). This mirrors
            # the None-hardening in _get_aggregate_int.
            root_project = row.get("sentry.dsc.root_project")
            if root_project is None:
                continue

            total = _get_aggregate_int(row, "count()")
            keep = _get_aggregate_int(row, "count_sample()")
            project_volumes.append(
                ProjectVolume(
                    project_id=ProjectId(int(root_project)),
                    total=total,
                    keep=keep,
                    # Floor at 0 defensively in case sampled counts ever
                    # exceed the extrapolated total.
                    drop=max(total - keep, 0),
                )
            )

    return project_volumes
12 changes: 10 additions & 2 deletions src/sentry/dynamic_sampling/per_org/tasks/scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,10 @@

from sentry.dynamic_sampling.per_org.tasks.configuration import get_configuration
from sentry.dynamic_sampling.per_org.tasks.gate import is_org_in_rollout
from sentry.dynamic_sampling.per_org.tasks.queries import get_eap_organization_volume
from sentry.dynamic_sampling.per_org.tasks.queries import (
get_eap_organization_volume,
get_eap_project_volumes,
)
from sentry.dynamic_sampling.per_org.tasks.telemetry import (
SCHEDULER_BUCKET_ORG_STATUS_METRIC,
TelemetryStatus,
Expand Down Expand Up @@ -103,6 +106,11 @@ def run_calculations_per_org_task(org_id: OrganizationId) -> TelemetryStatus | N

org_volume = get_eap_organization_volume(config)
if org_volume is None:
return TelemetryStatus.NO_VOLUME
return TelemetryStatus.NO_ORG_VOLUME

if config.should_balance_projects:
project_volumes = get_eap_project_volumes(config)
if not project_volumes:
return TelemetryStatus.NO_PROJECT_VOLUMES

return None
3 changes: 2 additions & 1 deletion src/sentry/dynamic_sampling/per_org/tasks/telemetry.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,8 @@ class TelemetryStatus(StrEnum):
FAILED = "failed"
KILLSWITCHED = "killswitched"
NO_SUBSCRIPTION = "no_subscription"
NO_VOLUME = "no_volume"
NO_ORG_VOLUME = "no_org_volume"
NO_PROJECT_VOLUMES = "no_project_volumes"
NOT_IN_ROLLOUT = "not_in_rollout"
ORG_HAS_NO_DYNAMIC_SAMPLING = "org_has_no_dynamic_sampling"
ORG_NOT_FOUND = "org_not_found"
Expand Down
3 changes: 3 additions & 0 deletions src/sentry/snuba/referrer.py
Original file line number Diff line number Diff line change
Expand Up @@ -639,6 +639,9 @@ class Referrer(StrEnum):
"dynamic_sampling.distribution.fetch_projects_with_count_per_root_total_volumes"
)
DYNAMIC_SAMPLING_PER_ORG_GET_EAP_ORG_VOLUME = "dynamic_sampling.per_org.get_eap_org_volume"
DYNAMIC_SAMPLING_PER_ORG_GET_EAP_PROJECT_VOLUMES = (
"dynamic_sampling.per_org.get_eap_project_volumes"
)
DYNAMIC_SAMPLING_COUNTERS_FETCH_PROJECTS_WITH_COUNT_PER_TRANSACTION = (
"dynamic_sampling.counters.fetch_projects_with_count_per_transaction_volumes"
)
Expand Down
113 changes: 109 additions & 4 deletions tests/sentry/dynamic_sampling/per_org/tasks/test_queries.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,9 @@
get_configuration,
)
from sentry.dynamic_sampling.per_org.tasks.queries import (
ProjectVolume,
get_eap_organization_volume,
get_eap_project_volumes,
run_eap_spans_table_query_in_chunks,
)
from sentry.dynamic_sampling.tasks.common import OrganizationDataVolume
Expand Down Expand Up @@ -79,7 +81,10 @@ def test_iterates_query_data_in_offset_chunks(self) -> None:


class EAPOrganizationVolumeTest(TestCase, SnubaTestCase, SpanTestCase):
def get_config(self, organization: Organization) -> BaseDynamicSamplingConfiguration:
def get_config(
self,
organization: Organization,
) -> BaseDynamicSamplingConfiguration:
with patch(
"sentry.dynamic_sampling.per_org.tasks.configuration.quotas.backend.get_blended_sample_rate",
return_value=1.0,
Expand Down Expand Up @@ -166,8 +171,108 @@ def test_get_eap_organization_volume_without_traffic(self) -> None:
def test_get_eap_organization_volume_without_projects(self) -> None:
    """An org with no active projects yields no volume and issues no query."""
    organization = self.create_organization()

    # Patch the query entry point so we can assert it is never reached.
    with patch(
        "sentry.dynamic_sampling.per_org.tasks.queries.Spans.run_table_query"
    ) as run_table_query:
        org_volume = get_eap_organization_volume(
            self.get_config(organization), time_interval=timedelta(hours=1)
        )

    assert org_volume is None
    run_table_query.assert_not_called()

def test_get_eap_project_volumes_existing_org(self) -> None:
    """Volumes are returned per root project and the Snuba query is scoped
    to the organization's own active projects only."""
    organization = self.create_organization()
    project = self.create_project(organization=organization)
    other_project = self.create_project(organization=organization)
    # A project in an unrelated organization must not appear in the query params.
    other_organization = self.create_organization()
    self.create_project(organization=other_organization)

    with patch(
        "sentry.dynamic_sampling.per_org.tasks.queries.Spans.run_table_query",
        return_value={
            "data": [
                {
                    "sentry.dsc.root_project": project.id,
                    "count()": 2,
                    "count_sample()": 2,
                },
                {
                    "sentry.dsc.root_project": other_project.id,
                    "count()": 1,
                    "count_sample()": 1,
                },
            ]
        },
    ) as run_table_query:
        project_volumes = get_eap_project_volumes(
            self.get_config(organization), time_interval=timedelta(hours=1)
        )

    # ProjectVolume is an order=True dataclass, so sorted() is deterministic.
    assert sorted(project_volumes) == [
        ProjectVolume(project_id=project.id, total=2, keep=2, drop=0),
        ProjectVolume(project_id=other_project.id, total=1, keep=1, drop=0),
    ]
    run_table_query.assert_called_once()
    # Only this organization's active projects are passed to the query.
    assert sorted(run_table_query.call_args.kwargs["params"].projects, key=lambda p: p.id) == [
        project,
        other_project,
    ]
    assert run_table_query.call_args.kwargs["query_string"] == "is_segment:true"
    assert run_table_query.call_args.kwargs["selected_columns"] == [
        "sentry.dsc.root_project",
        "count()",
        "count_sample()",
    ]
    assert run_table_query.call_args.kwargs["orderby"] == ["sentry.dsc.root_project"]
    assert (
        run_table_query.call_args.kwargs["referrer"]
        == Referrer.DYNAMIC_SAMPLING_PER_ORG_GET_EAP_PROJECT_VOLUMES.value
    )

def test_get_eap_project_volumes_without_traffic(self) -> None:
    """A query that returns no rows produces an empty volume list."""
    organization = self.create_organization()
    self.create_project(organization=organization)

    empty_result = {"data": []}
    with patch(
        "sentry.dynamic_sampling.per_org.tasks.queries.Spans.run_table_query",
        return_value=empty_result,
    ):
        result = get_eap_project_volumes(
            self.get_config(organization), time_interval=timedelta(hours=1)
        )

    assert result == []

def test_get_eap_project_volumes_handles_none_aggregate_values(self) -> None:
    """Null aggregate values from Snuba are treated as zero counts."""
    organization = self.create_organization()
    project = self.create_project(organization=organization)

    null_aggregates_row = {
        "sentry.dsc.root_project": project.id,
        "count()": None,
        "count_sample()": None,
    }
    with patch(
        "sentry.dynamic_sampling.per_org.tasks.queries.Spans.run_table_query",
        return_value={"data": [null_aggregates_row]},
    ):
        volumes = get_eap_project_volumes(self.get_config(organization))

    assert volumes == [ProjectVolume(project_id=project.id, total=0, keep=0, drop=0)]

def test_get_eap_project_volumes_without_projects(self) -> None:
    """An org with no active projects returns [] without querying Snuba."""
    organization = self.create_organization()

    with patch(
        "sentry.dynamic_sampling.per_org.tasks.queries.Spans.run_table_query"
    ) as query_mock:
        result = get_eap_project_volumes(
            self.get_config(organization), time_interval=timedelta(hours=1)
        )

    assert result == []
    query_mock.assert_not_called()
Loading
Loading