Skip to content
Merged
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
107 changes: 69 additions & 38 deletions sentry_sdk/_span_batcher.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,11 @@
import random
import threading
from collections import defaultdict
from datetime import datetime, timezone
from typing import TYPE_CHECKING

from sentry_sdk._batcher import Batcher
from sentry_sdk._queue import EmptyError, Queue
from sentry_sdk.envelope import Envelope, Item, PayloadRef
from sentry_sdk.utils import format_timestamp, serialize_attribute

Expand All @@ -14,10 +16,10 @@

class SpanBatcher(Batcher["StreamedSpan"]):
# MAX_BEFORE_FLUSH should be lower than MAX_BEFORE_DROP, so that there is
# a bit of a buffer for spans that appear between setting the flush event
# a bit of a buffer for spans that appear between the trigger to flush
# and actually flushing the buffer.
#
# The max limits are all per trace.
# The max limits are all per trace (per bucket).
MAX_ENVELOPE_SIZE = 1000 # spans
MAX_BEFORE_FLUSH = 1000
MAX_BEFORE_DROP = 2000
Expand Down Expand Up @@ -45,11 +47,22 @@
self._lock = threading.Lock()
self._active: "threading.local" = threading.local()

self._flush_event: "threading.Event" = threading.Event()
self._flush_queue: Queue = Queue()

self._flusher: "Optional[threading.Thread]" = None
self._flusher_pid: "Optional[int]" = None
Comment thread
sentrivana marked this conversation as resolved.

def _flush_loop(self) -> None:
Comment thread
sentrivana marked this conversation as resolved.
self._active.flag = True
while self._running:
try:
trace_id = self._flush_queue.get(
timeout=self.FLUSH_WAIT_TIME + random.random()
)
self._flush(trace_id=trace_id)
except EmptyError:
self._flush()

Check warning on line 64 in sentry_sdk/_span_batcher.py

View check run for this annotation

@sentry/warden / warden: code-review

Time-based fallback flush can be starved by busy traces, delaying other buckets indefinitely

The new `_flush_loop` only triggers the time-based full flush (`self._flush()` with no args) when `Queue.get(timeout=...)` raises `EmptyError`. If one or more very busy traces continuously cross `MAX_BEFORE_FLUSH`/`MAX_BYTES_BEFORE_FLUSH`, every `add()` call past the threshold enqueues another `trace_id`, so the queue is rarely empty and `EmptyError` may never be raised. As a result, spans accumulated in *other*, less-busy buckets can sit unflushed far longer than `FLUSH_WAIT_TIME`, regressing the previous behavior where the time-based flush always fired on the timeout. Consider tracking a separate deadline (e.g. `monotonic()` last_flush) and forcing a full flush when it elapses regardless of queue activity.
Comment thread
sentry-warden[bot] marked this conversation as resolved.
Comment thread
sentrivana marked this conversation as resolved.

def add(self, span: "StreamedSpan") -> None:
# Bail out if the current thread is already executing batcher code.
# This prevents deadlocks when code running inside the batcher (e.g.
Expand Down Expand Up @@ -79,15 +92,23 @@
self._running_size[span.trace_id] += self._estimate_size(span)

if size + 1 >= self.MAX_BEFORE_FLUSH:
self._flush_event.set()
self._flush_queue.put(span.trace_id)
return

if self._running_size[span.trace_id] >= self.MAX_BYTES_BEFORE_FLUSH:
self._flush_event.set()
self._flush_queue.put(span.trace_id)
Comment thread
sentry-warden[bot] marked this conversation as resolved.
Outdated
return
finally:
self._active.flag = False

def kill(self) -> None:
    """Stop the background flusher, if one has been started.

    Does nothing when no flusher thread is registered. Otherwise clears the
    running flag, enqueues ``None`` on the flush queue so a flusher blocked
    in ``get()`` wakes up, and drops the thread reference.
    """
    if self._flusher is not None:
        self._running = False
        # Sentinel value: wakes the flush loop without naming a trace.
        self._flush_queue.put(None)
        self._flusher = None
