Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
132 changes: 44 additions & 88 deletions src/sentry/spans/buffer.py
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,14 @@
from sentry.constants import DataCategory
from sentry.models.project import Project
from sentry.processing.backpressure.memory import ServiceMemory, iter_cluster_memory_usage
from sentry.spans.buffer_logger import BufferLogger, EvalshaData, emit_observability_metrics
from sentry.spans.buffer_logger import (
BufferLogger,
DeadlineUpdateLog,
FlushSegmentLog,
ProcessSpansObservability,
SubsegmentDebugLog,
)
from sentry.spans.buffer_types import EvalshaResult
from sentry.spans.consumers.process_segments.types import attribute_value
from sentry.spans.debug_trace_logger import DebugTraceLogger
from sentry.spans.segment_key import (
Expand Down Expand Up @@ -267,6 +274,11 @@ def _get_payload_key_index(self, segment_key: SegmentKey) -> bytes:
def _get_flush_lock_key(self, segment_key: SegmentKey) -> bytes:
return b"span-buf:fl:" + segment_key

def _get_debug_trace_logger(self) -> DebugTraceLogger:
if self._debug_trace_logger is None:
self._debug_trace_logger = DebugTraceLogger(self.client)
return self._debug_trace_logger

@metrics.wraps("spans.buffer.process_spans")
def process_spans(self, spans: Sequence[Span], now: int):
"""
Expand Down Expand Up @@ -336,14 +348,11 @@ def process_spans(self, spans: Sequence[Span], now: int):
for (project_and_trace, parent_span_id), salt, subsegment in batch:
byte_count = sum(len(span.payload) for span in subsegment)

try:
if self._debug_trace_logger is None:
self._debug_trace_logger = DebugTraceLogger(self.client)
self._debug_trace_logger.log_subsegment_info(
project_and_trace, parent_span_id, subsegment
)
except Exception:
logger.exception("process_spans: Failed to log debug trace info")
SubsegmentDebugLog(
project_and_trace=project_and_trace,
parent_span_id=parent_span_id,
subsegment=subsegment,
).emit(self._get_debug_trace_logger)

span_ids = [span.span_id for span in subsegment]
is_segment_span = (
Expand Down Expand Up @@ -378,29 +387,15 @@ def process_spans(self, spans: Sequence[Span], now: int):
with metrics.timer("spans.buffer.process_spans.update_queue"):
queue_deletes: dict[bytes, set[bytes]] = {}
queue_adds: dict[bytes, MutableMapping[str | bytes, int]] = {}
latency_entries: list[tuple[str, int]] = []
latency_metrics = []
gauge_metrics = []
longest_evalsha_data: tuple[float, EvalshaData, EvalshaData] = (
-1.0,
[],
[],
)
observability = ProcessSpansObservability()

assert len(result_meta) == len(results)

for (project_and_trace, parent_span_id, partition, salt), result in zip(
result_meta, results
):
(
segment_key,
has_root_span,
evalsha_latency_ms,
_,
_,
) = result

latency_entries.append((project_and_trace, evalsha_latency_ms))
evalsha_result = EvalshaResult.from_redis_result(result)
observability.record_evalsha_result(project_and_trace, evalsha_result)

# The Kafka partition is used directly as the queue shard
# so that routing is stable across rebalances.
Expand All @@ -410,65 +405,35 @@ def process_spans(self, spans: Sequence[Span], now: int):
# if the currently processed span is a root span, OR the buffer
# already had a root span inside, use a different timeout than
# usual.
if has_root_span:
if evalsha_result.has_root_span:
offset = root_timeout
else:
offset = timeout

zadd_items = queue_adds.setdefault(queue_key, {})

new_deadline = now + offset
zadd_items[segment_key] = new_deadline

# Debug logging
try:
old_deadline = None
if self._debug_trace_logger is None:
self._debug_trace_logger = DebugTraceLogger(self.client)
if self._debug_trace_logger._should_log_trace(project_and_trace):
old_deadline_score = self.client.zscore(queue_key, segment_key)
old_deadline = (
int(old_deadline_score) if old_deadline_score is not None else None
)
zadd_items[evalsha_result.segment_key] = new_deadline

self._debug_trace_logger.log_deadline_update(
segment_key=segment_key,
project_and_trace=project_and_trace,
old_deadline=old_deadline,
new_deadline=new_deadline,
message_timestamp=now,
has_root_span=has_root_span,
)
except Exception:
logger.exception("process_spans: Failed to log deadline update")
DeadlineUpdateLog(
segment_key=evalsha_result.segment_key,
project_and_trace=project_and_trace,
queue_key=queue_key,
new_deadline=new_deadline,
message_timestamp=now,
has_root_span=evalsha_result.has_root_span,
).emit(self.client, self._get_debug_trace_logger)

subsegment_spans = trees[project_and_trace, parent_span_id]
delete_set = queue_deletes.setdefault(queue_key, set())
if not segment_key.endswith(salt.encode("ascii")):
if not evalsha_result.segment_key.endswith(salt.encode("ascii")):
delete_set.update(
self._get_span_key(project_and_trace, span.span_id)
for span in subsegment_spans
)
delete_set.discard(segment_key)

for result in results:
(
_,
_,
evalsha_latency_ms,
evalsha_latency_metrics,
evalsha_gauge_metrics,
) = result
latency_metrics.append(evalsha_latency_metrics)
gauge_metrics.append(evalsha_gauge_metrics)
if evalsha_latency_ms > longest_evalsha_data[0]:
longest_evalsha_data = (
evalsha_latency_ms,
evalsha_latency_metrics,
evalsha_gauge_metrics,
)
delete_set.discard(evalsha_result.segment_key)

self._buffer_logger.log(latency_entries)
self._buffer_logger.log(observability.evalsha_latency_entries)

with self.client.pipeline(transaction=False) as p:
for queue_key, adds in queue_adds.items():
Expand All @@ -488,11 +453,7 @@ def process_spans(self, spans: Sequence[Span], now: int):
metrics.timing("spans.buffer.process_spans.num_is_root_spans", is_root_span_count)
metrics.timing("spans.buffer.process_spans.num_subsegments", len(trees))
metrics.timing("spans.buffer.process_spans.num_evalsha_calls", len(tree_items))

try:
emit_observability_metrics(latency_metrics, gauge_metrics, longest_evalsha_data)
except Exception as e:
logger.exception("Error emitting observability metrics: %s", e)
observability.emit_metrics()

def _ensure_script(self) -> str:
"""
Expand Down Expand Up @@ -710,20 +671,15 @@ def flush_segments(self, now: int) -> dict[SegmentKey, FlushedSegment]:
)
num_has_root_spans += int(has_root_span)

