Skip to content
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions src/sentry/features/temporary.py
Original file line number Diff line number Diff line change
Expand Up @@ -465,4 +465,5 @@ def register_temporary_features(manager: FeatureManager) -> None:

manager.add("projects:workflow-engine-performance-detectors", ProjectFeature, FeatureHandlerStrategy.FLAGPOLE, api_expose=False)

manager.add("organizations:relay-generate-billing-outcome", OrganizationFeature, FeatureHandlerStrategy.FLAGPOLE, api_expose=False)

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The feature flag also needs to be exposed to Relay, search for EXPOSABLE_FEATURES

Comment thread
cursor[bot] marked this conversation as resolved.
# fmt: on
5 changes: 5 additions & 0 deletions src/sentry/ingest/billing_metrics_consumer.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ class BillingTxCountMetricConsumerStrategy(ProcessingStrategy[KafkaPayload]):

span_metric_id = SPAN_METRICS_NAMES["c:spans/usage@none"]
span_is_segment_tag = str(SHARED_TAG_STRINGS["is_segment"])
billing_outcome_accepted_tag = str(SHARED_TAG_STRINGS["billing_outcome_emitted"])

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Bug: The variable billing_outcome_accepted_tag is misleadingly named, as it references the string "billing_outcome_emitted". If the external service sends "billing_outcome_accepted", the double-billing prevention will fail.
Severity: CRITICAL

Suggested Fix

Verify the exact tag name sent by the corresponding Relay change. If Relay sends "billing_outcome_accepted", update the string registered in src/sentry/sentry_metrics/indexer/strings.py to match. If Relay sends "billing_outcome_emitted", rename the variable billing_outcome_accepted_tag to billing_outcome_emitted_tag for clarity.

Prompt for AI Agent
Review the code at the location below. A potential bug has been identified by an AI
agent. Verify if this is a real issue. If it is, propose a fix; if not, explain why it's
not valid.

Location: src/sentry/ingest/billing_metrics_consumer.py#L45

Potential issue: A new mechanism to prevent double-billing for span metrics relies on a
tag check. The variable `billing_outcome_accepted_tag` is used for this check, but it is
assigned the string value `"billing_outcome_emitted"`. The pull request description and
the variable's name imply that the tag sent by the external service (Relay) is
`"billing_outcome_accepted"`. If Relay sends a tag with this name, the consumer's check
for `"billing_outcome_emitted"` will not find it. This would cause the double-billing
prevention logic to silently fail for all metrics, defeating the purpose of the change.

Also affects:

  • src/sentry/sentry_metrics/indexer/strings.py:206~206


def __init__(self, next_step: ProcessingStrategy[Any]) -> None:
self.__next_step = next_step
Expand Down Expand Up @@ -79,6 +80,10 @@ def _count_processed_items(self, generic_metric: GenericMetric) -> Mapping[DataC
if generic_metric["metric_id"] != self.span_metric_id:
return {}

# If relay already produced an outcome for this item, ignore it.
if generic_metric["tags"].get(self.billing_outcome_accepted_tag) == "true":
return {}

Comment thread
sentry[bot] marked this conversation as resolved.
value = generic_metric["value"]
try:
quantity = max(int(value), 0) # type: ignore[arg-type]
Expand Down
1 change: 1 addition & 0 deletions src/sentry/sentry_metrics/indexer/strings.py
Original file line number Diff line number Diff line change
Expand Up @@ -202,6 +202,7 @@
"has_transaction": PREFIX + 281,
"was_transaction": PREFIX + 282,
"is_segment": PREFIX + 283,
"billing_outcome_emitted": PREFIX + 284,
# GENERAL/MISC (don't have a category)
"": PREFIX + 1000,
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -141,3 +141,102 @@ def generate_kafka_message(generic_metric: GenericMetric) -> Message[KafkaPayloa

strategy.join()
assert next_step.join.call_count == 1


@mock.patch("sentry.ingest.billing_metrics_consumer.track_outcome")
def test_no_double_billing_outcomes(track_outcome) -> None:
topic = Topic("snuba-generic-metrics")

organization = 123
project_1 = 56789

span_usage_mri = "c:spans/usage@none"
span_usage_id = SPAN_METRICS_NAMES[span_usage_mri]

generic_metrics: list[GenericMetric] = [
{
"mapping_meta": {"c": {str(span_usage_id): span_usage_mri}},
"metric_id": span_usage_id,
"type": "d",
"org_id": organization,
"project_id": project_1,
"timestamp": 123456,
"value": 65.0,
"tags": {str(SHARED_TAG_STRINGS["billing_outcome_emitted"]): "true"},
"use_case_id": "spans",
"retention_days": 90,
},
{
"mapping_meta": {"c": {str(span_usage_id): span_usage_mri}},
"metric_id": span_usage_id,
"type": "d",
"org_id": organization,
"project_id": project_1,
"timestamp": 123456,
"value": 12.0,
"tags": {str(SHARED_TAG_STRINGS["is_segment"]): "true"},
"use_case_id": "spans",
"retention_days": 90,
},
]

next_step = mock.MagicMock()

strategy = BillingTxCountMetricConsumerStrategy(
next_step=next_step,
)

generate_kafka_message_counter = 0

def generate_kafka_message(generic_metric: GenericMetric) -> Message[KafkaPayload]:
nonlocal generate_kafka_message_counter

encoded = orjson.dumps(generic_metric)
payload = KafkaPayload(key=None, value=encoded, headers=[])
message = Message(
BrokerValue(
payload,
Partition(topic, index=0),
generate_kafka_message_counter,
datetime.now(timezone.utc),
)
)
generate_kafka_message_counter += 1
return message

# Mimick the behavior of StreamProcessor._run_once: Call poll repeatedly,
# then call submit when there is a message.
strategy.poll()
strategy.poll()
assert track_outcome.call_count == 0
for generic_metric in generic_metrics:
strategy.poll()
strategy.submit(generate_kafka_message(generic_metric))

assert track_outcome.mock_calls == [
mock.call(
org_id=organization,
project_id=project_1,
key_id=None,
outcome=Outcome.ACCEPTED,
reason=None,
timestamp=mock.ANY,
event_id=None,
category=DataCategory.SPAN,
quantity=12,
),
mock.call(
org_id=organization,
project_id=project_1,
key_id=None,
outcome=Outcome.ACCEPTED,
reason=None,
timestamp=mock.ANY,
event_id=None,
category=DataCategory.TRANSACTION,
quantity=12,
),
]

strategy.join()
assert next_step.join.call_count == 1
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
from hashlib import sha256

LOCKED_FILE = "src/sentry/sentry_metrics/indexer/strings.py"
LOCKED_DIGEST = "f984bf497f587f7d52665714cb9a049cf9f515f0bbb288f8782cc018484e65ea"
LOCKED_DIGEST = "9a2c7aeb2abf0c8ce704504201073df1d4776ff2d74d7a7af349bfc0198dedb7"
MESSAGE = f"""{LOCKED_FILE} is locked.

* We have detected you made changes to this file.
Expand Down
Loading