Skip to content
Merged
Show file tree
Hide file tree
Changes from 11 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
116 changes: 79 additions & 37 deletions sentry_sdk/_span_batcher.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,12 @@
import random
import threading
import time
from collections import defaultdict
from datetime import datetime, timezone
from typing import TYPE_CHECKING

from sentry_sdk._batcher import Batcher
from sentry_sdk._queue import EmptyError, Queue
from sentry_sdk.envelope import Envelope, Item, PayloadRef
from sentry_sdk.utils import format_timestamp, serialize_attribute

Expand All @@ -14,14 +17,15 @@

class SpanBatcher(Batcher["StreamedSpan"]):
# MAX_BEFORE_FLUSH should be lower than MAX_BEFORE_DROP, so that there is
# a bit of a buffer for spans that appear between setting the flush event
# a bit of a buffer for spans that appear between the trigger to flush
# and actually flushing the buffer.
#
# The max limits are all per trace.
# The max limits are all per trace (per bucket).
MAX_ENVELOPE_SIZE = 1000 # spans
MAX_BEFORE_FLUSH = 1000
MAX_BEFORE_DROP = 2000
MAX_BYTES_BEFORE_FLUSH = 5 * 1024 * 1024 # 5 MB

FLUSH_WAIT_TIME = 5.0

TYPE = "span"
Expand All @@ -45,11 +49,29 @@
self._lock = threading.Lock()
self._active: "threading.local" = threading.local()

self._flush_event: "threading.Event" = threading.Event()
self._last_full_flush: float = time.monotonic()
self._flush_queue: Queue = Queue()

self._flusher: "Optional[threading.Thread]" = None
self._flusher_pid: "Optional[int]" = None
Comment thread
sentrivana marked this conversation as resolved.

def _flush_loop(self) -> None:
Comment thread
sentrivana marked this conversation as resolved.
self._active.flag = True
while self._running:
jitter = random.random() * self.FLUSH_WAIT_TIME * 0.1
try:
trace_id = self._flush_queue.get(timeout=self.FLUSH_WAIT_TIME + jitter)
self._flush(trace_id=trace_id)
except EmptyError:
pass

if (
time.monotonic() - self._last_full_flush
>= self.FLUSH_WAIT_TIME + jitter
):
self._flush()
Comment thread
sentry-warden[bot] marked this conversation as resolved.
Comment thread
sentrivana marked this conversation as resolved.
self._last_full_flush = time.monotonic()

def add(self, span: "StreamedSpan") -> None:
# Bail out if the current thread is already executing batcher code.
# This prevents deadlocks when code running inside the batcher (e.g.
Expand Down Expand Up @@ -79,15 +101,23 @@
self._running_size[span.trace_id] += self._estimate_size(span)

if size + 1 >= self.MAX_BEFORE_FLUSH:
self._flush_event.set()
self._flush_queue.put(span.trace_id)
return

if self._running_size[span.trace_id] >= self.MAX_BYTES_BEFORE_FLUSH:
self._flush_event.set()
self._flush_queue.put(span.trace_id)
Comment thread
sentry-warden[bot] marked this conversation as resolved.
Outdated
return

Check warning on line 109 in sentry_sdk/_span_batcher.py

View check run for this annotation

@sentry/warden / warden: code-review

_flush_queue grows unboundedly when same trace_id repeatedly hits flush threshold

In `add()`, every span that pushes a bucket past `MAX_BEFORE_FLUSH` or `MAX_BYTES_BEFORE_FLUSH` enqueues `span.trace_id` onto the unbounded `_flush_queue` (default `maxsize=0`). For a hot trace producing many spans before the flusher wakes, the same `trace_id` is appended repeatedly. After the bucket is flushed, the queue still contains the stale duplicates, which the flusher will dequeue and call `_flush(trace_id=...)` on—each one re-acquires the lock and no-ops, but the queue itself can accumulate without bound under load. The author's own `# XXX remove trace_id from queue` comment in `_flush` confirms this is a known gap. Under sustained high span throughput on a single trace this is an O(n) memory leak in the flush queue that lives until process exit.
finally:
self._active.flag = False

def kill(self) -> None:
if self._flusher is None:
return

self._running = False
self._flush_queue.put(None)
self._flusher = None
Comment thread
sentry-warden[bot] marked this conversation as resolved.
Outdated

@staticmethod
def _estimate_size(item: "StreamedSpan") -> int:
# Rough estimate of serialized span size that's quick to compute.
Expand Down Expand Up @@ -128,50 +158,62 @@

return res

def _flush(self) -> None:
def _flush(self, trace_id: "Optional[str]" = None) -> None:
with self._lock:
if len(self._span_buffer) == 0:
return

if trace_id is None:
# flush whole buffer, e.g. if the SDK is shutting down
buckets = list(self._span_buffer.keys())
else:
buckets = [trace_id]

envelopes = []
for spans in self._span_buffer.values():
if spans:
dsc = spans[0]._dynamic_sampling_context()

# Max per envelope is 1000, so if we happen to have more than
# 1000 spans in one bucket, we'll need to separate them.
for start in range(0, len(spans), self.MAX_ENVELOPE_SIZE):
end = min(start + self.MAX_ENVELOPE_SIZE, len(spans))
for bucket_id in buckets:
spans = self._span_buffer.get(bucket_id)
if not spans:
continue

envelope = Envelope(
dsc = spans[0]._dynamic_sampling_context()

# Max per envelope is 1000, so if we happen to have more than
# 1000 spans in one bucket, we'll need to separate them.
for start in range(0, len(spans), self.MAX_ENVELOPE_SIZE):
end = min(start + self.MAX_ENVELOPE_SIZE, len(spans))

envelope = Envelope(
headers={
"sent_at": format_timestamp(datetime.now(timezone.utc)),
"trace": dsc,
}
)

envelope.add_item(
Item(
type=self.TYPE,
content_type=self.CONTENT_TYPE,
headers={
"sent_at": format_timestamp(datetime.now(timezone.utc)),
"trace": dsc,
}
"item_count": end - start,
},
payload=PayloadRef(
json={
"items": [
self._to_transport_format(spans[j])
for j in range(start, end)
]
}
),
)
)

envelope.add_item(
Item(
type=self.TYPE,
content_type=self.CONTENT_TYPE,
headers={
"item_count": end - start,
},
payload=PayloadRef(
json={
"items": [
self._to_transport_format(spans[j])
for j in range(start, end)
]
}
),
)
)
envelopes.append(envelope)

envelopes.append(envelope)
del self._span_buffer[bucket_id]
del self._running_size[bucket_id]

self._span_buffer.clear()
self._running_size.clear()
# XXX remove trace_id from queue

for envelope in envelopes:
self._capture_func(envelope)
Loading
Loading