Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 5 additions & 3 deletions ddtrace/debugging/_encoding.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
from typing import Iterator
from typing import List
from typing import Optional
from typing import Tuple
from typing import Union

from ddtrace.debugging._config import di_config
Expand Down Expand Up @@ -74,7 +75,7 @@ def put(self, item: Any) -> int:
"""Enqueue the given item and returns its encoded size."""

@abc.abstractmethod
def flush(self) -> Optional[Union[bytes, bytearray]]:
def flush(self) -> Optional[Tuple[Union[bytes, bytearray], int]]:
"""Flush the buffer and return the encoded data."""


Expand Down Expand Up @@ -341,17 +342,18 @@ def put_encoded(self, item: Snapshot, encoded: bytes) -> int:
self._on_full(item, encoded)
raise

def flush(self) -> Optional[Union[bytes, bytearray]]:
def flush(self) -> Optional[Tuple[Union[bytes, bytearray], int]]:
with self._lock:
if self.count == 0:
# Reclaim memory
self._buffer._reset()
return None

encoded = self._buffer.flush()
count = self.count
self.count = 0
self._full = False
return encoded
return encoded, count

def is_full(self) -> bool:
with self._lock:
Expand Down
5 changes: 4 additions & 1 deletion ddtrace/debugging/_signal/collector.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,10 @@ def __init__(self, tracks: Dict[SignalTrack, BufferedEncoder]) -> None:
def _enqueue(self, log_signal: LogSignal) -> None:
try:
log.debug(
"[%s][P: %s] SignalCollector enqueu signal on track %s", os.getpid(), os.getppid(), log_signal.__track__
"[%s][P: %s] SignalCollector enqueue signal on track %s",
os.getpid(),
os.getppid(),
log_signal.__track__,
)
self._tracks[log_signal.__track__].put(log_signal)
except BufferFull:
Expand Down
8 changes: 4 additions & 4 deletions ddtrace/debugging/_uploader.py
Original file line number Diff line number Diff line change
Expand Up @@ -180,18 +180,18 @@ def reset(self) -> None:
self._collector._tracks = {t: ut.queue for t, ut in self._tracks.items()}

def _flush_track(self, track: UploaderTrack) -> None:
queue = track.queue
if (payload := queue.flush()) is not None and track.enabled:
if (data := track.queue.flush()) is not None and track.enabled:
payload, count = data
try:
self._write_with_backoff(payload, track.endpoint)
meter.distribution("batch.cardinality", queue.count)
meter.distribution("batch.cardinality", count)
except SignalUploaderError:
if track.track is SignalTrack.SNAPSHOT and not track.endpoint.startswith("/debugger/v1/diagnostics"):
# Downgrade to diagnostics endpoint and retry once
track.endpoint = f"/debugger/v1/diagnostics{self._endpoint_suffix}"
log.debug("Downgrading snapshot endpoint to %s and trying again", track.endpoint)
self._write_with_backoff(payload, track.endpoint)
meter.distribution("batch.cardinality", queue.count)
meter.distribution("batch.cardinality", count)
else:
raise # Propagate error to transition to agent check state
except Exception:
Expand Down
19 changes: 13 additions & 6 deletions tests/debugging/test_encoding.py
Original file line number Diff line number Diff line change
Expand Up @@ -235,8 +235,9 @@ def test_batch_json_encoder():
for _ in range(2 * n_snapshots):
queue.put(s)

count = queue.count
payload = queue.flush()
data = queue.flush()
assert data is not None
payload, count = data
decoded = json.loads(payload.decode())
assert len(decoded) == count
assert n_snapshots <= count + 1 # Allow for rounding errors
Expand All @@ -261,13 +262,19 @@ def test_batch_flush_reencode():
queue = SignalQueue(LogSignalJsonEncoder(None))

snapshot_total_size = sum(queue.put(s) for _ in range(2))
assert queue.count == 2
assert len(queue.flush()) == snapshot_total_size + 3
data = queue.flush()
assert data is not None
payload, count = data
assert count == 2
assert len(payload) == snapshot_total_size + 3

a, b = queue.put(s), queue.put(s)
assert abs(a - b) < 1024
assert queue.count == 2
assert len(queue.flush()) == a + b + 3
data = queue.flush()
assert data is not None
payload, count = data
assert count == 2
assert len(payload) == a + b + 3


# ---- Side effects ----
Expand Down
Loading