try:
if self._debug_trace_logger is None:
self._debug_trace_logger = DebugTraceLogger(self.client)
self._debug_trace_logger.log_flush_info(
segment_key,
segment_span_id,
has_root_span,
len(segment),
shard,
queue_key,
now,
)
except Exception:
logger.exception("flush_segments: Failed to log debug trace flush info")
FlushSegmentLog(
segment_key=segment_key,
segment_span_id=segment_span_id,
has_root_span=has_root_span,
num_spans=len(segment),
shard=shard,
queue_key=queue_key,
timestamp=now,
).emit(self._get_debug_trace_logger)

metrics.timing("spans.buffer.flush_segments.num_segments", len(return_segments))
metrics.timing("spans.buffer.flush_segments.has_root_span", num_has_root_spans)
Expand Down
118 changes: 115 additions & 3 deletions src/sentry/spans/buffer_logger.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,15 @@

import logging
import time
from collections.abc import Callable
from collections.abc import Callable, Sequence
from typing import Any, NamedTuple, TypeVar

from sentry_redis_tools.clients import RedisCluster, StrictRedis

from sentry import options
from sentry.spans.buffer_types import EvalshaData, EvalshaResult
from sentry.spans.debug_trace_logger import DebugTraceLogger
from sentry.spans.segment_key import SegmentKey
from sentry.utils import metrics

logger = logging.getLogger(__name__)
Expand Down Expand Up @@ -123,8 +128,115 @@ def log(self, entries: list[tuple[str, int]]) -> None:
)


type DataPoint = tuple[bytes, float]
type EvalshaData = list[DataPoint]
type QueueKey = bytes


class SubsegmentDebugLog(NamedTuple):
project_and_trace: str
parent_span_id: str
subsegment: Sequence[Any]

def emit(self, get_debug_trace_logger: Callable[[], DebugTraceLogger]) -> None:
try:
get_debug_trace_logger().log_subsegment_info(
self.project_and_trace, self.parent_span_id, self.subsegment
)
except Exception:
logger.exception("process_spans: Failed to log debug trace info")


