diff --git a/ddtrace/debugging/_debugger.py b/ddtrace/debugging/_debugger.py
index 40b13ce7163..58836780293 100644
--- a/ddtrace/debugging/_debugger.py
+++ b/ddtrace/debugging/_debugger.py
@@ -43,7 +43,7 @@
 from ddtrace.debugging._signal.collector import SignalCollector
 from ddtrace.debugging._signal.model import Signal
 from ddtrace.debugging._signal.model import SignalState
-from ddtrace.debugging._uploader import LogsIntakeUploaderV1
+from ddtrace.debugging._uploader import SignalUploader
 from ddtrace.debugging._uploader import UploaderProduct
 from ddtrace.internal import core
 from ddtrace.internal.logger import get_logger
@@ -193,7 +193,7 @@ class Debugger(Service):
     _probe_meter = _probe_metrics.get_meter("probe")

     __rc_adapter__ = ProbeRCAdapter
-    __uploader__ = LogsIntakeUploaderV1
+    __uploader__ = SignalUploader
     __watchdog__ = DebuggerModuleWatchdog
     __logger__ = ProbeStatusLogger
diff --git a/ddtrace/debugging/_exception/replay.py b/ddtrace/debugging/_exception/replay.py
index 1b3a70bbfa3..f9af26a78e4 100644
--- a/ddtrace/debugging/_exception/replay.py
+++ b/ddtrace/debugging/_exception/replay.py
@@ -12,7 +12,7 @@
 from ddtrace.debugging._session import Session
 from ddtrace.debugging._signal.snapshot import DEFAULT_CAPTURE_LIMITS
 from ddtrace.debugging._signal.snapshot import Snapshot
-from ddtrace.debugging._uploader import LogsIntakeUploaderV1
+from ddtrace.debugging._uploader import SignalUploader
 from ddtrace.debugging._uploader import UploaderProduct
 from ddtrace.internal import core
 from ddtrace.internal.logger import get_logger
@@ -242,7 +242,7 @@ def get_snapshot_count(span: Span) -> int:


 class SpanExceptionHandler:
-    __uploader__ = LogsIntakeUploaderV1
+    __uploader__ = SignalUploader

     _instance: t.Optional["SpanExceptionHandler"] = None

diff --git a/ddtrace/debugging/_origin/span.py b/ddtrace/debugging/_origin/span.py
index 819dc7db401..e247445cdb5 100644
--- a/ddtrace/debugging/_origin/span.py
+++ b/ddtrace/debugging/_origin/span.py
@@ -20,7 +20,7 @@
 from ddtrace.debugging._session import Session
 from ddtrace.debugging._signal.collector import SignalCollector
 from ddtrace.debugging._signal.snapshot import Snapshot
-from ddtrace.debugging._uploader import LogsIntakeUploaderV1
+from ddtrace.debugging._uploader import SignalUploader
 from ddtrace.debugging._uploader import UploaderProduct
 from ddtrace.ext import EXIT_SPAN_TYPES
 from ddtrace.internal import core
@@ -197,7 +197,7 @@ def __exit__(self, exc_type, exc_value, traceback):

 @dataclass
 class SpanCodeOriginProcessorEntry:
-    __uploader__ = LogsIntakeUploaderV1
+    __uploader__ = SignalUploader

     _instance: t.Optional["SpanCodeOriginProcessorEntry"] = None
     _handler: t.Optional[t.Callable] = None
@@ -232,7 +232,7 @@ def disable(cls):

 @dataclass
 class SpanCodeOriginProcessorExit(SpanProcessor):
-    __uploader__ = LogsIntakeUploaderV1
+    __uploader__ = SignalUploader

     _instance: t.Optional["SpanCodeOriginProcessorExit"] = None
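Note: the three files above only re-point the `__uploader__` class attribute, which is the seam through which the debugger, exception replay, and code origin components obtain their uploader; the test changes further down swap in mocks through the same attribute. A minimal sketch of the pattern (`InMemoryUploader` and `PatchedHandler` are hypothetical; the other names come from this patch):

    from ddtrace.debugging._exception.replay import SpanExceptionHandler
    from ddtrace.debugging._uploader import SignalUploader


    class InMemoryUploader(SignalUploader):  # hypothetical test double
        def __init__(self, interval=0.0):
            super().__init__(interval)
            self.payloads = []

        def _write(self, payload: bytes, endpoint: str) -> None:
            # Capture payloads instead of POSTing them to the agent.
            self.payloads.append(payload)


    class PatchedHandler(SpanExceptionHandler):
        # Same substitution pattern as tests/debugging/mocking.py below.
        __uploader__ = InMemoryUploader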
diff --git a/ddtrace/debugging/_uploader.py b/ddtrace/debugging/_uploader.py
index 9b26465fa8c..a90a0e723d8 100644
--- a/ddtrace/debugging/_uploader.py
+++ b/ddtrace/debugging/_uploader.py
@@ -1,7 +1,6 @@
 from dataclasses import dataclass
 from enum import Enum
 from typing import Any
-from typing import Dict
 from typing import Optional
 from typing import Set
 from urllib.parse import quote
@@ -14,14 +13,17 @@
 from ddtrace.debugging._signal.collector import SignalCollector
 from ddtrace.debugging._signal.model import SignalTrack
 from ddtrace.internal import agent
+from ddtrace.internal import logger
 from ddtrace.internal.logger import get_logger
-from ddtrace.internal.periodic import ForksafeAwakeablePeriodicService
 from ddtrace.internal.utils.http import connector
 from ddtrace.internal.utils.retry import fibonacci_backoff_with_jitter
-from ddtrace.internal.utils.time import HourGlass


 log = get_logger(__name__)

+UNSUPPORTED_AGENT = "unsupported_agent"
+logger.set_tag_rate_limit(UNSUPPORTED_AGENT, logger.HOUR)
+
+
 meter = metrics.get_meter("uploader")

@@ -35,18 +37,26 @@ class UploaderProduct(str, Enum):

 @dataclass
 class UploaderTrack:
+    track: SignalTrack
     endpoint: str
     queue: SignalQueue
+    enabled: bool = True
+
+class SignalUploaderError(Exception):
+    """Signal uploader error."""

-class LogsIntakeUploaderV1(ForksafeAwakeablePeriodicService):
-    """Logs intake uploader.
+    pass

-    This class implements an interface with the debugger logs intake for both
+
+class SignalUploader(agent.AgentCheckPeriodicService):
+    """Signal uploader.
+
+    This class implements an interface with the debugger signal intake for both
     the debugger and the events platform.
     """

-    _instance: Optional["LogsIntakeUploaderV1"] = None
+    _instance: Optional["SignalUploader"] = None
     _products: Set[UploaderProduct] = set()
     _agent_endpoints: Set[str] = set()
@@ -58,10 +68,25 @@ class LogsIntakeUploaderV1(ForksafeAwakeablePeriodicService):
     def __init__(self, interval: Optional[float] = None) -> None:
         super().__init__(interval if interval is not None else di_config.upload_interval_seconds)

-        self._agent_endpoints_cache: HourGlass = HourGlass(duration=60.0)
+        self._endpoint_suffix = endpoint_suffix = (
+            f"?ddtags={quote(di_config.tags)}" if di_config._tags_in_qs and di_config.tags else ""
+        )

-        self._tracks: Dict[SignalTrack, UploaderTrack] = {}
-        self.set_track_endpoints()
+        self._tracks = {
+            SignalTrack.LOGS: UploaderTrack(
+                track=SignalTrack.LOGS,
+                endpoint=f"/debugger/v1/input{endpoint_suffix}",
+                queue=self.__queue__(
+                    encoder=LogSignalJsonEncoder(di_config.service_name), on_full=self._on_buffer_full
+                ),
+            ),
+            SignalTrack.SNAPSHOT: UploaderTrack(
+                track=SignalTrack.SNAPSHOT,
+                endpoint=f"/debugger/v2/input{endpoint_suffix}",  # start optimistically
+                queue=self.__queue__(encoder=SnapshotJsonEncoder(di_config.service_name), on_full=self._on_buffer_full),
+            ),
+        }
+        self._collector = self.__collector__({t: ut.queue for t, ut in self._tracks.items()})
         self._headers = {
             "Content-type": "application/json; charset=utf-8",
             "Accept": "text/plain",
@@ -76,7 +101,7 @@ def __init__(self, interval: Optional[float] = None) -> None:
         )(self._write)

         log.debug(
-            "Logs intake uploader initialized (url: %s, endpoints: %s, interval: %f)",
+            "Signal uploader initialized (url: %s, endpoints: %s, interval: %f)",
             di_config._intake_url,
             {t: ut.endpoint for t, ut in self._tracks.items()},
             self.interval,
@@ -84,49 +109,43 @@ def __init__(self, interval: Optional[float] = None) -> None:

         self._flush_full = False

-    def set_track_endpoints(self) -> None:
-        if self._agent_endpoints_cache.trickling():
-            return
-
-        try:
-            agent_info = agent.info()
-            self._agent_endpoints = set(agent_info.get("endpoints", [])) if agent_info is not None else set()
-        except Exception:
-            pass  # nosec B110
-        finally:
-            self._agent_endpoints_cache.turn()
-
-        snapshot_track = "/debugger/v1/input"
-        if "/debugger/v2/input" in self._agent_endpoints:
-            snapshot_track = "/debugger/v2/input"
-        elif "/debugger/v1/diagnostics" in self._agent_endpoints:
-            snapshot_track = "/debugger/v1/diagnostics"
-
-        endpoint_suffix = f"?ddtags={quote(di_config.tags)}" if di_config._tags_in_qs and di_config.tags else ""
-
-        # Only create the tracks if they don't exist to preserve the track queue metadata.
-        if not self._tracks:
-            self._tracks = {
-                SignalTrack.LOGS: UploaderTrack(
-                    endpoint=f"/debugger/v1/input{endpoint_suffix}",
-                    queue=self.__queue__(
-                        encoder=LogSignalJsonEncoder(di_config.service_name), on_full=self._on_buffer_full
-                    ),
-                ),
-                SignalTrack.SNAPSHOT: UploaderTrack(
-                    endpoint=f"{snapshot_track}{endpoint_suffix}",
-                    queue=self.__queue__(
-                        encoder=SnapshotJsonEncoder(di_config.service_name), on_full=self._on_buffer_full
-                    ),
-                ),
-            }
+    def info_check(self, agent_info: Optional[dict]) -> bool:
+        if agent_info is None:
+            # Agent is unreachable
+            return False
+
+        if "endpoints" not in agent_info:
+            # Agent not supported
+            log.debug("Unsupported Datadog agent detected. Please upgrade to 7.49.0.")
+            return False
+
+        endpoints = set(agent_info.get("endpoints", []))
+        snapshot_track = self._tracks[SignalTrack.SNAPSHOT]
+        snapshot_track.enabled = True
+
+        if "/debugger/v2/input" in endpoints:
+            log.debug("Detected /debugger/v2/input endpoint")
+            snapshot_track.endpoint = f"/debugger/v2/input{self._endpoint_suffix}"
+        elif "/debugger/v1/diagnostics" in endpoints:
+            log.debug("Detected /debugger/v1/diagnostics endpoint fallback")
+            snapshot_track.endpoint = f"/debugger/v1/diagnostics{self._endpoint_suffix}"
         else:
-            self._tracks[SignalTrack.SNAPSHOT].endpoint = f"{snapshot_track}{endpoint_suffix}"
+            snapshot_track.enabled = False
+            log.warning(
+                UNSUPPORTED_AGENT,
+                extra={
+                    "product": "debugger",
+                    "more_info": (
+                        "Unsupported Datadog agent detected. Snapshots from Dynamic Instrumentation/"
+                        "Exception Replay/Code Origin for Spans will not be uploaded. "
+                        "Please upgrade to version 7.49.0 or later"
+                    ),
+                },
+            )

-        self._collector = self.__collector__({t: ut.queue for t, ut in self._tracks.items()})
+        return True

     def _write(self, payload: bytes, endpoint: str) -> None:
-        self.set_track_endpoints()
         try:
             with self._connect() as conn:
                 conn.request("POST", endpoint, payload, headers=self._headers)
@@ -134,9 +153,14 @@ def _write(self, payload: bytes, endpoint: str) -> None:
                 if not (200 <= resp.status < 300):
                     log.error("Failed to upload payload to endpoint %s: [%d] %r", endpoint, resp.status, resp.read())
                     meter.increment("upload.error", tags={"status": str(resp.status)})
+                    if 400 <= resp.status < 500:
+                        msg = "Failed to upload payload"
+                        raise SignalUploaderError(msg)
                 else:
                     meter.increment("upload.success")
                     meter.distribution("upload.size", len(payload))
+        except SignalUploaderError:
+            raise
         except Exception:
             log.error("Failed to write payload to endpoint %s", endpoint, exc_info=True)
             meter.increment("error")
log.debug("Cannot upload logs payload", exc_info=True) - def periodic(self) -> None: - """Upload the buffer content to the logs intake.""" + def online(self) -> None: + """Upload the buffer content to the agent.""" if self._flush_full: # We received the signal to flush a full buffer self._flush_full = False - for track in self._tracks.values(): - if track.queue.is_full(): - self._flush_track(track) + for uploader_track in self._tracks.values(): + if uploader_track.queue.is_full(): + self._flush_track(uploader_track) for track in self._tracks.values(): if track.queue.count: self._flush_track(track) - on_shutdown = periodic + if not self._tracks[SignalTrack.SNAPSHOT].enabled: + # If the snapshot track is not enabled, we raise an exception to + # transition back to the agent check state in case we detect an + # agent that can handle snapshots safely. + msg = "Snapshot track not enabled" + raise ValueError(msg) + + on_shutdown = online @classmethod def get_collector(cls) -> SignalCollector: diff --git a/ddtrace/debugging/uploader.plantuml b/ddtrace/debugging/uploader.plantuml new file mode 100644 index 00000000000..29eb450d910 --- /dev/null +++ b/ddtrace/debugging/uploader.plantuml @@ -0,0 +1,34 @@ +@startuml +title Signal Uploader State Machine + +state "Agent Check" as AgentCheck + +[*] -u-> AgentCheck + +AgentCheck : check available\nendpoints + +state Online { + state "v2/input" as InputV2 + state "v1/diagnostics" as DiagnosticsV1 + state "No Snapshots" as NoSnapshots + + InputV2 : use the v2/input\nendpoint + DiagnosticsV1 : fallback to v1/diagnostics\nendpoint + NoSnapshots : agent does not support\nredacted snapshots +} + +AgentCheck --> AgentCheck : no info +AgentCheck --> InputV2 : v2/input +AgentCheck --> DiagnosticsV1 : v1/diagnostics +AgentCheck --> NoSnapshots : unsupported + +InputV2 --> AgentCheck : error +DiagnosticsV1 --> AgentCheck : error +NoSnapshots --> AgentCheck : error + +NoSnapshots --> AgentCheck : periodically + +InputV2 --> InputV2 : periodically +DiagnosticsV1 --> DiagnosticsV1 : periodically + +@enduml diff --git a/ddtrace/internal/agent.py b/ddtrace/internal/agent.py index 7a024ca8f0e..c420fedb611 100644 --- a/ddtrace/internal/agent.py +++ b/ddtrace/internal/agent.py @@ -1,6 +1,9 @@ +import abc import json +import typing as t from ddtrace.internal.logger import get_logger +from ddtrace.internal.periodic import ForksafeAwakeablePeriodicService from ddtrace.settings._agent import config from .utils.http import get_connection @@ -29,3 +32,38 @@ def info(url=None): return None return json.loads(data) + + +class AgentCheckPeriodicService(ForksafeAwakeablePeriodicService, metaclass=abc.ABCMeta): + def __init__(self, interval: float = 0.0): + super().__init__(interval=interval) + + self._state = self._agent_check + + @abc.abstractmethod + def info_check(self, agent_info: t.Optional[dict]) -> bool: + ... + + def _agent_check(self) -> None: + try: + agent_info = info() + except Exception: + agent_info = None + + if self.info_check(agent_info): + self._state = self._online + self._online() + + def _online(self) -> None: + try: + self.online() + except Exception: + self._state = self._agent_check + log.debug("Error during online operation, reverting to agent check", exc_info=True) + + @abc.abstractmethod + def online(self) -> None: + ... 
diff --git a/ddtrace/debugging/uploader.plantuml b/ddtrace/debugging/uploader.plantuml
new file mode 100644
index 00000000000..29eb450d910
--- /dev/null
+++ b/ddtrace/debugging/uploader.plantuml
@@ -0,0 +1,34 @@
+@startuml
+title Signal Uploader State Machine
+
+state "Agent Check" as AgentCheck
+
+[*] -u-> AgentCheck
+
+AgentCheck : check available\nendpoints
+
+state Online {
+  state "v2/input" as InputV2
+  state "v1/diagnostics" as DiagnosticsV1
+  state "No Snapshots" as NoSnapshots
+
+  InputV2 : use the v2/input\nendpoint
+  DiagnosticsV1 : fallback to v1/diagnostics\nendpoint
+  NoSnapshots : agent does not support\nredacted snapshots
+}
+
+AgentCheck --> AgentCheck : no info
+AgentCheck --> InputV2 : v2/input
+AgentCheck --> DiagnosticsV1 : v1/diagnostics
+AgentCheck --> NoSnapshots : unsupported
+
+InputV2 --> AgentCheck : error
+DiagnosticsV1 --> AgentCheck : error
+NoSnapshots --> AgentCheck : error
+
+NoSnapshots --> AgentCheck : periodically
+
+InputV2 --> InputV2 : periodically
+DiagnosticsV1 --> DiagnosticsV1 : periodically
+
+@enduml
diff --git a/ddtrace/internal/agent.py b/ddtrace/internal/agent.py
index 7a024ca8f0e..c420fedb611 100644
--- a/ddtrace/internal/agent.py
+++ b/ddtrace/internal/agent.py
@@ -1,6 +1,9 @@
+import abc
 import json
+import typing as t

 from ddtrace.internal.logger import get_logger
+from ddtrace.internal.periodic import ForksafeAwakeablePeriodicService
 from ddtrace.settings._agent import config

 from .utils.http import get_connection
@@ -29,3 +32,38 @@ def info(url=None):
         return None

     return json.loads(data)
+
+
+class AgentCheckPeriodicService(ForksafeAwakeablePeriodicService, metaclass=abc.ABCMeta):
+    def __init__(self, interval: float = 0.0):
+        super().__init__(interval=interval)
+
+        self._state = self._agent_check
+
+    @abc.abstractmethod
+    def info_check(self, agent_info: t.Optional[dict]) -> bool:
+        ...
+
+    def _agent_check(self) -> None:
+        try:
+            agent_info = info()
+        except Exception:
+            agent_info = None
+
+        if self.info_check(agent_info):
+            self._state = self._online
+            self._online()
+
+    def _online(self) -> None:
+        try:
+            self.online()
+        except Exception:
+            self._state = self._agent_check
+            log.debug("Error during online operation, reverting to agent check", exc_info=True)
+
+    @abc.abstractmethod
+    def online(self) -> None:
+        ...
+
+    def periodic(self) -> None:
+        return self._state()
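Note: `AgentCheckPeriodicService` is a two-state machine: every `periodic()` tick dispatches to `_agent_check()` until `info_check()` accepts the agent's `/info` payload, then to `_online()` until `online()` raises, which flips it back. A minimal usage sketch (the subclass is hypothetical; the base class and its hooks are the ones added above):

    import typing as t

    from ddtrace.internal.agent import AgentCheckPeriodicService


    class HeartbeatService(AgentCheckPeriodicService):  # hypothetical subclass
        def info_check(self, agent_info: t.Optional[dict]) -> bool:
            # Go online only when a reachable agent reports its endpoints.
            return agent_info is not None and "endpoints" in agent_info

        def online(self) -> None:
            # Runs on every tick while online; raising any exception here
            # reverts the service to the agent-check state.
            print("agent is up; doing periodic work")

This is also why `SignalUploader.online()` raises `ValueError` while the snapshot track is disabled: the failure is the designed way to schedule another agent check, matching the `NoSnapshots --> AgentCheck` edge in the PlantUML diagram.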
diff --git a/releasenotes/notes/chore-debugger-agent-check-uploader-5d644d20cf9b4af5.yaml b/releasenotes/notes/chore-debugger-agent-check-uploader-5d644d20cf9b4af5.yaml
new file mode 100644
index 00000000000..81dfbd07623
--- /dev/null
+++ b/releasenotes/notes/chore-debugger-agent-check-uploader-5d644d20cf9b4af5.yaml
@@ -0,0 +1,6 @@
+---
+features:
+  - |
+    dynamic instrumentation/exception replay/code origin for spans: added
+    support for the latest Datadog agent intake for snapshots. This requires a
+    minimum agent version of 7.49.0.
diff --git a/tests/debugging/exploration/debugger.py b/tests/debugging/exploration/debugger.py
index 64f3ae8768d..a1d7eed0305 100644
--- a/tests/debugging/exploration/debugger.py
+++ b/tests/debugging/exploration/debugger.py
@@ -22,7 +22,7 @@
 from ddtrace.debugging._probe.remoteconfig import ProbePollerEvent
 from ddtrace.debugging._signal.collector import SignalCollector
 from ddtrace.debugging._signal.snapshot import Snapshot
-from ddtrace.debugging._uploader import LogsIntakeUploaderV1
+from ddtrace.debugging._uploader import SignalUploader
 from ddtrace.internal.remoteconfig.worker import RemoteConfigPoller


@@ -157,7 +157,7 @@ def probes(self) -> t.List[t.Optional[Probe]]:
         return self._probes or [None]


-class NoopLogsIntakeUploader(LogsIntakeUploaderV1):
+class NoopSignalUploader(SignalUploader):
     __collector__ = ExplorationSignalCollector
     _count = 0

@@ -184,7 +184,7 @@ def set_emitting(self, probe: Probe) -> None:

 class ExplorationDebugger(Debugger):
     __rc__ = NoopDebuggerRC
-    __uploader__ = NoopLogsIntakeUploader
+    __uploader__ = NoopSignalUploader
     __watchdog__ = ModuleCollector
     __logger__ = NoopProbeStatusLogger
diff --git a/tests/debugging/live/test_live_debugger.py b/tests/debugging/live/test_live_debugger.py
index 6e9e1a4019a..4b97c6476b4 100644
--- a/tests/debugging/live/test_live_debugger.py
+++ b/tests/debugging/live/test_live_debugger.py
@@ -5,7 +5,7 @@
 from ddtrace.debugging._origin.span import SpanCodeOriginProcessorExit
 from ddtrace.debugging._probe.model import ProbeEvalTiming
 from ddtrace.internal import core
-from tests.debugging.mocking import MockLogsIntakeUploaderV1
+from tests.debugging.mocking import MockSignalUploader
 from tests.debugging.mocking import debugger
 from tests.debugging.utils import create_snapshot_function_probe
 from tests.debugging.utils import create_trigger_function_probe
@@ -13,11 +13,11 @@


 class MockSpanCodeOriginProcessor(SpanCodeOriginProcessorExit):
-    __uploader__ = MockLogsIntakeUploaderV1
+    __uploader__ = MockSignalUploader

     @classmethod
-    def get_uploader(cls) -> MockLogsIntakeUploaderV1:
-        return t.cast(MockLogsIntakeUploaderV1, cls.__uploader__._instance)
+    def get_uploader(cls) -> MockSignalUploader:
+        return t.cast(MockSignalUploader, cls.__uploader__._instance)


 class SpanProbeTestCase(TracerTestCase):
diff --git a/tests/debugging/mocking.py b/tests/debugging/mocking.py
index 6d916c65c04..6b339a8512f 100644
--- a/tests/debugging/mocking.py
+++ b/tests/debugging/mocking.py
@@ -19,7 +19,7 @@
 from ddtrace.debugging._redaction import redact
 from ddtrace.debugging._signal.collector import SignalCollector
 from ddtrace.debugging._signal.snapshot import Snapshot
-from ddtrace.debugging._uploader import LogsIntakeUploaderV1
+from ddtrace.debugging._uploader import SignalUploader
 from ddtrace.settings._core import DDConfig
 from tests.debugging.probe.test_status import DummyProbeStatusLogger

@@ -89,12 +89,13 @@ def wait(self, cond=lambda q: q, timeout=1.0):
         raise PayloadWaitTimeout()


-class MockLogsIntakeUploaderV1(LogsIntakeUploaderV1):
+class MockSignalUploader(SignalUploader):
     __collector__ = TestSignalCollector

     def __init__(self, interval=0.0):
-        super(MockLogsIntakeUploaderV1, self).__init__(interval)
+        super(MockSignalUploader, self).__init__(interval)
         self.queue = []
+        self._state = self._online

     def _write(self, payload, endpoint):
         self.queue.append(payload.decode())
@@ -126,7 +127,7 @@ def snapshots(self) -> List[Snapshot]:

 class TestDebugger(Debugger):
     __logger__ = MockProbeStatusLogger
-    __uploader__ = MockLogsIntakeUploaderV1
+    __uploader__ = MockSignalUploader

     def add_probes(self, *probes: Probe) -> None:
         self._on_configuration(ProbePollerEvent.NEW_PROBES, probes)
@@ -216,11 +217,11 @@ def debugger(**config_overrides: Any) -> Generator[TestDebugger, None, None]:


 class MockSpanExceptionHandler(SpanExceptionHandler):
-    __uploader__ = MockLogsIntakeUploaderV1
+    __uploader__ = MockSignalUploader


 @contextmanager
-def exception_replay(**config_overrides: Any) -> Generator[MockLogsIntakeUploaderV1, None, None]:
+def exception_replay(**config_overrides: Any) -> Generator[MockSignalUploader, None, None]:
     MockSpanExceptionHandler.enable()

     handler = MockSpanExceptionHandler._instance
diff --git a/tests/debugging/origin/test_span.py b/tests/debugging/origin/test_span.py
index b9b3e8cb6ad..62267401480 100644
--- a/tests/debugging/origin/test_span.py
+++ b/tests/debugging/origin/test_span.py
@@ -7,24 +7,24 @@
 from ddtrace.debugging._session import Session
 from ddtrace.ext import SpanTypes
 from ddtrace.internal import core
-from tests.debugging.mocking import MockLogsIntakeUploaderV1
+from tests.debugging.mocking import MockSignalUploader
 from tests.utils import TracerTestCase


 class MockSpanCodeOriginProcessorEntry(SpanCodeOriginProcessorEntry):
-    __uploader__ = MockLogsIntakeUploaderV1
+    __uploader__ = MockSignalUploader

     @classmethod
-    def get_uploader(cls) -> MockLogsIntakeUploaderV1:
-        return t.cast(MockLogsIntakeUploaderV1, cls.__uploader__._instance)
+    def get_uploader(cls) -> MockSignalUploader:
+        return t.cast(MockSignalUploader, cls.__uploader__._instance)


 class MockSpanCodeOriginProcessor(SpanCodeOriginProcessorExit):
-    __uploader__ = MockLogsIntakeUploaderV1
+    __uploader__ = MockSignalUploader

     @classmethod
-    def get_uploader(cls) -> MockLogsIntakeUploaderV1:
-        return t.cast(MockLogsIntakeUploaderV1, cls.__uploader__._instance)
+    def get_uploader(cls) -> MockSignalUploader:
+        return t.cast(MockSignalUploader, cls.__uploader__._instance)


 class SpanProbeTestCase(TracerTestCase):
diff --git a/tests/debugging/test_uploader.py b/tests/debugging/test_uploader.py
index f9c1bb6db54..b99d7d4a698 100644
--- a/tests/debugging/test_uploader.py
+++ b/tests/debugging/test_uploader.py
@@ -5,7 +5,7 @@

 from ddtrace.debugging._encoding import BufferFull
 from ddtrace.debugging._encoding import SignalQueue
-from ddtrace.debugging._uploader import LogsIntakeUploaderV1
+from ddtrace.debugging._uploader import SignalUploader


 # DEV: Using float('inf') with lock wait intervals may cause an OverflowError
@@ -13,10 +13,11 @@
 LONG_INTERVAL = 2147483647.0


-class MockLogsIntakeUploaderV1(LogsIntakeUploaderV1):
+class MockSignalUploader(SignalUploader):
     def __init__(self, *args, **kwargs):
-        super(MockLogsIntakeUploaderV1, self).__init__(*args, **kwargs)
+        super(MockSignalUploader, self).__init__(*args, **kwargs)
         self.queue = Queue()
+        self._state = self._online

     def _write(self, payload, endpoint):
         self.queue.put(payload.decode())
@@ -26,7 +27,7 @@ def payloads(self):
         return [json.loads(data) for data in self.queue]


-class ActiveBatchJsonEncoder(MockLogsIntakeUploaderV1):
+class ActiveBatchJsonEncoder(MockSignalUploader):
     def __init__(self, size=1 << 10, interval=1):
         super(ActiveBatchJsonEncoder, self).__init__(interval)
@@ -69,102 +70,3 @@ def test_uploader_full_buffer():
     # wakeup to mimic next interval
     uploader.periodic()
     assert uploader.queue.qsize() == 0
-
-
-def test_uploader_preserves_queue_metadata_on_agent_endpoint_refresh():
-    """Test that track queue metadata is preserved when agent endpoints are refreshed."""
-    import mock
-
-    from ddtrace.debugging._signal.model import SignalTrack
-    from ddtrace.internal import agent
-
-    # Mock agent.info to return initial endpoints
-    initial_agent_info = {"endpoints": ["/debugger/v1/input", "/debugger/v1/diagnostics"]}
-    updated_agent_info = {"endpoints": ["/debugger/v1/input", "/debugger/v2/input"]}
-
-    with mock.patch.object(agent, "info", return_value=initial_agent_info):
-        uploader = MockLogsIntakeUploaderV1(interval=LONG_INTERVAL)
-
-        # Add some data to the queues
-        logs_queue = uploader._tracks[SignalTrack.LOGS].queue
-        snapshot_queue = uploader._tracks[SignalTrack.SNAPSHOT].queue
-
-        # Put some encoded data in the queues
-        logs_queue.put_encoded(None, "log_data".encode("utf-8"))
-        snapshot_queue.put_encoded(None, "snapshot_data".encode("utf-8"))
-
-        # Store queue references and verify they have data
-        original_logs_queue = logs_queue
-        original_snapshot_queue = snapshot_queue
-        original_logs_count = logs_queue.count
-        original_snapshot_count = snapshot_queue.count
-
-        assert original_logs_count > 0, "Logs queue should have data"
-        assert original_snapshot_count > 0, "Snapshot queue should have data"
-
-        # Force the cache to expire by mocking trickling to return False
-        with mock.patch.object(uploader._agent_endpoints_cache, "trickling", return_value=False):
-            # Mock agent.info to return updated endpoints (v2 instead of v1 diagnostics)
-            with mock.patch.object(agent, "info", return_value=updated_agent_info):
-                # This should trigger set_track_endpoints to refresh but preserve queue metadata
-                uploader.set_track_endpoints()
-
-        # Verify that the track queues are the same objects (not recreated)
-        assert uploader._tracks[SignalTrack.LOGS].queue is original_logs_queue
-        assert uploader._tracks[SignalTrack.SNAPSHOT].queue is original_snapshot_queue
-
-        # Verify that queue counts are preserved
-        assert uploader._tracks[SignalTrack.LOGS].queue.count == original_logs_count
-        assert uploader._tracks[SignalTrack.SNAPSHOT].queue.count == original_snapshot_count
-
-        # Verify that the endpoint was updated for snapshot track
-        assert "/debugger/v2/input" in uploader._tracks[SignalTrack.SNAPSHOT].endpoint
-
-        # Verify we can still flush without BufferFull errors
-        uploader.periodic()
-
-        # The data should have been uploaded
-        assert uploader.queue.qsize() == 2  # One payload for logs, one for snapshots
-
-
-def test_uploader_agent_endpoint_refresh_multiple_calls():
-    """Test that multiple calls to set_track_endpoints with cache expiry work correctly."""
-    import mock
-
-    from ddtrace.debugging._signal.model import SignalTrack
-    from ddtrace.internal import agent
-
-    agent_responses = [
-        {"endpoints": ["/debugger/v1/input"]},
-        {"endpoints": ["/debugger/v1/input", "/debugger/v1/diagnostics"]},
-        {"endpoints": ["/debugger/v1/input", "/debugger/v2/input"]},
-    ]
-
-    with mock.patch.object(agent, "info", return_value=agent_responses[0]):
-        uploader = MockLogsIntakeUploaderV1(interval=LONG_INTERVAL)
-
-        # Add data to track buffer state
-        snapshot_queue = uploader._tracks[SignalTrack.SNAPSHOT].queue
-        snapshot_queue.put_encoded(None, "test_data".encode("utf-8"))
-        original_count = snapshot_queue.count
-
-        # Track the original queue object
-        original_queue = snapshot_queue
-
-        # Simulate multiple agent endpoint updates
-        for i, agent_response in enumerate(agent_responses[1:], 1):
-            with mock.patch.object(uploader._agent_endpoints_cache, "trickling", return_value=False):
-                with mock.patch.object(agent, "info", return_value=agent_response):
-                    uploader.set_track_endpoints()
-
-            # Queue should be preserved across all updates
-            assert uploader._tracks[SignalTrack.SNAPSHOT].queue is original_queue
-            assert uploader._tracks[SignalTrack.SNAPSHOT].queue.count == original_count
-
-            # Add more data to ensure buffer state is maintained
-            snapshot_queue.put_encoded(None, f"test_data_{i}".encode("utf-8"))
-            original_count = snapshot_queue.count
-
-        # Final verification - queue should still be functional
-        uploader.periodic()
-        assert uploader.queue.qsize() > 0
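Note: the two removed tests covered the old queue-preservation behavior of `set_track_endpoints()`, which no longer exists: tracks are now created once in `__init__` and only their `endpoint`/`enabled` fields change. The mock uploaders above instead pin `self._state = self._online` to bypass the agent check entirely; a test that wants to exercise the check itself could stub `agent.info` the way the removed tests did. A hedged sketch (assumes the patched modules; the test name is illustrative):

    import mock

    from ddtrace.debugging._signal.model import SignalTrack
    from ddtrace.internal import agent
    from tests.debugging.test_uploader import LONG_INTERVAL
    from tests.debugging.test_uploader import MockSignalUploader


    def test_agent_check_goes_online():
        agent_info = {"endpoints": ["/debugger/v1/input", "/debugger/v2/input"]}
        with mock.patch.object(agent, "info", return_value=agent_info):
            uploader = MockSignalUploader(interval=LONG_INTERVAL)
            uploader._state = uploader._agent_check  # undo the mock's pin
            uploader.periodic()  # agent check passes, then online() runs
            assert uploader._state == uploader._online
            assert uploader._tracks[SignalTrack.SNAPSHOT].endpoint.startswith("/debugger/v2/input")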