Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions ddtrace/debugging/_debugger.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@
from ddtrace.debugging._signal.collector import SignalCollector
from ddtrace.debugging._signal.model import Signal
from ddtrace.debugging._signal.model import SignalState
from ddtrace.debugging._uploader import LogsIntakeUploaderV1
from ddtrace.debugging._uploader import SignalUploader
from ddtrace.debugging._uploader import UploaderProduct
from ddtrace.internal import core
from ddtrace.internal.logger import get_logger
Expand Down Expand Up @@ -193,7 +193,7 @@ class Debugger(Service):
_probe_meter = _probe_metrics.get_meter("probe")

__rc_adapter__ = ProbeRCAdapter
__uploader__ = LogsIntakeUploaderV1
__uploader__ = SignalUploader
__watchdog__ = DebuggerModuleWatchdog
__logger__ = ProbeStatusLogger

Expand Down
4 changes: 2 additions & 2 deletions ddtrace/debugging/_exception/replay.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
from ddtrace.debugging._session import Session
from ddtrace.debugging._signal.snapshot import DEFAULT_CAPTURE_LIMITS
from ddtrace.debugging._signal.snapshot import Snapshot
from ddtrace.debugging._uploader import LogsIntakeUploaderV1
from ddtrace.debugging._uploader import SignalUploader
from ddtrace.debugging._uploader import UploaderProduct
from ddtrace.internal import core
from ddtrace.internal.logger import get_logger
Expand Down Expand Up @@ -242,7 +242,7 @@ def get_snapshot_count(span: Span) -> int:


class SpanExceptionHandler:
__uploader__ = LogsIntakeUploaderV1
__uploader__ = SignalUploader

_instance: t.Optional["SpanExceptionHandler"] = None

Expand Down
6 changes: 3 additions & 3 deletions ddtrace/debugging/_origin/span.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
from ddtrace.debugging._session import Session
from ddtrace.debugging._signal.collector import SignalCollector
from ddtrace.debugging._signal.snapshot import Snapshot
from ddtrace.debugging._uploader import LogsIntakeUploaderV1
from ddtrace.debugging._uploader import SignalUploader
from ddtrace.debugging._uploader import UploaderProduct
from ddtrace.ext import EXIT_SPAN_TYPES
from ddtrace.internal import core
Expand Down Expand Up @@ -197,7 +197,7 @@ def __exit__(self, exc_type, exc_value, traceback):

@dataclass
class SpanCodeOriginProcessorEntry:
__uploader__ = LogsIntakeUploaderV1
__uploader__ = SignalUploader

_instance: t.Optional["SpanCodeOriginProcessorEntry"] = None
_handler: t.Optional[t.Callable] = None
Expand Down Expand Up @@ -232,7 +232,7 @@ def disable(cls):

@dataclass
class SpanCodeOriginProcessorExit(SpanProcessor):
__uploader__ = LogsIntakeUploaderV1
__uploader__ = SignalUploader

_instance: t.Optional["SpanCodeOriginProcessorExit"] = None

Expand Down
150 changes: 92 additions & 58 deletions ddtrace/debugging/_uploader.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
from dataclasses import dataclass
from enum import Enum
from typing import Any
from typing import Dict
from typing import Optional
from typing import Set
from urllib.parse import quote
Expand All @@ -14,14 +13,17 @@
from ddtrace.debugging._signal.collector import SignalCollector
from ddtrace.debugging._signal.model import SignalTrack
from ddtrace.internal import agent
from ddtrace.internal import logger
from ddtrace.internal.logger import get_logger
from ddtrace.internal.periodic import ForksafeAwakeablePeriodicService
from ddtrace.internal.utils.http import connector
from ddtrace.internal.utils.retry import fibonacci_backoff_with_jitter
from ddtrace.internal.utils.time import HourGlass


