ref(batcher): Only flush the bucket that triggered the flush event #6168
code-review: Found 4 issues (1 high, 1 medium, 2 low)
High
Full flush raises RuntimeError due to dict mutation during iteration - `sentry_sdk/_span_batcher.py:159-205`
When `_flush()` is called without a `trace_id` (full flush during SDK shutdown or manual `sentry_sdk.flush()`), `buckets` is assigned `self._span_buffer.keys()`, which is a live view. Inside the loop, `del self._span_buffer[trace_id]` mutates the underlying dict while iterating the key view, causing `RuntimeError: dictionary changed size during iteration`. This breaks shutdown flushes and manual flushes whenever more than one bucket exists, leading to lost spans.
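The failure mode and one possible fix can be sketched in isolation. This is a minimal illustration, assuming the span buffer is a plain dict keyed by trace ID; the function names and sample data are hypothetical and are not taken from `_span_batcher.py`.

```python
def flush_all_live_view(span_buffer):
    """Mirrors the reported bug: delete entries while iterating the live key view."""
    flushed = []
    for trace_id in span_buffer.keys():
        flushed.extend(span_buffer[trace_id])
        del span_buffer[trace_id]  # changes the dict's size mid-iteration
    return flushed


def flush_all_snapshot(span_buffer):
    """One possible fix: iterate over a snapshot of the keys instead."""
    flushed = []
    for trace_id in list(span_buffer):  # list() copies the keys up front
        flushed.extend(span_buffer[trace_id])
        del span_buffer[trace_id]
    return flushed
```

With two buckets in the buffer, the first function raises `RuntimeError` when the loop asks for the second key, while the second function drains every bucket. Swapping the buffer for a fresh dict under the lock and iterating the old one would be an alternative with the same effect.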
Medium
Time-based fallback flush can be starved by busy traces, delaying other buckets indefinitely - `sentry_sdk/_span_batcher.py:55-64`
The new `_flush_loop` only triggers the time-based full flush (`self._flush()` with no args) when `Queue.get(timeout=...)` raises `EmptyError`. If one or more very busy traces continuously cross `MAX_BEFORE_FLUSH`/`MAX_BYTES_BEFORE_FLUSH`, every `add()` call past the threshold enqueues another `trace_id`, so the queue is rarely empty and `EmptyError` may never be raised. As a result, spans accumulated in other, less-busy buckets can sit unflushed far longer than `FLUSH_WAIT_TIME`, regressing the previous behavior where the time-based flush always fired on the timeout. Consider tracking a separate deadline (e.g. a `last_flush` timestamp taken from `monotonic()`) and forcing a full flush when it elapses regardless of queue activity.
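The suggested deadline could look roughly like the sketch below. It is not the SDK's implementation: it assumes the standard-library `queue.Queue` (which raises `queue.Empty`) as a stand-in for the queue and `EmptyError` used in the PR, and `flush_loop`, `flush`, and `clock` are hypothetical names. A `None` item is treated as a stop sentinel.

```python
import queue
import time


def flush_loop(flush_queue, flush, flush_wait_time, clock=time.monotonic):
    """Serve per-trace flush requests, forcing a full flush once a deadline passes.

    flush(trace_id) flushes one bucket; flush(None) flushes all buckets.
    """
    deadline = clock() + flush_wait_time
    while True:
        # Never wait past the deadline, even if requests keep arriving.
        timeout = max(0.0, deadline - clock())
        try:
            trace_id = flush_queue.get(timeout=timeout)
        except queue.Empty:
            pass  # nothing was requested before the deadline
        else:
            if trace_id is None:
                return  # stop sentinel
            flush(trace_id)
        # The deadline is checked on every iteration, not only on timeout,
        # so a busy queue cannot postpone the full flush indefinitely.
        if clock() >= deadline:
            flush(None)
            deadline = clock() + flush_wait_time
```

Because the deadline check runs after every dequeued item, the full flush fires at most one bucket flush later than `flush_wait_time`, whether or not the queue ever becomes empty.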
Low
`add()` enqueues a trace_id on every span past the flush threshold, growing the queue without bound - `sentry_sdk/_span_batcher.py:94-100`
Once a bucket crosses `MAX_BEFORE_FLUSH` or `MAX_BYTES_BEFORE_FLUSH`, every subsequent `add()` for that trace (up to `MAX_BEFORE_DROP`) calls `self._flush_queue.put(span.trace_id)` again, even though one enqueued entry is sufficient to trigger the bucket flush. The flusher thread will process each duplicate, but after the first `_flush(trace_id=...)` deletes the bucket, the rest are no-op iterations that still consume CPU and momentarily hold `self._lock`. The queue is unbounded (`maxsize=0`), so under sustained high-rate ingestion the flush queue itself can accumulate many duplicate entries. Consider tracking a per-bucket "flush already requested" flag (cleared on flush) so only the first crossing enqueues the `trace_id`.
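One way to realize the "flush already requested" flag is a set of pending trace IDs guarded by a lock. The sketch below is illustrative only: `DedupedFlushRequests` and its methods are hypothetical names, and it uses the standard-library `queue.Queue` as a stand-in for the batcher's flush queue.

```python
import queue
import threading


class DedupedFlushRequests:
    """Hypothetical helper: enqueue each trace_id at most once per flush cycle."""

    def __init__(self):
        self._lock = threading.Lock()
        self._pending = set()       # trace_ids whose flush is already queued
        self.queue = queue.Queue()  # unbounded, like the queue described above

    def request(self, trace_id):
        """Queue a flush for trace_id unless one is pending; return True if queued."""
        with self._lock:
            if trace_id in self._pending:
                return False
            self._pending.add(trace_id)
        self.queue.put(trace_id)
        return True

    def mark_flushed(self, trace_id):
        """Clear the pending flag so a later threshold crossing can enqueue again."""
        with self._lock:
            self._pending.discard(trace_id)
```

In this arrangement `add()` would call `request(span.trace_id)` when a threshold is crossed, and the flusher would call `mark_flushed(trace_id)` after flushing the bucket, so the queue holds at most one entry per trace at a time.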
PR description acknowledges tests are needed but the diff adds none for the new flush behavior
The change replaces a global `threading.Event`-based flush trigger with a per-bucket `Queue`-based mechanism and adds a new `kill()` override and `_flush_loop` in `SpanBatcher`. Per the Sentry code review checklist, behavior changes to a critical path (span flushing) should include functional tests covering: (a) only the triggering bucket is flushed on size/byte threshold, (b) other buckets remain until the time-based flush fires, (c) `kill()` correctly drains via the `None` sentinel without double-flushing, and (d) manual `flush()` still flushes all buckets. No test changes are visible in this hunk; reviewers should confirm tests exist elsewhere in the PR before approving.
Duration: 1m 21s · Tokens: 126.3k in / 4.4k out · Cost: $0.83 (+merge: $0.00)
Annotations
Check failure on line 205 in sentry_sdk/_span_batcher.py
sentry-warden / warden: code-review
Full flush raises RuntimeError due to dict mutation during iteration
When `_flush()` is called without a `trace_id` (full flush during SDK shutdown or manual `sentry_sdk.flush()`), `buckets` is assigned `self._span_buffer.keys()`, which is a live view. Inside the loop, `del self._span_buffer[trace_id]` mutates the underlying dict while iterating the key view, causing `RuntimeError: dictionary changed size during iteration`. This breaks shutdown flushes and manual flushes whenever more than one bucket exists, leading to lost spans.
Check warning on line 64 in sentry_sdk/_span_batcher.py
sentry-warden / warden: code-review
Time-based fallback flush can be starved by busy traces, delaying other buckets indefinitely
The new `_flush_loop` only triggers the time-based full flush (`self._flush()` with no args) when `Queue.get(timeout=...)` raises `EmptyError`. If one or more very busy traces continuously cross `MAX_BEFORE_FLUSH`/`MAX_BYTES_BEFORE_FLUSH`, every `add()` call past the threshold enqueues another `trace_id`, so the queue is rarely empty and `EmptyError` may never be raised. As a result, spans accumulated in *other*, less-busy buckets can sit unflushed far longer than `FLUSH_WAIT_TIME`, regressing the previous behavior where the time-based flush always fired on the timeout. Consider tracking a separate deadline (e.g. a `last_flush` timestamp taken from `monotonic()`) and forcing a full flush when it elapses regardless of queue activity.