
Commit 5b4a26c

chore(debugger): fix issue with re-initializing uploader tracks (#14635)
`set_track_endpoints` was recently introduced to protect against scenarios where the agent version can change over time (#14568). Unfortunately, it re-created the uploader tracks on every refresh, resetting the track queue so that its counter looked empty while the underlying buffer kept filling, eventually leading to unrecoverable BufferFull errors. This change fixes that by preserving the track queue metadata when the agent endpoint cache is reset. refs: DEBUG-4516
1 parent 84bb122 commit 5b4a26c

2 files changed: +119 additions, -11 deletions
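The shape of the fix (see the ddtrace/debugging/_uploader.py diff below) is to make `set_track_endpoints` idempotent with respect to the track queues: the `UploaderTrack` objects and their buffers are created exactly once, and later refreshes only swap the snapshot endpoint. The following is a minimal, self-contained sketch of that pattern; `Track`, `Uploader`, and the plain-list queue are simplified stand-ins for illustration, not the actual ddtrace types.

from dataclasses import dataclass, field
from typing import Dict, List


@dataclass
class Track:
    """Simplified stand-in for UploaderTrack: an endpoint plus a buffering queue."""

    endpoint: str
    queue: List[bytes] = field(default_factory=list)


class Uploader:
    """Simplified stand-in for the uploader; not the actual ddtrace class."""

    def __init__(self) -> None:
        self._tracks: Dict[str, Track] = {}
        self.set_track_endpoints(snapshot_endpoint="/debugger/v1/input")

    def set_track_endpoints(self, snapshot_endpoint: str) -> None:
        # Only create the tracks if they don't exist, so any data already
        # buffered in the queues (and their counters) is preserved.
        if not self._tracks:
            self._tracks = {
                "logs": Track(endpoint="/debugger/v1/input"),
                "snapshot": Track(endpoint=snapshot_endpoint),
            }
        else:
            # On a refresh, only the endpoint may change; the queue object stays.
            self._tracks["snapshot"].endpoint = snapshot_endpoint


uploader = Uploader()
uploader._tracks["snapshot"].queue.append(b"snapshot_data")
queue_before = uploader._tracks["snapshot"].queue

# Simulate an agent endpoint refresh that now advertises a v2 endpoint.
uploader.set_track_endpoints(snapshot_endpoint="/debugger/v2/input")

assert uploader._tracks["snapshot"].queue is queue_before  # queue (and buffered data) preserved
assert uploader._tracks["snapshot"].endpoint == "/debugger/v2/input"  # endpoint updated

Keeping the queue object stable is what preserves the buffered data and its counter across agent endpoint refreshes, which is exactly what the new tests in tests/debugging/test_uploader.py assert.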

ddtrace/debugging/_uploader.py

Lines changed: 20 additions & 11 deletions
@@ -1,6 +1,7 @@
 from dataclasses import dataclass
 from enum import Enum
 from typing import Any
+from typing import Dict
 from typing import Optional
 from typing import Set
 from urllib.parse import quote
@@ -59,6 +60,7 @@ def __init__(self, interval: Optional[float] = None) -> None:
 
         self._agent_endpoints_cache: HourGlass = HourGlass(duration=60.0)
 
+        self._tracks: Dict[SignalTrack, UploaderTrack] = {}
         self.set_track_endpoints()
         self._headers = {
             "Content-type": "application/json; charset=utf-8",
@@ -102,18 +104,25 @@ def set_track_endpoints(self) -> None:
 
         endpoint_suffix = f"?ddtags={quote(di_config.tags)}" if di_config._tags_in_qs and di_config.tags else ""
 
-        self._tracks = {
-            SignalTrack.LOGS: UploaderTrack(
-                endpoint=f"/debugger/v1/input{endpoint_suffix}",
-                queue=self.__queue__(
-                    encoder=LogSignalJsonEncoder(di_config.service_name), on_full=self._on_buffer_full
+        # Only create the tracks if they don't exist to preserve the track queue metadata.
+        if not self._tracks:
+            self._tracks = {
+                SignalTrack.LOGS: UploaderTrack(
+                    endpoint=f"/debugger/v1/input{endpoint_suffix}",
+                    queue=self.__queue__(
+                        encoder=LogSignalJsonEncoder(di_config.service_name), on_full=self._on_buffer_full
+                    ),
                 ),
-            ),
-            SignalTrack.SNAPSHOT: UploaderTrack(
-                endpoint=f"{snapshot_track}{endpoint_suffix}",
-                queue=self.__queue__(encoder=SnapshotJsonEncoder(di_config.service_name), on_full=self._on_buffer_full),
-            ),
-        }
+                SignalTrack.SNAPSHOT: UploaderTrack(
+                    endpoint=f"{snapshot_track}{endpoint_suffix}",
+                    queue=self.__queue__(
+                        encoder=SnapshotJsonEncoder(di_config.service_name), on_full=self._on_buffer_full
+                    ),
+                ),
+            }
+        else:
+            self._tracks[SignalTrack.SNAPSHOT].endpoint = f"{snapshot_track}{endpoint_suffix}"
+
         self._collector = self.__collector__({t: ut.queue for t, ut in self._tracks.items()})
 
     def _write(self, payload: bytes, endpoint: str) -> None:

tests/debugging/test_uploader.py

Lines changed: 99 additions & 0 deletions
@@ -69,3 +69,102 @@ def test_uploader_full_buffer():
     # wakeup to mimic next interval
     uploader.periodic()
     assert uploader.queue.qsize() == 0
+
+
+def test_uploader_preserves_queue_metadata_on_agent_endpoint_refresh():
+    """Test that track queue metadata is preserved when agent endpoints are refreshed."""
+    import mock
+
+    from ddtrace.debugging._signal.model import SignalTrack
+    from ddtrace.internal import agent
+
+    # Mock agent.info to return initial endpoints
+    initial_agent_info = {"endpoints": ["/debugger/v1/input", "/debugger/v1/diagnostics"]}
+    updated_agent_info = {"endpoints": ["/debugger/v1/input", "/debugger/v2/input"]}
+
+    with mock.patch.object(agent, "info", return_value=initial_agent_info):
+        uploader = MockLogsIntakeUploaderV1(interval=LONG_INTERVAL)
+
+        # Add some data to the queues
+        logs_queue = uploader._tracks[SignalTrack.LOGS].queue
+        snapshot_queue = uploader._tracks[SignalTrack.SNAPSHOT].queue
+
+        # Put some encoded data in the queues
+        logs_queue.put_encoded(None, "log_data".encode("utf-8"))
+        snapshot_queue.put_encoded(None, "snapshot_data".encode("utf-8"))
+
+        # Store queue references and verify they have data
+        original_logs_queue = logs_queue
+        original_snapshot_queue = snapshot_queue
+        original_logs_count = logs_queue.count
+        original_snapshot_count = snapshot_queue.count
+
+        assert original_logs_count > 0, "Logs queue should have data"
+        assert original_snapshot_count > 0, "Snapshot queue should have data"
+
+        # Force the cache to expire by mocking trickling to return False
+        with mock.patch.object(uploader._agent_endpoints_cache, "trickling", return_value=False):
+            # Mock agent.info to return updated endpoints (v2 instead of v1 diagnostics)
+            with mock.patch.object(agent, "info", return_value=updated_agent_info):
+                # This should trigger set_track_endpoints to refresh but preserve queue metadata
+                uploader.set_track_endpoints()
+
+        # Verify that the track queues are the same objects (not recreated)
+        assert uploader._tracks[SignalTrack.LOGS].queue is original_logs_queue
+        assert uploader._tracks[SignalTrack.SNAPSHOT].queue is original_snapshot_queue
+
+        # Verify that queue counts are preserved
+        assert uploader._tracks[SignalTrack.LOGS].queue.count == original_logs_count
+        assert uploader._tracks[SignalTrack.SNAPSHOT].queue.count == original_snapshot_count
+
+        # Verify that the endpoint was updated for snapshot track
+        assert "/debugger/v2/input" in uploader._tracks[SignalTrack.SNAPSHOT].endpoint
+
+        # Verify we can still flush without BufferFull errors
+        uploader.periodic()
+
+        # The data should have been uploaded
+        assert uploader.queue.qsize() == 2  # One payload for logs, one for snapshots
+
+
+def test_uploader_agent_endpoint_refresh_multiple_calls():
+    """Test that multiple calls to set_track_endpoints with cache expiry work correctly."""
+    import mock
+
+    from ddtrace.debugging._signal.model import SignalTrack
+    from ddtrace.internal import agent
+
+    agent_responses = [
+        {"endpoints": ["/debugger/v1/input"]},
+        {"endpoints": ["/debugger/v1/input", "/debugger/v1/diagnostics"]},
+        {"endpoints": ["/debugger/v1/input", "/debugger/v2/input"]},
+    ]
+
+    with mock.patch.object(agent, "info", return_value=agent_responses[0]):
+        uploader = MockLogsIntakeUploaderV1(interval=LONG_INTERVAL)
+
+        # Add data to track buffer state
+        snapshot_queue = uploader._tracks[SignalTrack.SNAPSHOT].queue
+        snapshot_queue.put_encoded(None, "test_data".encode("utf-8"))
+        original_count = snapshot_queue.count
+
+        # Track the original queue object
+        original_queue = snapshot_queue
+
+        # Simulate multiple agent endpoint updates
+        for i, agent_response in enumerate(agent_responses[1:], 1):
+            with mock.patch.object(uploader._agent_endpoints_cache, "trickling", return_value=False):
+                with mock.patch.object(agent, "info", return_value=agent_response):
+                    uploader.set_track_endpoints()
+
+            # Queue should be preserved across all updates
+            assert uploader._tracks[SignalTrack.SNAPSHOT].queue is original_queue
+            assert uploader._tracks[SignalTrack.SNAPSHOT].queue.count == original_count
+
+            # Add more data to ensure buffer state is maintained
+            snapshot_queue.put_encoded(None, f"test_data_{i}".encode("utf-8"))
+            original_count = snapshot_queue.count
+
+        # Final verification - queue should still be functional
+        uploader.periodic()
+        assert uploader.queue.qsize() > 0
