Skip to content
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions src/sentry/features/temporary.py
Original file line number Diff line number Diff line change
Expand Up @@ -465,4 +465,5 @@ def register_temporary_features(manager: FeatureManager) -> None:

manager.add("projects:workflow-engine-performance-detectors", ProjectFeature, FeatureHandlerStrategy.FLAGPOLE, api_expose=False)

manager.add("organizations:relay-generate-billing-outcome", OrganizationFeature, FeatureHandlerStrategy.FLAGPOLE, api_expose=False)

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The feature flag also needs to be exposed to Relay, search for EXPOSABLE_FEATURES

Comment thread
cursor[bot] marked this conversation as resolved.
# fmt: on
5 changes: 5 additions & 0 deletions src/sentry/ingest/billing_metrics_consumer.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ class BillingTxCountMetricConsumerStrategy(ProcessingStrategy[KafkaPayload]):

span_metric_id = SPAN_METRICS_NAMES["c:spans/usage@none"]
span_is_segment_tag = str(SHARED_TAG_STRINGS["is_segment"])
billing_outcome_accepted_tag = str(SHARED_TAG_STRINGS["billing_outcome_accepted"])

def __init__(self, next_step: ProcessingStrategy[Any]) -> None:
self.__next_step = next_step
Expand Down Expand Up @@ -79,6 +80,10 @@ def _count_processed_items(self, generic_metric: GenericMetric) -> Mapping[DataC
if generic_metric["metric_id"] != self.span_metric_id:
return {}

# If relay already produced an outcome for this item, ignore it.
if generic_metric["tags"].get(self.billing_outcome_accepted_tag) == "true":
return {}

Comment thread
sentry[bot] marked this conversation as resolved.
value = generic_metric["value"]
try:
quantity = max(int(value), 0) # type: ignore[arg-type]
Expand Down
1 change: 1 addition & 0 deletions src/sentry/sentry_metrics/indexer/strings.py
Original file line number Diff line number Diff line change
Expand Up @@ -202,6 +202,7 @@
"has_transaction": PREFIX + 281,
"was_transaction": PREFIX + 282,
"is_segment": PREFIX + 283,
"billing_outcome_accepted": PREFIX + 284,

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not sure if you already bikeshedded on the name but billing_outcome_emitted would make slightly more sense to me (the outcome is Accepted but the thing relay does is emitting it).

# GENERAL/MISC (don't have a category)
"": PREFIX + 1000,
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
SPAN_METRICS_NAMES,
TRANSACTION_METRICS_NAMES,
)
from sentry.testutils.helpers.features import with_feature
from sentry.utils.outcomes import Outcome


Expand Down Expand Up @@ -141,3 +142,103 @@ def generate_kafka_message(generic_metric: GenericMetric) -> Message[KafkaPayloa

strategy.join()
assert next_step.join.call_count == 1


@with_feature("organizations:relay-generate-billing-outcome")

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
@with_feature("organizations:relay-generate-billing-outcome")

Mentioning the feature here should not be necessary because the consumer-side behavior is determined by the tag, not the feature flag, right? This might also be what confused sentry[bot] in their comment above.

@mock.patch("sentry.ingest.billing_metrics_consumer.track_outcome")
def test_no_double_billing_outcomes(track_outcome) -> None:
topic = Topic("snuba-generic-metrics")

organization = 123
project_1 = 56789

span_usage_mri = "c:spans/usage@none"
span_usage_id = SPAN_METRICS_NAMES[span_usage_mri]

generic_metrics: list[GenericMetric] = [
{
"mapping_meta": {"c": {str(span_usage_id): span_usage_mri}},
"metric_id": span_usage_id,
"type": "d",
"org_id": organization,
"project_id": project_1,
"timestamp": 123456,
"value": 65.0,
"tags": {str(SHARED_TAG_STRINGS["billing_outcome_accepted"]): "true"},
"use_case_id": "spans",
"retention_days": 90,
},
{
"mapping_meta": {"c": {str(span_usage_id): span_usage_mri}},
"metric_id": span_usage_id,
"type": "d",
"org_id": organization,
"project_id": project_1,
"timestamp": 123456,
"value": 12.0,
"tags": {str(SHARED_TAG_STRINGS["is_segment"]): "true"},
"use_case_id": "spans",
"retention_days": 90,
},
]

next_step = mock.MagicMock()

strategy = BillingTxCountMetricConsumerStrategy(
next_step=next_step,
)

generate_kafka_message_counter = 0

def generate_kafka_message(generic_metric: GenericMetric) -> Message[KafkaPayload]:
nonlocal generate_kafka_message_counter

encoded = orjson.dumps(generic_metric)
payload = KafkaPayload(key=None, value=encoded, headers=[])
message = Message(
BrokerValue(
payload,
Partition(topic, index=0),
generate_kafka_message_counter,
datetime.now(timezone.utc),
)
)
generate_kafka_message_counter += 1
return message

# Mimick the behavior of StreamProcessor._run_once: Call poll repeatedly,
# then call submit when there is a message.
strategy.poll()
strategy.poll()
assert track_outcome.call_count == 0
for generic_metric in generic_metrics:
strategy.poll()
strategy.submit(generate_kafka_message(generic_metric))

assert track_outcome.mock_calls == [
mock.call(
org_id=organization,
project_id=project_1,
key_id=None,
outcome=Outcome.ACCEPTED,
reason=None,
timestamp=mock.ANY,
event_id=None,
category=DataCategory.SPAN,
quantity=12,
),
mock.call(
org_id=organization,
project_id=project_1,
key_id=None,
outcome=Outcome.ACCEPTED,
reason=None,
timestamp=mock.ANY,
event_id=None,
category=DataCategory.TRANSACTION,
quantity=12,
),
]

strategy.join()
assert next_step.join.call_count == 1
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
from hashlib import sha256

LOCKED_FILE = "src/sentry/sentry_metrics/indexer/strings.py"
LOCKED_DIGEST = "f984bf497f587f7d52665714cb9a049cf9f515f0bbb288f8782cc018484e65ea"
LOCKED_DIGEST = "5f3eba88824639ef59501cbc9ae6959285f262e226de306b68da8a64d9c463fb"
MESSAGE = f"""{LOCKED_FILE} is locked.

* We have detected you made changes to this file.
Expand Down
Loading