From c9dd1508ba6c9a752953cef51857d0dba5267e47 Mon Sep 17 00:00:00 2001 From: Ryan Hall Date: Fri, 15 Nov 2024 14:26:19 -0500 Subject: [PATCH] AMLII-2166 - Add UDS Streams support to the DogStatsD client Includes full support for the unix://, unixstream://, and unixgram:// socket_path prefixes utilized by DD_DOGSTATSD_URL in preparation to support that feature. Autodetects SOCK_DGRAM vs SOCK_STREAM for users currently providing a raw socket path. --- datadog/dogstatsd/aggregator.py | 4 +- datadog/dogstatsd/base.py | 84 ++++++++++++++++-- .../dogstatsd/test_statsd_sender.py | 51 ++++++++++- tests/unit/dogstatsd/test_statsd.py | 85 ++++++++++++++----- 4 files changed, 192 insertions(+), 32 deletions(-) diff --git a/datadog/dogstatsd/aggregator.py b/datadog/dogstatsd/aggregator.py index 4a805b75e..62326be8a 100644 --- a/datadog/dogstatsd/aggregator.py +++ b/datadog/dogstatsd/aggregator.py @@ -31,8 +31,8 @@ def flush_aggregated_metrics(self): return metrics def get_context(self, name, tags): - tags_str = ",".join(tags) if tags is not None else "" - return "{}:{}".format(name, tags_str) + tags_str = u",".join(tags) if tags is not None else "" + return u"{}:{}".format(name, tags_str) def count(self, name, value, tags, rate, timestamp=0): return self.add_metric( diff --git a/datadog/dogstatsd/base.py b/datadog/dogstatsd/base.py index da9ece563..9689d8370 100644 --- a/datadog/dogstatsd/base.py +++ b/datadog/dogstatsd/base.py @@ -13,6 +13,7 @@ import os import socket import errno +import struct import threading import time from threading import Lock, RLock @@ -49,6 +50,11 @@ DEFAULT_HOST = "localhost" DEFAULT_PORT = 8125 +# Socket prefixes +UNIX_ADDRESS_SCHEME = "unix://" +UNIX_ADDRESS_DATAGRAM_SCHEME = "unixgram://" +UNIX_ADDRESS_STREAM_SCHEME = "unixstream://" + # Buffering-related values (in seconds) DEFAULT_BUFFERING_FLUSH_INTERVAL = 0.3 MIN_FLUSH_INTERVAL = 0.0001 @@ -489,6 +495,30 @@ def socket_path(self, path): self._transport = "uds" self._max_payload_size = self._max_buffer_len or UDS_OPTIMAL_PAYLOAD_LENGTH + @property + def socket(self): + return self._socket + + @socket.setter + def socket(self, new_socket): + self._socket = new_socket + if new_socket: + self._socket_kind = new_socket.getsockopt(socket.SOL_SOCKET, socket.SO_TYPE) + else: + self._socket_kind = None + + @property + def telemetry_socket(self): + return self._telemetry_socket + + @telemetry_socket.setter + def telemetry_socket(self, t_socket): + self._telemetry_socket = t_socket + if t_socket: + self._telemetry_socket_kind = t_socket.getsockopt(socket.SOL_SOCKET, socket.SO_TYPE) + else: + self._telemetry_socket_kind = None + def enable_background_sender(self, sender_queue_size=0, sender_queue_timeout=0): """ Use a background thread to communicate with the dogstatsd server. @@ -731,11 +761,37 @@ def _ensure_min_send_buffer_size(cls, sock, min_size=MIN_SEND_BUFFER_SIZE): @classmethod def _get_uds_socket(cls, socket_path, timeout): - sock = socket.socket(socket.AF_UNIX, socket.SOCK_DGRAM) - sock.settimeout(timeout) - cls._ensure_min_send_buffer_size(sock) - sock.connect(socket_path) - return sock + valid_socket_kinds = [socket.SOCK_DGRAM, socket.SOCK_STREAM] + if socket_path.startswith(UNIX_ADDRESS_DATAGRAM_SCHEME): + valid_socket_kinds = [socket.SOCK_DGRAM] + socket_path = socket_path[len(UNIX_ADDRESS_DATAGRAM_SCHEME):] + elif socket_path.startswith(UNIX_ADDRESS_STREAM_SCHEME): + valid_socket_kinds = [socket.SOCK_STREAM] + socket_path = socket_path[len(UNIX_ADDRESS_STREAM_SCHEME):] + elif socket_path.startswith(UNIX_ADDRESS_SCHEME): + socket_path = socket_path[len(UNIX_ADDRESS_SCHEME):] + + last_error = ValueError("Invalid socket path") + for socket_kind in valid_socket_kinds: + # py2 stores socket kinds differently than py3, determine the name independently from version + sk_name = {socket.SOCK_STREAM: "stream", socket.SOCK_DGRAM: "datagram"}[socket_kind] + + try: + sock = socket.socket(socket.AF_UNIX, socket_kind) + sock.settimeout(timeout) + cls._ensure_min_send_buffer_size(sock) + sock.connect(socket_path) + log.debug("Connected to socket %s with kind %s", socket_path, sk_name) + return sock + except Exception as e: + if sock is not None: + sock.close() + log.debug("Failed to connect to %s with kind %s: %s", socket_path, sk_name, e) + if e.errno == errno.EPROTOTYPE: + last_error = e + continue + raise e + raise last_error @classmethod def _get_udp_socket(cls, host, port, timeout): @@ -1216,14 +1272,22 @@ def _xmit_packet_with_telemetry(self, packet): self.packets_dropped_writer += 1 def _xmit_packet(self, packet, is_telemetry): + socket_kind = None try: if is_telemetry and self._dedicated_telemetry_destination(): mysocket = self.telemetry_socket or self.get_socket(telemetry=True) + socket_kind = self._telemetry_socket_kind else: # If set, use socket directly mysocket = self.socket or self.get_socket() + socket_kind = self._socket_kind - mysocket.send(packet.encode(self.encoding)) + encoded_packet = packet.encode(self.encoding) + if socket_kind == socket.SOCK_STREAM: + mysocket.sendall(struct.pack(' len(self.payloads): + payload_len = len(self.payloads) + if self._socket_kind == socket.SOCK_STREAM: + if payload_len % 2 != 0 or count > (payload_len / 2): + return None + elif count > len(self.payloads): return None out = [] for _ in range(count): - out.append(self.payloads.popleft().decode('utf-8')) + if self._socket_kind == socket.SOCK_DGRAM: + out.append(self.payloads.popleft().decode('utf-8')) + else: + length = struct.unpack('