Skip to content
Merged
Show file tree
Hide file tree
Changes from 13 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
116 changes: 78 additions & 38 deletions sentry_sdk/_span_batcher.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,12 @@
import random
import threading
import time
from collections import defaultdict
from datetime import datetime, timezone
from typing import TYPE_CHECKING

from sentry_sdk._batcher import Batcher
from sentry_sdk._queue import EmptyError, Queue
from sentry_sdk.envelope import Envelope, Item, PayloadRef
from sentry_sdk.utils import format_timestamp, serialize_attribute

Expand All @@ -14,14 +17,15 @@

class SpanBatcher(Batcher["StreamedSpan"]):
# MAX_BEFORE_FLUSH should be lower than MAX_BEFORE_DROP, so that there is
# a bit of a buffer for spans that appear between setting the flush event
# a bit of a buffer for spans that appear between the trigger to flush
# and actually flushing the buffer.
#
# The max limits are all per trace.
# The max limits are all per trace (per bucket).
MAX_ENVELOPE_SIZE = 1000 # spans
MAX_BEFORE_FLUSH = 1000
MAX_BEFORE_DROP = 2000
MAX_BYTES_BEFORE_FLUSH = 5 * 1024 * 1024 # 5 MB

FLUSH_WAIT_TIME = 5.0

TYPE = "span"
Expand All @@ -45,11 +49,29 @@
self._lock = threading.Lock()
self._active: "threading.local" = threading.local()

self._flush_event: "threading.Event" = threading.Event()
self._last_full_flush: float = time.monotonic()
self._flush_queue: Queue = Queue()

self._flusher: "Optional[threading.Thread]" = None
self._flusher_pid: "Optional[int]" = None
Comment thread
sentrivana marked this conversation as resolved.

def _flush_loop(self) -> None:
    """Body of the flusher thread: flush on demand, periodically, and on exit.

    Each iteration waits on ``self._flush_queue`` for at most
    ``FLUSH_WAIT_TIME`` plus up to 10% random jitter:

    * a dequeued value is passed to ``_flush(trace_id=...)``, so a trace id
      flushes that one bucket and ``None`` flushes the whole buffer;
    * a timeout (``EmptyError``) flushes nothing by itself;
    * independently of the above, once at least ``FLUSH_WAIT_TIME`` plus
      jitter has elapsed since ``self._last_full_flush``, the whole buffer
      is flushed and the timestamp is reset.

    When ``self._running`` turns false the loop ends and the buffer is
    flushed one last time.
    """
    # Mark this thread as batcher code. add() resets the same thread-local
    # flag in its ``finally``; presumably it also checks it on entry so that
    # spans created while flushing are not fed back into the batcher.
    self._active.flag = True

    while self._running:
        # Recomputed every iteration so the wait and the periodic check use
        # the same jittered interval.
        jitter = random.random() * self.FLUSH_WAIT_TIME * 0.1

        # Only the blocking get() can time out, so only it is guarded.
        try:
            trace_id = self._flush_queue.get(timeout=self.FLUSH_WAIT_TIME + jitter)
        except EmptyError:
            pass
        else:
            self._flush(trace_id=trace_id)

        if (
            time.monotonic() - self._last_full_flush
            >= self.FLUSH_WAIT_TIME + jitter
        ):
            self._flush()
            self._last_full_flush = time.monotonic()

    # Shutdown drain. kill() clears ``_running`` *before* it enqueues the
    # ``None`` sentinel, so the loop condition can turn false before the
    # sentinel is ever dequeued. Flushing here unconditionally guarantees
    # the remaining spans are sent; if the sentinel was already handled the
    # buffer is empty and _flush() returns immediately.
    self._flush()

Check warning on line 73 in sentry_sdk/_span_batcher.py

View check run for this annotation

@sentry/warden / warden: find-bugs

[29F-LA3] kill() can lose buffered spans if flush loop exits before consuming the sentinel (additional location)

kill() sets self._running = False and then puts None on the flush queue. The flush loop only performs the shutdown flush as a side effect of consuming a queued trace_id (or the None sentinel) and calling _flush(). However, the loop's continuation is gated by `while self._running:` checked at the top of each iteration. If kill() sets _running = False after the loop has finished an iteration but before it re-enters get(), the loop exits without ever consuming the None and without calling _flush(), so any spans still in self._span_buffer are silently dropped on shutdown. The previous Event-based implementation in the parent Batcher.kill() relied on _flush_loop calling _flush() unconditionally each iteration, which avoided this hazard.

def add(self, span: "StreamedSpan") -> None:
# Bail out if the current thread is already executing batcher code.
# This prevents deadlocks when code running inside the batcher (e.g.
Expand Down Expand Up @@ -79,15 +101,23 @@
self._running_size[span.trace_id] += self._estimate_size(span)

if size + 1 >= self.MAX_BEFORE_FLUSH:
self._flush_event.set()
self._flush_queue.put(span.trace_id)
return

if self._running_size[span.trace_id] >= self.MAX_BYTES_BEFORE_FLUSH:
self._flush_event.set()
self._flush_queue.put(span.trace_id)
Comment thread
sentry-warden[bot] marked this conversation as resolved.
Outdated
return
finally:
self._active.flag = False

