Skip to content

Commit

Permalink
AMLII-2166 - Add UDS Streams support to the DogStatsD client
Browse files Browse the repository at this point in the history
Includes full support for the unix://, unixstream://, and unixgram:// socket_path prefixes
used by DD_DOGSTATSD_URL, in preparation for supporting that feature.

Autodetects SOCK_DGRAM vs SOCK_STREAM for users currently providing a raw socket path.
  • Loading branch information
ddrthall committed Nov 26, 2024
1 parent 362e187 commit c9dd150
Show file tree
Hide file tree
Showing 4 changed files with 192 additions and 32 deletions.
4 changes: 2 additions & 2 deletions datadog/dogstatsd/aggregator.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,8 @@ def flush_aggregated_metrics(self):
return metrics

def get_context(self, name, tags):
tags_str = ",".join(tags) if tags is not None else ""
return "{}:{}".format(name, tags_str)
tags_str = u",".join(tags) if tags is not None else ""
return u"{}:{}".format(name, tags_str)

def count(self, name, value, tags, rate, timestamp=0):
return self.add_metric(
Expand Down
84 changes: 77 additions & 7 deletions datadog/dogstatsd/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
import os
import socket
import errno
import struct
import threading
import time
from threading import Lock, RLock
Expand Down Expand Up @@ -49,6 +50,11 @@
DEFAULT_HOST = "localhost"
DEFAULT_PORT = 8125

# Socket prefixes
UNIX_ADDRESS_SCHEME = "unix://"
UNIX_ADDRESS_DATAGRAM_SCHEME = "unixgram://"
UNIX_ADDRESS_STREAM_SCHEME = "unixstream://"

# Buffering-related values (in seconds)
DEFAULT_BUFFERING_FLUSH_INTERVAL = 0.3
MIN_FLUSH_INTERVAL = 0.0001
Expand Down Expand Up @@ -489,6 +495,30 @@ def socket_path(self, path):
self._transport = "uds"
self._max_payload_size = self._max_buffer_len or UDS_OPTIMAL_PAYLOAD_LENGTH

@property
def socket(self):
    """Return the socket object last stored through the ``socket`` setter."""
    return self._socket

@socket.setter
def socket(self, new_socket):
    """
    Store ``new_socket`` and cache its kind alongside it.

    The kind is whatever the socket reports for ``SO_TYPE``; a falsy
    ``new_socket`` (e.g. ``None``) resets the cached kind to ``None``.
    """
    # The socket is stored first, exactly as before, so it is recorded even if
    # the SO_TYPE lookup below raises.
    self._socket = new_socket
    self._socket_kind = (
        new_socket.getsockopt(socket.SOL_SOCKET, socket.SO_TYPE) if new_socket else None
    )

@property
def telemetry_socket(self):
    """Return the socket object last stored through the ``telemetry_socket`` setter."""
    return self._telemetry_socket

@telemetry_socket.setter
def telemetry_socket(self, t_socket):
    """
    Store ``t_socket`` and cache its kind alongside it.

    The kind is whatever the socket reports for ``SO_TYPE``; a falsy
    ``t_socket`` (e.g. ``None``) resets the cached kind to ``None``.
    """
    # The socket is stored first, exactly as before, so it is recorded even if
    # the SO_TYPE lookup below raises.
    self._telemetry_socket = t_socket
    self._telemetry_socket_kind = (
        t_socket.getsockopt(socket.SOL_SOCKET, socket.SO_TYPE) if t_socket else None
    )

def enable_background_sender(self, sender_queue_size=0, sender_queue_timeout=0):
"""
Use a background thread to communicate with the dogstatsd server.
Expand Down Expand Up @@ -731,11 +761,37 @@ def _ensure_min_send_buffer_size(cls, sock, min_size=MIN_SEND_BUFFER_SIZE):

@classmethod
def _get_uds_socket(cls, socket_path, timeout):
    """
    Open a Unix domain socket and connect it to ``socket_path``.

    ``socket_path`` may start with one of the scheme prefixes, which is
    stripped before connecting:

    * ``unixgram://``   - only a datagram socket is attempted
    * ``unixstream://`` - only a stream socket is attempted
    * ``unix://`` or no prefix - datagram is attempted first, then stream

    :param socket_path: filesystem path of the socket, optionally prefixed
    :param timeout: value handed to ``settimeout`` on the new socket
    :return: the connected socket
    :raises Exception: the first error that is not ``EPROTOTYPE`` is re-raised
        immediately; if every permitted kind is rejected with ``EPROTOTYPE``,
        the last such error is raised.
    """
    valid_socket_kinds = [socket.SOCK_DGRAM, socket.SOCK_STREAM]
    if socket_path.startswith(UNIX_ADDRESS_DATAGRAM_SCHEME):
        valid_socket_kinds = [socket.SOCK_DGRAM]
        socket_path = socket_path[len(UNIX_ADDRESS_DATAGRAM_SCHEME):]
    elif socket_path.startswith(UNIX_ADDRESS_STREAM_SCHEME):
        valid_socket_kinds = [socket.SOCK_STREAM]
        socket_path = socket_path[len(UNIX_ADDRESS_STREAM_SCHEME):]
    elif socket_path.startswith(UNIX_ADDRESS_SCHEME):
        socket_path = socket_path[len(UNIX_ADDRESS_SCHEME):]

    # py2 stores socket kinds differently than py3, determine the name independently from version
    kind_names = {socket.SOCK_STREAM: "stream", socket.SOCK_DGRAM: "datagram"}

    last_error = ValueError("Invalid socket path")
    for socket_kind in valid_socket_kinds:
        sk_name = kind_names[socket_kind]

        # Reset on every attempt: the handler below must never see an unbound
        # name (socket.socket() itself may raise) nor the already-closed socket
        # left over from the previous iteration.
        sock = None
        try:
            sock = socket.socket(socket.AF_UNIX, socket_kind)
            sock.settimeout(timeout)
            cls._ensure_min_send_buffer_size(sock)
            sock.connect(socket_path)
            log.debug("Connected to socket %s with kind %s", socket_path, sk_name)
            return sock
        except Exception as e:
            if sock is not None:
                sock.close()
            log.debug("Failed to connect to %s with kind %s: %s", socket_path, sk_name, e)
            # Not every exception carries an errno (e.g. a TypeError for a bad
            # path); only EPROTOTYPE means "wrong socket kind, try the next one".
            if getattr(e, "errno", None) == errno.EPROTOTYPE:
                last_error = e
                continue
            raise
    raise last_error

@classmethod
def _get_udp_socket(cls, host, port, timeout):
Expand Down Expand Up @@ -1216,14 +1272,22 @@ def _xmit_packet_with_telemetry(self, packet):
self.packets_dropped_writer += 1

def _xmit_packet(self, packet, is_telemetry):
socket_kind = None
try:
if is_telemetry and self._dedicated_telemetry_destination():
mysocket = self.telemetry_socket or self.get_socket(telemetry=True)
socket_kind = self._telemetry_socket_kind
else:
# If set, use socket directly
mysocket = self.socket or self.get_socket()
socket_kind = self._socket_kind

mysocket.send(packet.encode(self.encoding))
encoded_packet = packet.encode(self.encoding)
if socket_kind == socket.SOCK_STREAM:
mysocket.sendall(struct.pack('<I', len(encoded_packet)))
mysocket.sendall(encoded_packet)
else:
mysocket.send(encoded_packet)

if not is_telemetry and self._telemetry:
self.packets_sent += 1
Expand Down Expand Up @@ -1256,13 +1320,19 @@ def _xmit_packet(self, packet, is_telemetry):
)
self.close_socket()
except Exception as exc:
print("Unexpected error: %s", exc)
print("Unexpected error: ", exc)
log.error("Unexpected error: %s", str(exc))

if not is_telemetry and self._telemetry:
self.bytes_dropped_writer += len(packet)
self.packets_dropped_writer += 1

# if in stream mode we need to shut down the socket; we can't recover from a
# partial send
if socket_kind == socket.SOCK_STREAM:
log.debug("Confirming socket closure after error streaming")
self.close_socket()

return False

def _send_to_buffer(self, packet):
Expand Down
51 changes: 47 additions & 4 deletions tests/integration/dogstatsd/test_statsd_sender.py
Original file line number Diff line number Diff line change
@@ -1,18 +1,23 @@
from contextlib import closing
import itertools
import os
import shutil
import socket
import tempfile
from threading import Thread
import uuid

import pytest

from datadog.dogstatsd.base import DogStatsd

@pytest.mark.parametrize(
"disable_background_sender, disable_buffering, wait_for_pending, socket_timeout, stop",
list(itertools.product([True, False], [True, False], [True, False], [0, 1], [True, False])),
"disable_background_sender, disable_buffering, wait_for_pending, socket_timeout, stop, socket_kind",
list(itertools.product([True, False], [True, False], [True, False], [0, 1], [True, False], [socket.SOCK_DGRAM, socket.SOCK_STREAM])),
)
def test_sender_mode(disable_background_sender, disable_buffering, wait_for_pending, socket_timeout, stop):
def test_sender_mode(disable_background_sender, disable_buffering, wait_for_pending, socket_timeout, stop, socket_kind):
# Test basic sender operation with an assortment of options
foo, bar = socket.socketpair(socket.AF_UNIX, socket.SOCK_DGRAM, 0)
foo, bar = socket.socketpair(socket.AF_UNIX, socket_kind, 0)
statsd = DogStatsd(
telemetry_min_flush_interval=0,
disable_background_sender=disable_background_sender,
Expand Down Expand Up @@ -101,3 +106,41 @@ def test_buffering_with_context():
bar.settimeout(5)
msg = bar.recv(8192)
assert msg == b"first:1|c\n"

@pytest.fixture()
def socket_dir():
    """Yield a freshly created temporary directory and delete it, with its contents, afterwards."""
    scratch_dir = tempfile.mkdtemp()
    yield scratch_dir
    shutil.rmtree(scratch_dir)

@pytest.mark.parametrize(
    "socket_prefix, socket_kind, success",
    [
        ("", socket.SOCK_DGRAM, True),
        ("", socket.SOCK_STREAM, True),
        ("unix://", socket.SOCK_DGRAM, True),
        ("unix://", socket.SOCK_STREAM, True),
        ("unixstream://", socket.SOCK_DGRAM, False),
        ("unixstream://", socket.SOCK_STREAM, True),
        ("unixgram://", socket.SOCK_DGRAM, True),
        ("unixgram://", socket.SOCK_STREAM, False),
    ],
)
def test_socket_connection(socket_dir, socket_prefix, socket_kind, success):
    """
    Connect a client to a listener of ``socket_kind`` using ``socket_prefix``.

    ``success`` states whether the prefix permits that kind: when it does the
    client must obtain a socket, otherwise ``get_socket`` must raise
    ``socket.error``.
    """
    socket_path = os.path.join(socket_dir, str(uuid.uuid1()) + ".sock")
    listener_socket = socket.socket(socket.AF_UNIX, socket_kind)

    # Enter the closing() scope before bind/listen so the listener is released
    # even when setting it up fails.
    with closing(listener_socket):
        listener_socket.bind(socket_path)

        if socket_kind == socket.SOCK_STREAM:
            listener_socket.listen(1)

        statsd = DogStatsd(
            socket_path=socket_prefix + socket_path
        )

        try:
            if success:
                assert statsd.get_socket() is not None
            else:
                with pytest.raises(socket.error):
                    statsd.get_socket()
        finally:
            # Do not leak the client-side socket opened by get_socket().
            statsd.close_socket()
85 changes: 66 additions & 19 deletions tests/unit/dogstatsd/test_statsd.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
# Standard libraries
from collections import deque
from contextlib import closing
import struct
from threading import Thread
import errno
import os
Expand Down Expand Up @@ -41,13 +42,17 @@ class FakeSocket(object):

FLUSH_GRACE_PERIOD = 0.2

def __init__(self, flush_interval=DEFAULT_BUFFERING_FLUSH_INTERVAL):
def __init__(self, flush_interval=DEFAULT_BUFFERING_FLUSH_INTERVAL, socket_kind=socket.SOCK_DGRAM):
self.payloads = deque()

self._flush_interval = flush_interval
self._flush_wait = False
self._socket_kind = socket_kind
self.timeout = () # unit tuple = settimeout was not called

def sendall(self, payload):
    """Record ``payload`` by delegating to ``send``; nothing is returned."""
    self.send(payload)

def send(self, payload):
if is_p3k():
assert isinstance(payload, bytes)
Expand All @@ -64,17 +69,29 @@ def recv(self, count=1, reset_wait=False, no_wait=False):
time.sleep(self._flush_interval+self.FLUSH_GRACE_PERIOD)
self._flush_wait = True

if count > len(self.payloads):
payload_len = len(self.payloads)
if self._socket_kind == socket.SOCK_STREAM:
if payload_len % 2 != 0 or count > (payload_len / 2):
return None
elif count > len(self.payloads):
return None

out = []
for _ in range(count):
out.append(self.payloads.popleft().decode('utf-8'))
if self._socket_kind == socket.SOCK_DGRAM:
out.append(self.payloads.popleft().decode('utf-8'))
else:
length = struct.unpack('<I', self.payloads.popleft())[0]
pl = self.payloads.popleft()[:length].decode('utf-8')
out.append(pl)
return '\n'.join(out)

def close(self):
    """Do nothing; present so this object can be closed like a socket."""
    return None

def getsockopt(self, *args):
    """Return the stored ``_socket_kind``, ignoring whichever option was requested."""
    configured_kind = self._socket_kind
    return configured_kind

def __repr__(self):
return str(self.payloads)

Expand Down Expand Up @@ -1061,47 +1078,71 @@ def test_batching(self):
telemetry=telemetry_metrics(metrics=2, bytes_sent=len(expected))
)

def test_flush(self):
def test_flush_dgram(self):
self._test_flush(socket.SOCK_DGRAM)

def test_flush_stream(self):
self._test_flush(socket.SOCK_STREAM)

def _test_flush(self, socket_kind):
dogstatsd = DogStatsd(disable_buffering=False, telemetry_min_flush_interval=0)
fake_socket = FakeSocket()
fake_socket = FakeSocket(socket_kind=socket_kind)
dogstatsd.socket = fake_socket

dogstatsd.increment('page.views')
dogstatsd.increment(u'page.®views®')
self.assertIsNone(fake_socket.recv(no_wait=True))
dogstatsd.flush()
self.assert_equal_telemetry('page.views:1|c\n', fake_socket.recv(2))
self.assert_equal_telemetry(u'page.®views®:1|c\n', fake_socket.recv(2))

def test_flush_interval_dgram(self):
self._test_flush_interval(socket.SOCK_DGRAM)

def test_flush_interval_stream(self):
self._test_flush_interval(socket.SOCK_STREAM)

def test_flush_interval(self):
def _test_flush_interval(self, socket_kind):
dogstatsd = DogStatsd(disable_buffering=False, flush_interval=1, telemetry_min_flush_interval=0)
fake_socket = FakeSocket()
fake_socket = FakeSocket(socket_kind=socket_kind)
dogstatsd.socket = fake_socket

dogstatsd.increment('page.views')
dogstatsd.increment(u'page.®views®')
self.assertIsNone(fake_socket.recv(no_wait=True))

time.sleep(0.3)
self.assertIsNone(fake_socket.recv(no_wait=True))

time.sleep(1)
self.assert_equal_telemetry(
'page.views:1|c\n',
u'page.®views®:1|c\n',
fake_socket.recv(2, no_wait=True)
)

def test_aggregation_buffering_simultaneously(self):
def test_aggregation_buffering_simultaneously_dgram(self):
self._test_aggregation_buffering_simultaneously(socket.SOCK_DGRAM)

def test_aggregation_buffering_simultaneously_stream(self):
self._test_aggregation_buffering_simultaneously(socket.SOCK_STREAM)

def _test_aggregation_buffering_simultaneously(self, socket_kind):
dogstatsd = DogStatsd(disable_buffering=False, disable_aggregation=False, telemetry_min_flush_interval=0)
fake_socket = FakeSocket()
fake_socket = FakeSocket(socket_kind=socket_kind)
dogstatsd.socket = fake_socket
for _ in range(10):
dogstatsd.increment('test.aggregation_and_buffering')
dogstatsd.increment(u'test.ÀggregÀtion_and_buffering')
self.assertIsNone(fake_socket.recv(no_wait=True))
dogstatsd.flush_aggregated_metrics()
dogstatsd.flush()
self.assert_equal_telemetry('test.aggregation_and_buffering:10|c\n', fake_socket.recv(2))
self.assert_equal_telemetry(u'test.ÀggregÀtion_and_buffering:10|c\n', fake_socket.recv(2))

def test_aggregation_buffering_simultaneously_with_interval_dgram(self):
self._test_aggregation_buffering_simultaneously_with_interval(socket.SOCK_DGRAM)

def test_aggregation_buffering_simultaneously_with_interval(self):
def test_aggregation_buffering_simultaneously_with_interval_stream(self):
self._test_aggregation_buffering_simultaneously_with_interval(socket.SOCK_STREAM)

def _test_aggregation_buffering_simultaneously_with_interval(self, socket_kind):
dogstatsd = DogStatsd(disable_buffering=False, disable_aggregation=False, flush_interval=1, telemetry_min_flush_interval=0)
fake_socket = FakeSocket()
fake_socket = FakeSocket(socket_kind=socket_kind)
dogstatsd.socket = fake_socket
for _ in range(10):
dogstatsd.increment('test.aggregation_and_buffering_with_interval')
Expand Down Expand Up @@ -1185,12 +1226,18 @@ def test_batching_sequential(self):
)
)

def test_batching_runtime_changes(self):
def test_batching_runtime_changes_dgram(self):
self._test_batching_runtime_changes(socket.SOCK_DGRAM)

def test_batching_runtime_changes_stream(self):
self._test_batching_runtime_changes(socket.SOCK_STREAM)

def _test_batching_runtime_changes(self, socket_kind):
dogstatsd = DogStatsd(
disable_buffering=True,
telemetry_min_flush_interval=0
)
dogstatsd.socket = FakeSocket()
dogstatsd.socket = FakeSocket(socket_kind=socket_kind)

# Send some unbuffered metrics and verify we got it immediately
last_telemetry_size = self.send_and_assert(
Expand Down

0 comments on commit c9dd150

Please sign in to comment.