log = get_logger(__name__)
UNSUPPORTED_AGENT = "unsupported_agent"
logger.set_tag_rate_limit(UNSUPPORTED_AGENT, logger.HOUR)


meter = metrics.get_meter("uploader")


Expand All @@ -37,16 +39,23 @@ class UploaderProduct(str, Enum):
class UploaderTrack:
endpoint: str
queue: SignalQueue
enabled: bool = True


class SignalUploaderError(Exception):
    """Raised when a signal payload upload attempt fails.

    Used to signal that the upload should be retried or that the uploader
    should transition back to the agent-check state.
    """


class LogsIntakeUploaderV1(ForksafeAwakeablePeriodicService):
"""Logs intake uploader.
class SignalUploader(agent.AgentCheckPeriodicService):
"""Signal uploader.

This class implements an interface with the debugger logs intake for both
This class implements an interface with the debugger signal intake for both
the debugger and the events platform.
"""

_instance: Optional["LogsIntakeUploaderV1"] = None
_instance: Optional["SignalUploader"] = None
_products: Set[UploaderProduct] = set()
_agent_endpoints: Set[str] = set()

Expand All @@ -58,10 +67,23 @@ class LogsIntakeUploaderV1(ForksafeAwakeablePeriodicService):
def __init__(self, interval: Optional[float] = None) -> None:
super().__init__(interval if interval is not None else di_config.upload_interval_seconds)

self._agent_endpoints_cache: HourGlass = HourGlass(duration=60.0)
self._endpoint_suffix = endpoint_suffix = (
f"?ddtags={quote(di_config.tags)}" if di_config._tags_in_qs and di_config.tags else ""
)

self._tracks: Dict[SignalTrack, UploaderTrack] = {}
self.set_track_endpoints()
self._tracks = {
SignalTrack.LOGS: UploaderTrack(
endpoint=f"/debugger/v1/input{endpoint_suffix}",
queue=self.__queue__(
encoder=LogSignalJsonEncoder(di_config.service_name), on_full=self._on_buffer_full
),
),
SignalTrack.SNAPSHOT: UploaderTrack(
endpoint=f"/debugger/v2/input{endpoint_suffix}", # start optimistically
queue=self.__queue__(encoder=SnapshotJsonEncoder(di_config.service_name), on_full=self._on_buffer_full),
),
}
self._collector = self.__collector__({t: ut.queue for t, ut in self._tracks.items()})
self._headers = {
"Content-type": "application/json; charset=utf-8",
"Accept": "text/plain",
Expand All @@ -76,64 +98,67 @@ def __init__(self, interval: Optional[float] = None) -> None:
)(self._write)

log.debug(
"Logs intake uploader initialized (url: %s, endpoints: %s, interval: %f)",
"Signal uploader initialized (url: %s, endpoints: %s, interval: %f)",
di_config._intake_url,
{t: ut.endpoint for t, ut in self._tracks.items()},
self.interval,
)

self._flush_full = False

def set_track_endpoints(self) -> None:
if self._agent_endpoints_cache.trickling():
return

try:
agent_info = agent.info()
self._agent_endpoints = set(agent_info.get("endpoints", [])) if agent_info is not None else set()
except Exception:
pass # nosec B110
finally:
self._agent_endpoints_cache.turn()

snapshot_track = "/debugger/v1/input"
if "/debugger/v2/input" in self._agent_endpoints:
snapshot_track = "/debugger/v2/input"
elif "/debugger/v1/diagnostics" in self._agent_endpoints:
snapshot_track = "/debugger/v1/diagnostics"

endpoint_suffix = f"?ddtags={quote(di_config.tags)}" if di_config._tags_in_qs and di_config.tags else ""

# Only create the tracks if they don't exist to preserve the track queue metadata.
if not self._tracks:
self._tracks = {
SignalTrack.LOGS: UploaderTrack(
endpoint=f"/debugger/v1/input{endpoint_suffix}",
queue=self.__queue__(
encoder=LogSignalJsonEncoder(di_config.service_name), on_full=self._on_buffer_full
),
),
SignalTrack.SNAPSHOT: UploaderTrack(
endpoint=f"{snapshot_track}{endpoint_suffix}",
queue=self.__queue__(
encoder=SnapshotJsonEncoder(di_config.service_name), on_full=self._on_buffer_full
),
),
}
def info_check(self, agent_info: Optional[dict]) -> bool:
if agent_info is None:
# Agent is unreachable
return False

if "endpoints" not in agent_info:
# Agent not supported
log.debug("Unsupported Datadog agent detected. Please upgrade to 7.49.")
return False

endpoints = set(agent_info.get("endpoints", []))
snapshot_track = self._tracks[SignalTrack.SNAPSHOT]
snapshot_track.enabled = True

if "/debugger/v2/input" in endpoints:
log.debug("Detected /debugger/v2/input endpoint")
snapshot_track.endpoint = f"/debugger/v2/input{self._endpoint_suffix}"
elif "/debugger/v1/diagnostics" in endpoints:
log.debug("Detected /debugger/v1/diagnostics endpoint fallback")
snapshot_track.endpoint = f"/debugger/v1/diagnostics{self._endpoint_suffix}"
else:
self._tracks[SignalTrack.SNAPSHOT].endpoint = f"{snapshot_track}{endpoint_suffix}"
snapshot_track.enabled = False
log.warning(
UNSUPPORTED_AGENT,
extra={
"product": "debugger",
"more_info": (
": Unsupported Datadog agent detected. Snapshots from Dynamic Instrumentation/"
"Exception Replay/Code Origin for Spans will not be uploaded. "
"Please upgrade to version 7.49.0 or later"
),
},
)

self._collector = self.__collector__({t: ut.queue for t, ut in self._tracks.items()})
return True

def _write(self, payload: bytes, endpoint: str) -> None:
self.set_track_endpoints()
try:
with self._connect() as conn:
conn.request("POST", endpoint, payload, headers=self._headers)
resp = conn.getresponse()
if not (200 <= resp.status < 300):
log.error("Failed to upload payload to endpoint %s: [%d] %r", endpoint, resp.status, resp.read())
meter.increment("upload.error", tags={"status": str(resp.status)})
if 400 <= resp.status < 500:
log.error(
"Downgrading debugger endpoint after failed upload attempt to %s: [%d] %r",
endpoint,
resp.status,
resp.read(),
)
msg = "Failed to upload payload"
raise SignalUploaderError(msg)
else:
meter.increment("upload.success")
meter.distribution("upload.size", len(payload))
Expand All @@ -157,28 +182,37 @@ def reset(self) -> None:

def _flush_track(self, track: UploaderTrack) -> None:
queue = track.queue
payload = queue.flush()
if payload is not None:
if (payload := queue.flush()) is not None and track.enabled:
try:
self._write_with_backoff(payload, track.endpoint)
meter.distribution("batch.cardinality", queue.count)
except SignalUploaderError:
raise # Propagate error to transition to agent check state
except Exception:
log.debug("Cannot upload logs payload", exc_info=True)

def periodic(self) -> None:
"""Upload the buffer content to the logs intake."""
def online(self) -> None:
"""Upload the buffer content to the agent."""
if self._flush_full:
# We received the signal to flush a full buffer
self._flush_full = False
for track in self._tracks.values():
if track.queue.is_full():
self._flush_track(track)
for signal_track, uploader_track in self._tracks.items():
if uploader_track.queue.is_full():
try:
self._flush_track(uploader_track)
except SignalUploaderError:
if signal_track is SignalTrack.SNAPSHOT:
uploader_track.endpoint = f"/debugger/v1/diagnostics{self._endpoint_suffix}"
log.debug("Downgrading snapshot endpoint to %s", uploader_track.endpoint)
# Try again immediately. If this fails for the same
# reason we transition to agent check state
self._flush_track(uploader_track)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Won't the track be empty after the call to queue.flush() since the buffer is emptied?


for track in self._tracks.values():
if track.queue.count:
self._flush_track(track)

on_shutdown = periodic
on_shutdown = online

@classmethod
def get_collector(cls) -> SignalCollector:
Expand Down
38 changes: 38 additions & 0 deletions ddtrace/internal/agent.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
import abc
import json
import typing as t

from ddtrace.internal.logger import get_logger
from ddtrace.internal.periodic import ForksafeAwakeablePeriodicService
from ddtrace.settings._agent import config

from .utils.http import get_connection
Expand Down Expand Up @@ -29,3 +32,38 @@ def info(url=None):
return None

return json.loads(data)


class AgentCheckPeriodicService(ForksafeAwakeablePeriodicService, metaclass=abc.ABCMeta):
    """Periodic service gated behind a Datadog agent capability check.

    The service behaves as a two-state machine.  In the initial
    "agent check" state each tick queries the agent ``info`` endpoint and
    hands the result to :meth:`info_check`; once that check passes, the
    service switches to the "online" state and runs :meth:`online` on every
    tick.  Any exception raised while online drops the service back into
    the agent-check state.
    """

    def __init__(self, interval: float = 0.0):
        super().__init__(interval=interval)

        # Start out probing the agent until info_check() approves it.
        self._state = self._agent_check

    @abc.abstractmethod
    def info_check(self, agent_info: t.Optional[dict]) -> bool:
        # Subclasses decide whether the agent info payload is acceptable.
        ...

    def _agent_check(self) -> None:
        # Any failure to reach the agent is treated as "no info available".
        agent_info: t.Optional[dict]
        try:
            agent_info = info()
        except Exception:
            agent_info = None

        if not self.info_check(agent_info):
            return

        # The agent supports us: go online and do a first pass right away.
        self._state = self._online
        self._online()

    def _online(self) -> None:
        try:
            self.online()
        except Exception:
            # Revert to probing the agent on the next tick.
            self._state = self._agent_check
            log.debug("Error during online operation, reverting to agent check", exc_info=True)

    @abc.abstractmethod
    def online(self) -> None:
        # Subclasses implement the work to perform while the agent is usable.
        ...

    def periodic(self) -> None:
        # Dispatch to whichever state handler is currently active.
        return self._state()
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
---
features:
- |
Dynamic Instrumentation/Exception Replay/Code Origin for Spans: added
support for the latest Datadog agent intake endpoint for snapshots. This
feature requires Datadog agent version 7.49.0 or later.
6 changes: 3 additions & 3 deletions tests/debugging/exploration/debugger.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
from ddtrace.debugging._probe.remoteconfig import ProbePollerEvent
from ddtrace.debugging._signal.collector import SignalCollector
from ddtrace.debugging._signal.snapshot import Snapshot
from ddtrace.debugging._uploader import LogsIntakeUploaderV1
from ddtrace.debugging._uploader import SignalUploader
from ddtrace.internal.remoteconfig.worker import RemoteConfigPoller


Expand Down Expand Up @@ -157,7 +157,7 @@ def probes(self) -> t.List[t.Optional[Probe]]:
return self._probes or [None]


class NoopLogsIntakeUploader(LogsIntakeUploaderV1):
class NoopSignalUploader(SignalUploader):
__collector__ = ExplorationSignalCollector
_count = 0

Expand All @@ -184,7 +184,7 @@ def set_emitting(self, probe: Probe) -> None:

class ExplorationDebugger(Debugger):
__rc__ = NoopDebuggerRC
__uploader__ = NoopLogsIntakeUploader
__uploader__ = NoopSignalUploader
__watchdog__ = ModuleCollector
__logger__ = NoopProbeStatusLogger

Expand Down
Loading
Loading