def kill(self) -> None:
    """Ask the background flusher to stop; do nothing if there is none.

    Clears the running flag, puts ``None`` on the flush queue and drops the
    reference to the flusher thread. The thread is not joined.
    """
    if self._flusher is not None:
        self._running = False
        # ``None`` is the "flush everything" request: the flush loop hands
        # whatever it dequeues to ``_flush(trace_id=...)``.
        self._flush_queue.put(None)
        self._flusher = None

Check warning on line 119 in sentry_sdk/_span_batcher.py

View check run for this annotation

@sentry/warden / warden: code-review

kill() can race with _flush_loop and skip the final full flush

kill() sets self._running = False before putting the sentinel None onto _flush_queue. If the flusher thread is between iterations (just finished a _flush call and about to re-evaluate the 'while self._running' condition) when kill() runs, it will observe _running=False and exit without ever consuming the None sentinel — meaning no final full flush is performed and any remaining buffered spans are dropped on shutdown. The previous Event-based design had the same shape, but moving to a queue does not fix this and the docstring/issue explicitly aims to preserve full-flush-on-shutdown semantics. Consider performing a final self._flush() inline in kill() (or before setting _running=False) to guarantee shutdown drainage.

Check warning on line 119 in sentry_sdk/_span_batcher.py

View check run for this annotation

@sentry/warden / warden: find-bugs

kill() can lose buffered spans if flush loop exits before consuming the sentinel

kill() sets self._running = False and then puts None on the flush queue. The flush loop only performs the shutdown flush as a side effect of consuming a queued trace_id (or the None sentinel) and calling _flush(). However, the loop's continuation is gated by `while self._running:` checked at the top of each iteration. If kill() sets _running = False after the loop has finished an iteration but before it re-enters get(), the loop exits without ever consuming the None and without calling _flush(), so any spans still in self._span_buffer are silently dropped on shutdown. The previous Event-based implementation in the parent Batcher.kill() relied on _flush_loop calling _flush() unconditionally each iteration, which avoided this hazard.
Comment thread
sentry-warden[bot] marked this conversation as resolved.
Outdated

@staticmethod
def _estimate_size(item: "StreamedSpan") -> int:
# Rough estimate of serialized span size that's quick to compute.
Expand Down Expand Up @@ -128,50 +158,60 @@

return res

# NOTE(review): this region is a rendered diff without +/- markers and without
# indentation; the removed and the added version of _flush are interleaved
# (both signatures appear back to back). Presumably the lines that use
# trace_id / bucket_id are the added ones -- confirm against the real file
# before editing.
def _flush(self) -> None:
def _flush(self, trace_id: "Optional[str]" = None) -> None:
with self._lock:
if len(self._span_buffer) == 0:
return

# trace_id selects what gets flushed: None means every bucket currently in the
# buffer, any other value means only that one bucket.
if trace_id is None:
# flush whole buffer, e.g. if the SDK is shutting down
buckets = list(self._span_buffer.keys())
else:
buckets = [trace_id]

envelopes = []
for spans in self._span_buffer.values():
if spans:
dsc = spans[0]._dynamic_sampling_context()

# Max per envelope is 1000, so if we happen to have more than
# 1000 spans in one bucket, we'll need to separate them.
for start in range(0, len(spans), self.MAX_ENVELOPE_SIZE):
end = min(start + self.MAX_ENVELOPE_SIZE, len(spans))
for bucket_id in buckets:
spans = self._span_buffer.get(bucket_id)
# Nothing buffered under this id (lookup came back empty): skip it.
if not spans:
continue

envelope = Envelope(
headers={
"sent_at": format_timestamp(datetime.now(timezone.utc)),
"trace": dsc,
}
)
# The envelope's "trace" header is taken from the first span of the bucket.
dsc = spans[0]._dynamic_sampling_context()

envelope.add_item(
Item(
type=self.TYPE,
content_type=self.CONTENT_TYPE,
headers={
"item_count": end - start,
},
payload=PayloadRef(
json={
"items": [
self._to_transport_format(spans[j])
for j in range(start, end)
]
}
),
)
# Max per envelope is 1000, so if we happen to have more than
# 1000 spans in one bucket, we'll need to separate them.
for start in range(0, len(spans), self.MAX_ENVELOPE_SIZE):
end = min(start + self.MAX_ENVELOPE_SIZE, len(spans))

envelope = Envelope(
headers={
"sent_at": format_timestamp(datetime.now(timezone.utc)),
"trace": dsc,
}
)

envelope.add_item(
Item(
type=self.TYPE,
content_type=self.CONTENT_TYPE,
headers={
"item_count": end - start,
},
payload=PayloadRef(
json={
"items": [
self._to_transport_format(spans[j])
for j in range(start, end)
]
}
),
)
)

envelopes.append(envelope)
envelopes.append(envelope)

# The two clear() calls empty both maps wholesale; the two del statements
# remove only the bucket that was just packed, together with its running
# byte-size counter. Presumably clear() is the removed variant and del the
# added one -- confirm.
self._span_buffer.clear()
self._running_size.clear()
del self._span_buffer[bucket_id]
del self._running_size[bucket_id]

# Hand every envelope built above to the capture callback.
for envelope in envelopes:
self._capture_func(envelope)
Loading
Loading