diff --git a/ddtrace/debugging/_encoding.py b/ddtrace/debugging/_encoding.py
index 701809a8593..3d4ecbc0f8c 100644
--- a/ddtrace/debugging/_encoding.py
+++ b/ddtrace/debugging/_encoding.py
@@ -13,6 +13,7 @@
 from typing import Iterator
 from typing import List
 from typing import Optional
+from typing import Tuple
 from typing import Union

 from ddtrace.debugging._config import di_config
@@ -74,7 +75,7 @@ def put(self, item: Any) -> int:
         """Enqueue the given item and returns its encoded size."""

     @abc.abstractmethod
-    def flush(self) -> Optional[Union[bytes, bytearray]]:
+    def flush(self) -> Optional[Tuple[Union[bytes, bytearray], int]]:
         """Flush the buffer and return the encoded data."""

@@ -341,7 +342,7 @@ def put_encoded(self, item: Snapshot, encoded: bytes) -> int:
             self._on_full(item, encoded)
             raise

-    def flush(self) -> Optional[Union[bytes, bytearray]]:
+    def flush(self) -> Optional[Tuple[Union[bytes, bytearray], int]]:
         with self._lock:
             if self.count == 0:
                 # Reclaim memory
@@ -349,9 +350,10 @@ def flush(self) -> Optional[Union[bytes, bytearray]]:
                 return None

             encoded = self._buffer.flush()
+            count = self.count
             self.count = 0
             self._full = False
-            return encoded
+            return encoded, count

     def is_full(self) -> bool:
         with self._lock:
diff --git a/ddtrace/debugging/_signal/collector.py b/ddtrace/debugging/_signal/collector.py
index 1017231e863..0d6125cdc70 100644
--- a/ddtrace/debugging/_signal/collector.py
+++ b/ddtrace/debugging/_signal/collector.py
@@ -37,7 +37,10 @@ def __init__(self, tracks: Dict[SignalTrack, BufferedEncoder]) -> None:
     def _enqueue(self, log_signal: LogSignal) -> None:
         try:
             log.debug(
-                "[%s][P: %s] SignalCollector enqueu signal on track %s", os.getpid(), os.getppid(), log_signal.__track__
+                "[%s][P: %s] SignalCollector enqueue signal on track %s",
+                os.getpid(),
+                os.getppid(),
+                log_signal.__track__,
             )
             self._tracks[log_signal.__track__].put(log_signal)
         except BufferFull:
diff --git a/ddtrace/debugging/_uploader.py b/ddtrace/debugging/_uploader.py
index a90a0e723d8..bb21c570e16 100644
--- a/ddtrace/debugging/_uploader.py
+++ b/ddtrace/debugging/_uploader.py
@@ -180,18 +180,18 @@ def reset(self) -> None:
         self._collector._tracks = {t: ut.queue for t, ut in self._tracks.items()}

     def _flush_track(self, track: UploaderTrack) -> None:
-        queue = track.queue
-        if (payload := queue.flush()) is not None and track.enabled:
+        if (data := track.queue.flush()) is not None and track.enabled:
+            payload, count = data
             try:
                 self._write_with_backoff(payload, track.endpoint)
-                meter.distribution("batch.cardinality", queue.count)
+                meter.distribution("batch.cardinality", count)
             except SignalUploaderError:
                 if track.track is SignalTrack.SNAPSHOT and not track.endpoint.startswith("/debugger/v1/diagnostics"):
                     # Downgrade to diagnostics endpoint and retry once
                     track.endpoint = f"/debugger/v1/diagnostics{self._endpoint_suffix}"
                     log.debug("Downgrading snapshot endpoint to %s and trying again", track.endpoint)
                     self._write_with_backoff(payload, track.endpoint)
-                    meter.distribution("batch.cardinality", queue.count)
+                    meter.distribution("batch.cardinality", count)
                 else:
                     raise  # Propagate error to transition to agent check state
             except Exception:
diff --git a/tests/debugging/test_encoding.py b/tests/debugging/test_encoding.py
index bae78362838..7604c98afe3 100644
--- a/tests/debugging/test_encoding.py
+++ b/tests/debugging/test_encoding.py
@@ -235,8 +235,9 @@ def test_batch_json_encoder():
     for _ in range(2 * n_snapshots):
         queue.put(s)

-    count = queue.count
-    payload = queue.flush()
+    data = queue.flush()
+    assert data is not None
+    payload, count = data
     decoded = json.loads(payload.decode())
     assert len(decoded) == count
     assert n_snapshots <= count + 1  # Allow for rounding errors
@@ -261,13 +262,19 @@ def test_batch_flush_reencode():
     queue = SignalQueue(LogSignalJsonEncoder(None))

     snapshot_total_size = sum(queue.put(s) for _ in range(2))
-    assert queue.count == 2
-    assert len(queue.flush()) == snapshot_total_size + 3
+    data = queue.flush()
+    assert data is not None
+    payload, count = data
+    assert count == 2
+    assert len(payload) == snapshot_total_size + 3

     a, b = queue.put(s), queue.put(s)
     assert abs(a - b) < 1024
-    assert queue.count == 2
-    assert len(queue.flush()) == a + b + 3
+    data = queue.flush()
+    assert data is not None
+    payload, count = data
+    assert count == 2
+    assert len(payload) == a + b + 3


 # ---- Side effects ----
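For reference, the patched flush() now returns the encoded payload together with the number of signals captured at flush time, so callers unpack a tuple instead of reading queue.count after the buffer has been reset. A minimal usage sketch, not part of the patch, assuming queue is a SignalQueue wrapping a LogSignalJsonEncoder as in the tests above; send_payload and record_cardinality are hypothetical stand-ins for the uploader's transport call and metric hook:

    data = queue.flush()
    if data is not None:
        # Unpack the encoded bytes and the batch cardinality taken before the counter reset.
        payload, count = data
        send_payload(payload)        # hypothetical transport call
        record_cardinality(count)    # hypothetical metric hook, e.g. batch.cardinality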