Comment thread
sentry-warden[bot] marked this conversation as resolved.
Outdated

@staticmethod
def _estimate_size(item: "StreamedSpan") -> int:
# Rough estimate of serialized span size that's quick to compute.
Expand Down Expand Up @@ -128,50 +149,60 @@

return res

def _flush(self) -> None:
def _flush(self, trace_id: "Optional[str]" = None) -> None:
with self._lock:
if len(self._span_buffer) == 0:
return

if trace_id is None:
# flush whole buffer, e.g. if the SDK is shutting down
buckets = self._span_buffer.keys()
Comment thread
sentrivana marked this conversation as resolved.
Outdated
else:
buckets = [trace_id]

envelopes = []
for spans in self._span_buffer.values():
if spans:
dsc = spans[0]._dynamic_sampling_context()

# Max per envelope is 1000, so if we happen to have more than
# 1000 spans in one bucket, we'll need to separate them.
for start in range(0, len(spans), self.MAX_ENVELOPE_SIZE):
end = min(start + self.MAX_ENVELOPE_SIZE, len(spans))
for trace_id in buckets:
spans = self._span_buffer.get(trace_id)
if not spans:
continue

envelope = Envelope(
headers={
"sent_at": format_timestamp(datetime.now(timezone.utc)),
"trace": dsc,
}
)
dsc = spans[0]._dynamic_sampling_context()

# Max per envelope is 1000, so if we happen to have more than
# 1000 spans in one bucket, we'll need to separate them.
for start in range(0, len(spans), self.MAX_ENVELOPE_SIZE):
end = min(start + self.MAX_ENVELOPE_SIZE, len(spans))

envelope = Envelope(
headers={
"sent_at": format_timestamp(datetime.now(timezone.utc)),
"trace": dsc,
}
)

envelope.add_item(
Item(
type=self.TYPE,
content_type=self.CONTENT_TYPE,
headers={
"item_count": end - start,
},
payload=PayloadRef(
json={
"items": [
self._to_transport_format(spans[j])
for j in range(start, end)
]
}
),
)
envelope.add_item(
Item(
type=self.TYPE,
content_type=self.CONTENT_TYPE,
headers={
"item_count": end - start,
},
payload=PayloadRef(
json={
"items": [
self._to_transport_format(spans[j])
for j in range(start, end)
]
}
),
)
)

envelopes.append(envelope)
envelopes.append(envelope)

self._span_buffer.clear()
self._running_size.clear()
del self._span_buffer[trace_id]
del self._running_size[trace_id]

Check failure on line 205 in sentry_sdk/_span_batcher.py

View check run for this annotation

@sentry/warden / warden: code-review

Full flush raises RuntimeError due to dict mutation during iteration

When `_flush()` is called without a `trace_id` (full flush during SDK shutdown or manual `sentry_sdk.flush()`), `buckets` is assigned `self._span_buffer.keys()`, which is a live view. Inside the loop, `del self._span_buffer[trace_id]` mutates the underlying dict while iterating the key view, causing `RuntimeError: dictionary changed size during iteration`. This breaks shutdown flushes and manual flushes whenever more than one bucket exists, leading to lost spans.

Check failure on line 205 in sentry_sdk/_span_batcher.py

View check run for this annotation

@sentry/warden / warden: find-bugs

Full flush raises RuntimeError due to dict mutation during iteration

When `_flush()` is called with `trace_id=None` (e.g. during SDK shutdown or manual `sentry_sdk.flush()`), `buckets` is assigned `self._span_buffer.keys()`, which is a live view over the dict. The loop then executes `del self._span_buffer[trace_id]` on line 204, mutating the dict while iterating its key view. Python raises `RuntimeError: dictionary changed size during iteration`, so any full flush with more than one bucket will crash and spans will be lost (and not captured) — directly breaking the documented shutdown/manual-flush path the PR claims to preserve.

for envelope in envelopes:
self._capture_func(envelope)
Loading