From f8064bb0f4b7dbd7c9d21fc3b3be00462e3d6598 Mon Sep 17 00:00:00 2001 From: Tony Le Date: Tue, 19 May 2026 17:35:47 -0400 Subject: [PATCH 1/2] ref(spans): Extract span buffer observability models --- src/sentry/spans/buffer.py | 132 ++++++++--------------- src/sentry/spans/buffer_logger.py | 117 +++++++++++++++++++- src/sentry/spans/buffer_types.py | 35 ++++++ tests/sentry/spans/test_buffer.py | 2 +- tests/sentry/spans/test_buffer_logger.py | 117 ++++++++++++++++++++ 5 files changed, 311 insertions(+), 92 deletions(-) create mode 100644 src/sentry/spans/buffer_types.py diff --git a/src/sentry/spans/buffer.py b/src/sentry/spans/buffer.py index f239999187a12e..a7a4bf9f21de11 100644 --- a/src/sentry/spans/buffer.py +++ b/src/sentry/spans/buffer.py @@ -116,7 +116,14 @@ from sentry.constants import DataCategory from sentry.models.project import Project from sentry.processing.backpressure.memory import ServiceMemory, iter_cluster_memory_usage -from sentry.spans.buffer_logger import BufferLogger, EvalshaData, emit_observability_metrics +from sentry.spans.buffer_logger import ( + BufferLogger, + DeadlineUpdateLog, + FlushSegmentLog, + ProcessSpansObservability, + SubsegmentDebugLog, +) +from sentry.spans.buffer_types import EvalshaResult from sentry.spans.consumers.process_segments.types import attribute_value from sentry.spans.debug_trace_logger import DebugTraceLogger from sentry.spans.segment_key import ( @@ -267,6 +274,11 @@ def _get_payload_key_index(self, segment_key: SegmentKey) -> bytes: def _get_flush_lock_key(self, segment_key: SegmentKey) -> bytes: return b"span-buf:fl:" + segment_key + def _get_debug_trace_logger(self) -> DebugTraceLogger: + if self._debug_trace_logger is None: + self._debug_trace_logger = DebugTraceLogger(self.client) + return self._debug_trace_logger + @metrics.wraps("spans.buffer.process_spans") def process_spans(self, spans: Sequence[Span], now: int): """ @@ -336,14 +348,11 @@ def process_spans(self, spans: Sequence[Span], now: int): for (project_and_trace, parent_span_id), salt, subsegment in batch: byte_count = sum(len(span.payload) for span in subsegment) - try: - if self._debug_trace_logger is None: - self._debug_trace_logger = DebugTraceLogger(self.client) - self._debug_trace_logger.log_subsegment_info( - project_and_trace, parent_span_id, subsegment - ) - except Exception: - logger.exception("process_spans: Failed to log debug trace info") + SubsegmentDebugLog( + project_and_trace=project_and_trace, + parent_span_id=parent_span_id, + subsegment=subsegment, + ).emit(self._get_debug_trace_logger) span_ids = [span.span_id for span in subsegment] is_segment_span = ( @@ -378,29 +387,15 @@ def process_spans(self, spans: Sequence[Span], now: int): with metrics.timer("spans.buffer.process_spans.update_queue"): queue_deletes: dict[bytes, set[bytes]] = {} queue_adds: dict[bytes, MutableMapping[str | bytes, int]] = {} - latency_entries: list[tuple[str, int]] = [] - latency_metrics = [] - gauge_metrics = [] - longest_evalsha_data: tuple[float, EvalshaData, EvalshaData] = ( - -1.0, - [], - [], - ) + observability = ProcessSpansObservability() assert len(result_meta) == len(results) for (project_and_trace, parent_span_id, partition, salt), result in zip( result_meta, results ): - ( - segment_key, - has_root_span, - evalsha_latency_ms, - _, - _, - ) = result - - latency_entries.append((project_and_trace, evalsha_latency_ms)) + evalsha_result = EvalshaResult.from_redis_result(result) + observability.record_evalsha_result(project_and_trace, evalsha_result) # The Kafka partition is used directly as the queue shard # so that routing is stable across rebalances. @@ -410,7 +405,7 @@ def process_spans(self, spans: Sequence[Span], now: int): # if the currently processed span is a root span, OR the buffer # already had a root span inside, use a different timeout than # usual. - if has_root_span: + if evalsha_result.has_root_span: offset = root_timeout else: offset = timeout @@ -418,57 +413,27 @@ def process_spans(self, spans: Sequence[Span], now: int): zadd_items = queue_adds.setdefault(queue_key, {}) new_deadline = now + offset - zadd_items[segment_key] = new_deadline - - # Debug logging - try: - old_deadline = None - if self._debug_trace_logger is None: - self._debug_trace_logger = DebugTraceLogger(self.client) - if self._debug_trace_logger._should_log_trace(project_and_trace): - old_deadline_score = self.client.zscore(queue_key, segment_key) - old_deadline = ( - int(old_deadline_score) if old_deadline_score is not None else None - ) + zadd_items[evalsha_result.segment_key] = new_deadline - self._debug_trace_logger.log_deadline_update( - segment_key=segment_key, - project_and_trace=project_and_trace, - old_deadline=old_deadline, - new_deadline=new_deadline, - message_timestamp=now, - has_root_span=has_root_span, - ) - except Exception: - logger.exception("process_spans: Failed to log deadline update") + DeadlineUpdateLog( + segment_key=evalsha_result.segment_key, + project_and_trace=project_and_trace, + queue_key=queue_key, + new_deadline=new_deadline, + message_timestamp=now, + has_root_span=evalsha_result.has_root_span, + ).emit(self.client, self._get_debug_trace_logger) subsegment_spans = trees[project_and_trace, parent_span_id] delete_set = queue_deletes.setdefault(queue_key, set()) - if not segment_key.endswith(salt.encode("ascii")): + if not evalsha_result.segment_key.endswith(salt.encode("ascii")): delete_set.update( self._get_span_key(project_and_trace, span.span_id) for span in subsegment_spans ) - delete_set.discard(segment_key) - - for result in results: - ( - _, - _, - evalsha_latency_ms, - evalsha_latency_metrics, - evalsha_gauge_metrics, - ) = result - latency_metrics.append(evalsha_latency_metrics) - gauge_metrics.append(evalsha_gauge_metrics) - if evalsha_latency_ms > longest_evalsha_data[0]: - longest_evalsha_data = ( - evalsha_latency_ms, - evalsha_latency_metrics, - evalsha_gauge_metrics, - ) + delete_set.discard(evalsha_result.segment_key) - self._buffer_logger.log(latency_entries) + observability.emit_evalsha_latency_log(self._buffer_logger) with self.client.pipeline(transaction=False) as p: for queue_key, adds in queue_adds.items(): @@ -488,11 +453,7 @@ def process_spans(self, spans: Sequence[Span], now: int): metrics.timing("spans.buffer.process_spans.num_is_root_spans", is_root_span_count) metrics.timing("spans.buffer.process_spans.num_subsegments", len(trees)) metrics.timing("spans.buffer.process_spans.num_evalsha_calls", len(tree_items)) - - try: - emit_observability_metrics(latency_metrics, gauge_metrics, longest_evalsha_data) - except Exception as e: - logger.exception("Error emitting observability metrics: %s", e) + observability.emit_observability_metrics() def _ensure_script(self) -> str: """ @@ -710,20 +671,15 @@ def flush_segments(self, now: int) -> dict[SegmentKey, FlushedSegment]: ) num_has_root_spans += int(has_root_span) - try: - if self._debug_trace_logger is None: - self._debug_trace_logger = DebugTraceLogger(self.client) - self._debug_trace_logger.log_flush_info( - segment_key, - segment_span_id, - has_root_span, - len(segment), - shard, - queue_key, - now, - ) - except Exception: - logger.exception("flush_segments: Failed to log debug trace flush info") + FlushSegmentLog( + segment_key=segment_key, + segment_span_id=segment_span_id, + has_root_span=has_root_span, + num_spans=len(segment), + shard=shard, + queue_key=queue_key, + timestamp=now, + ).emit(self._get_debug_trace_logger) metrics.timing("spans.buffer.flush_segments.num_segments", len(return_segments)) metrics.timing("spans.buffer.flush_segments.has_root_span", num_has_root_spans) diff --git a/src/sentry/spans/buffer_logger.py b/src/sentry/spans/buffer_logger.py index 0d065a307cf89b..376e553cb08cc6 100644 --- a/src/sentry/spans/buffer_logger.py +++ b/src/sentry/spans/buffer_logger.py @@ -2,10 +2,15 @@ import logging import time -from collections.abc import Callable +from collections.abc import Callable, Sequence from typing import Any, NamedTuple, TypeVar +from sentry_redis_tools.clients import RedisCluster, StrictRedis + from sentry import options +from sentry.spans.buffer_types import EvalshaData, EvalshaResult +from sentry.spans.debug_trace_logger import DebugTraceLogger +from sentry.spans.segment_key import SegmentKey from sentry.utils import metrics logger = logging.getLogger(__name__) @@ -123,8 +128,114 @@ def log(self, entries: list[tuple[str, int]]) -> None: ) -type DataPoint = tuple[bytes, float] -type EvalshaData = list[DataPoint] +type QueueKey = bytes + + +class SubsegmentDebugLog(NamedTuple): + project_and_trace: str + parent_span_id: str + subsegment: Sequence[Any] + + def emit(self, get_debug_trace_logger: Callable[[], DebugTraceLogger]) -> None: + try: + get_debug_trace_logger().log_subsegment_info( + self.project_and_trace, self.parent_span_id, self.subsegment + ) + except Exception: + logger.exception("process_spans: Failed to log debug trace info") + + +class ProcessSpansObservability: + def __init__(self) -> None: + self._latency_entries: list[tuple[str, int]] = [] + self._latency_metrics: list[EvalshaData] = [] + self._gauge_metrics: list[EvalshaData] = [] + self._longest_evalsha_data: tuple[float, EvalshaData, EvalshaData] = ( + -1.0, + [], + [], + ) + + def record_evalsha_result(self, project_and_trace: str, result: EvalshaResult) -> None: + self._latency_entries.append((project_and_trace, result.latency_ms)) + self._latency_metrics.append(result.latency_metrics) + self._gauge_metrics.append(result.gauge_metrics) + + if result.latency_ms > self._longest_evalsha_data[0]: + self._longest_evalsha_data = ( + result.latency_ms, + result.latency_metrics, + result.gauge_metrics, + ) + + def emit_evalsha_latency_log(self, buffer_logger: BufferLogger) -> None: + buffer_logger.log(self._latency_entries) + + def emit_observability_metrics(self) -> None: + try: + emit_observability_metrics( + self._latency_metrics, + self._gauge_metrics, + self._longest_evalsha_data, + ) + except Exception as e: + logger.exception("Error emitting observability metrics: %s", e) + + +class DeadlineUpdateLog(NamedTuple): + segment_key: SegmentKey + project_and_trace: str + queue_key: QueueKey + new_deadline: int + message_timestamp: int + has_root_span: bool + + def emit( + self, + client: RedisCluster[bytes] | StrictRedis[bytes], + get_debug_trace_logger: Callable[[], DebugTraceLogger], + ) -> None: + try: + old_deadline = None + debug_trace_logger = get_debug_trace_logger() + if debug_trace_logger._should_log_trace(self.project_and_trace): + old_deadline_score = client.zscore(self.queue_key, self.segment_key) + old_deadline = int(old_deadline_score) if old_deadline_score is not None else None + + debug_trace_logger.log_deadline_update( + segment_key=self.segment_key, + project_and_trace=self.project_and_trace, + old_deadline=old_deadline, + new_deadline=self.new_deadline, + message_timestamp=self.message_timestamp, + has_root_span=self.has_root_span, + ) + except Exception: + logger.exception("process_spans: Failed to log deadline update") + + +class FlushSegmentLog(NamedTuple): + segment_key: SegmentKey + segment_span_id: str + has_root_span: bool + num_spans: int + shard: int + queue_key: QueueKey + timestamp: int + + def emit(self, get_debug_trace_logger: Callable[[], DebugTraceLogger]) -> None: + try: + get_debug_trace_logger().log_flush_info( + self.segment_key, + self.segment_span_id, + self.has_root_span, + self.num_spans, + self.shard, + self.queue_key, + self.timestamp, + ) + except Exception: + logger.exception("flush_segments: Failed to log debug trace flush info") def emit_observability_metrics( diff --git a/src/sentry/spans/buffer_types.py b/src/sentry/spans/buffer_types.py new file mode 100644 index 00000000000000..7989ae5c82f0a6 --- /dev/null +++ b/src/sentry/spans/buffer_types.py @@ -0,0 +1,35 @@ +""" +Shared value types for the span buffer. + +These types describe data passed between buffer pipeline steps. They do not +own Redis operations, logging behavior, or Django model state. +""" + +from __future__ import annotations + +from collections.abc import Sequence +from typing import Any, NamedTuple + +from sentry.spans.segment_key import SegmentKey + +type DataPoint = tuple[bytes, float] +type EvalshaData = list[DataPoint] + + +class EvalshaResult(NamedTuple): + segment_key: SegmentKey + has_root_span: bool + latency_ms: int + latency_metrics: EvalshaData + gauge_metrics: EvalshaData + + @classmethod + def from_redis_result(cls, result: Sequence[Any]) -> EvalshaResult: + ( + segment_key, + has_root_span, + latency_ms, + latency_metrics, + gauge_metrics, + ) = result + return cls(segment_key, has_root_span, latency_ms, latency_metrics, gauge_metrics) diff --git a/tests/sentry/spans/test_buffer.py b/tests/sentry/spans/test_buffer.py index e6e0cd13336637..4343138931b209 100644 --- a/tests/sentry/spans/test_buffer.py +++ b/tests/sentry/spans/test_buffer.py @@ -245,7 +245,7 @@ def test_basic(buffer: SpansBuffer, spans) -> None: assert_clean(buffer.client) -@mock.patch("sentry.spans.buffer.emit_observability_metrics") +@mock.patch("sentry.spans.buffer_logger.emit_observability_metrics") def test_observability_metrics( emit_observability_metrics: mock.MagicMock, buffer: SpansBuffer ) -> None: diff --git a/tests/sentry/spans/test_buffer_logger.py b/tests/sentry/spans/test_buffer_logger.py index aec4529c7cef53..3b2698606b821b 100644 --- a/tests/sentry/spans/test_buffer_logger.py +++ b/tests/sentry/spans/test_buffer_logger.py @@ -5,11 +5,128 @@ from sentry.spans.buffer_logger import ( BufferLogger, + DeadlineUpdateLog, + FlushSegmentLog, + ProcessSpansObservability, + SubsegmentDebugLog, emit_observability_metrics, ) +from sentry.spans.buffer_types import EvalshaResult +from sentry.spans.segment_key import SegmentKey from sentry.testutils.helpers.options import override_options +def _segment_id(project_id: int, trace_id: str, span_id: str) -> SegmentKey: + return f"span-buf:s:{{{project_id}:{trace_id}}}:{span_id}".encode("ascii") + + +def test_subsegment_debug_log_emits_debug_log() -> None: + span = mock.Mock() + debug_trace_logger = mock.Mock() + + SubsegmentDebugLog( + project_and_trace=f"1:{'a' * 32}", + parent_span_id="b" * 16, + subsegment=[span], + ).emit(lambda: debug_trace_logger) + + debug_trace_logger.log_subsegment_info.assert_called_once_with( + f"1:{'a' * 32}", "b" * 16, [span] + ) + + +def test_process_spans_observability_emits_evalsha_data() -> None: + observability = ProcessSpansObservability() + buffer_logger = mock.Mock() + latency_metrics = [(b"step", 20.0)] + gauge_metrics = [(b"gauge", 2.0)] + + observability.record_evalsha_result( + "1:" + "a" * 32, + EvalshaResult( + segment_key=_segment_id(1, "a" * 32, "a" * 16), + has_root_span=False, + latency_ms=5, + latency_metrics=[(b"step", 5.0)], + gauge_metrics=[(b"gauge", 1.0)], + ), + ) + observability.record_evalsha_result( + "1:" + "b" * 32, + EvalshaResult( + segment_key=_segment_id(1, "b" * 32, "b" * 16), + has_root_span=True, + latency_ms=20, + latency_metrics=latency_metrics, + gauge_metrics=gauge_metrics, + ), + ) + + with mock.patch("sentry.spans.buffer_logger.emit_observability_metrics") as emit_metrics: + observability.emit_evalsha_latency_log(buffer_logger) + observability.emit_observability_metrics() + + buffer_logger.log.assert_called_once_with([("1:" + "a" * 32, 5), ("1:" + "b" * 32, 20)]) + emit_metrics.assert_called_once_with( + [[(b"step", 5.0)], latency_metrics], + [[(b"gauge", 1.0)], gauge_metrics], + (20, latency_metrics, gauge_metrics), + ) + + +def test_deadline_update_log_reads_old_deadline_when_trace_is_enabled() -> None: + segment_key = _segment_id(1, "a" * 32, "b" * 16) + queue_key = b"span-buf:q:0" + client = mock.Mock() + client.zscore.return_value = 7 + debug_trace_logger = mock.Mock() + debug_trace_logger._should_log_trace.return_value = True + + DeadlineUpdateLog( + segment_key=segment_key, + project_and_trace="1:" + "a" * 32, + queue_key=queue_key, + new_deadline=11, + message_timestamp=1, + has_root_span=True, + ).emit(client, lambda: debug_trace_logger) + + client.zscore.assert_called_once_with(queue_key, segment_key) + debug_trace_logger.log_deadline_update.assert_called_once_with( + segment_key=segment_key, + project_and_trace="1:" + "a" * 32, + old_deadline=7, + new_deadline=11, + message_timestamp=1, + has_root_span=True, + ) + + +def test_flush_segment_log_emits_debug_log() -> None: + segment_key = _segment_id(1, "a" * 32, "b" * 16) + debug_trace_logger = mock.Mock() + + FlushSegmentLog( + segment_key=segment_key, + segment_span_id="b" * 16, + has_root_span=True, + num_spans=2, + shard=0, + queue_key=b"span-buf:q:0", + timestamp=11, + ).emit(lambda: debug_trace_logger) + + debug_trace_logger.log_flush_info.assert_called_once_with( + segment_key, + "b" * 16, + True, + 2, + 0, + b"span-buf:q:0", + 11, + ) + + @mock.patch("sentry.spans.buffer_logger.time") def test_accumulates_batches_and_tracks_cumulative_latency(mock_time): """ From 5b27d3d51e9689daa342f6cdc8704949bf6d4c29 Mon Sep 17 00:00:00 2001 From: Tony Le Date: Tue, 19 May 2026 17:44:32 -0400 Subject: [PATCH 2/2] better buffer observability handoff --- src/sentry/spans/buffer.py | 4 ++-- src/sentry/spans/buffer_logger.py | 7 ++++--- tests/sentry/spans/test_buffer_logger.py | 4 ++-- 3 files changed, 8 insertions(+), 7 deletions(-) diff --git a/src/sentry/spans/buffer.py b/src/sentry/spans/buffer.py index a7a4bf9f21de11..f37f54a6f66975 100644 --- a/src/sentry/spans/buffer.py +++ b/src/sentry/spans/buffer.py @@ -433,7 +433,7 @@ def process_spans(self, spans: Sequence[Span], now: int): ) delete_set.discard(evalsha_result.segment_key) - observability.emit_evalsha_latency_log(self._buffer_logger) + self._buffer_logger.log(observability.evalsha_latency_entries) with self.client.pipeline(transaction=False) as p: for queue_key, adds in queue_adds.items(): @@ -453,7 +453,7 @@ def process_spans(self, spans: Sequence[Span], now: int): metrics.timing("spans.buffer.process_spans.num_is_root_spans", is_root_span_count) metrics.timing("spans.buffer.process_spans.num_subsegments", len(trees)) metrics.timing("spans.buffer.process_spans.num_evalsha_calls", len(tree_items)) - observability.emit_observability_metrics() + observability.emit_metrics() def _ensure_script(self) -> str: """ diff --git a/src/sentry/spans/buffer_logger.py b/src/sentry/spans/buffer_logger.py index 376e553cb08cc6..49aecaf95574ea 100644 --- a/src/sentry/spans/buffer_logger.py +++ b/src/sentry/spans/buffer_logger.py @@ -168,10 +168,11 @@ def record_evalsha_result(self, project_and_trace: str, result: EvalshaResult) - result.gauge_metrics, ) - def emit_evalsha_latency_log(self, buffer_logger: BufferLogger) -> None: - buffer_logger.log(self._latency_entries) + @property + def evalsha_latency_entries(self) -> list[tuple[str, int]]: + return self._latency_entries - def emit_observability_metrics(self) -> None: + def emit_metrics(self) -> None: try: emit_observability_metrics( self._latency_metrics, diff --git a/tests/sentry/spans/test_buffer_logger.py b/tests/sentry/spans/test_buffer_logger.py index 3b2698606b821b..d9607211677c61 100644 --- a/tests/sentry/spans/test_buffer_logger.py +++ b/tests/sentry/spans/test_buffer_logger.py @@ -63,8 +63,8 @@ def test_process_spans_observability_emits_evalsha_data() -> None: ) with mock.patch("sentry.spans.buffer_logger.emit_observability_metrics") as emit_metrics: - observability.emit_evalsha_latency_log(buffer_logger) - observability.emit_observability_metrics() + buffer_logger.log(observability.evalsha_latency_entries) + observability.emit_metrics() buffer_logger.log.assert_called_once_with([("1:" + "a" * 32, 5), ("1:" + "b" * 32, 20)]) emit_metrics.assert_called_once_with(