class ProcessSpansObservability:
def __init__(self) -> None:
self._latency_entries: list[tuple[str, int]] = []
self._latency_metrics: list[EvalshaData] = []
self._gauge_metrics: list[EvalshaData] = []
self._longest_evalsha_data: tuple[float, EvalshaData, EvalshaData] = (
-1.0,
[],
[],
)

def record_evalsha_result(self, project_and_trace: str, result: EvalshaResult) -> None:
self._latency_entries.append((project_and_trace, result.latency_ms))
self._latency_metrics.append(result.latency_metrics)
self._gauge_metrics.append(result.gauge_metrics)

if result.latency_ms > self._longest_evalsha_data[0]:
self._longest_evalsha_data = (
result.latency_ms,
result.latency_metrics,
result.gauge_metrics,
)

@property
def evalsha_latency_entries(self) -> list[tuple[str, int]]:
return self._latency_entries

def emit_metrics(self) -> None:
try:
emit_observability_metrics(
self._latency_metrics,
self._gauge_metrics,
self._longest_evalsha_data,
)
except Exception as e:
logger.exception("Error emitting observability metrics: %s", e)


class DeadlineUpdateLog(NamedTuple):
segment_key: SegmentKey
project_and_trace: str
queue_key: QueueKey
new_deadline: int
message_timestamp: int
has_root_span: bool

def emit(
self,
client: RedisCluster[bytes] | StrictRedis[bytes],
get_debug_trace_logger: Callable[[], DebugTraceLogger],
) -> None:
try:
old_deadline = None
debug_trace_logger = get_debug_trace_logger()
if debug_trace_logger._should_log_trace(self.project_and_trace):
old_deadline_score = client.zscore(self.queue_key, self.segment_key)
old_deadline = int(old_deadline_score) if old_deadline_score is not None else None

debug_trace_logger.log_deadline_update(
segment_key=self.segment_key,
project_and_trace=self.project_and_trace,
old_deadline=old_deadline,
new_deadline=self.new_deadline,
message_timestamp=self.message_timestamp,
has_root_span=self.has_root_span,
)
except Exception:
logger.exception("process_spans: Failed to log deadline update")


class FlushSegmentLog(NamedTuple):
segment_key: SegmentKey
segment_span_id: str
has_root_span: bool
num_spans: int
shard: int
queue_key: QueueKey
timestamp: int

def emit(self, get_debug_trace_logger: Callable[[], DebugTraceLogger]) -> None:
try:
get_debug_trace_logger().log_flush_info(
self.segment_key,
self.segment_span_id,
self.has_root_span,
self.num_spans,
self.shard,
self.queue_key,
self.timestamp,
)
except Exception:
logger.exception("flush_segments: Failed to log debug trace flush info")


def emit_observability_metrics(
Expand Down
35 changes: 35 additions & 0 deletions src/sentry/spans/buffer_types.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
"""
Shared value types for the span buffer.

These types describe data passed between buffer pipeline steps. They do not
own Redis operations, logging behavior, or Django model state.
"""

from __future__ import annotations

from collections.abc import Sequence
from typing import Any, NamedTuple

from sentry.spans.segment_key import SegmentKey

type DataPoint = tuple[bytes, float]
type EvalshaData = list[DataPoint]


class EvalshaResult(NamedTuple):
segment_key: SegmentKey
has_root_span: bool
latency_ms: int
latency_metrics: EvalshaData
gauge_metrics: EvalshaData

@classmethod
def from_redis_result(cls, result: Sequence[Any]) -> EvalshaResult:
(
segment_key,
has_root_span,
latency_ms,
latency_metrics,
gauge_metrics,
) = result
return cls(segment_key, has_root_span, latency_ms, latency_metrics, gauge_metrics)
2 changes: 1 addition & 1 deletion tests/sentry/spans/test_buffer.py
Original file line number Diff line number Diff line change
Expand Up @@ -245,7 +245,7 @@ def test_basic(buffer: SpansBuffer, spans) -> None:
assert_clean(buffer.client)


@mock.patch("sentry.spans.buffer.emit_observability_metrics")
@mock.patch("sentry.spans.buffer_logger.emit_observability_metrics")
def test_observability_metrics(
emit_observability_metrics: mock.MagicMock, buffer: SpansBuffer
) -> None:
Expand Down
Loading
Loading