Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add UDS SOCK_STREAM support to the DogStatsD client #869

Open
wants to merge 2 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions datadog/dogstatsd/aggregator.py
Original file line number Diff line number Diff line change
Expand Up @@ -55,8 +55,8 @@ def flush_aggregated_sampled_metrics(self):
return metrics

def get_context(self, name, tags):
    """
    Build the aggregation context key for a metric.

    :param name: metric name
    :param tags: iterable of tag strings, or None when the metric is untagged
    :returns: ``"<name>:<tags joined by commas>"``; the tag part is empty
        when ``tags`` is None
    """
    if tags is None:
        joined_tags = ""
    else:
        # unicode literals keep non-ASCII tags from breaking the join on py2
        joined_tags = u",".join(tags)
    return u"{}:{}".format(name, joined_tags)

def count(self, name, value, tags, rate, timestamp=0):
return self.add_metric(
Expand Down
91 changes: 84 additions & 7 deletions datadog/dogstatsd/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
import os
import socket
import errno
import struct
import threading
import time
from threading import Lock, RLock
Expand Down Expand Up @@ -49,6 +50,11 @@
DEFAULT_HOST = "localhost"
DEFAULT_PORT = 8125

# Socket prefixes
# Optional schemes that may prefix a UDS socket path. _get_uds_socket strips
# the prefix before connecting and uses it to pick the socket kind(s) to try.
# "unix://": no restriction; datagram is attempted first, then stream if the
# connection is rejected with EPROTOTYPE.
UNIX_ADDRESS_SCHEME = "unix://"
# "unixgram://": only SOCK_DGRAM is attempted.
UNIX_ADDRESS_DATAGRAM_SCHEME = "unixgram://"
# "unixstream://": only SOCK_STREAM is attempted.
UNIX_ADDRESS_STREAM_SCHEME = "unixstream://"

# Buffering-related values (in seconds)
DEFAULT_BUFFERING_FLUSH_INTERVAL = 0.3
MIN_FLUSH_INTERVAL = 0.0001
Expand Down Expand Up @@ -495,6 +501,36 @@ def socket_path(self, path):
self._transport = "uds"
self._max_payload_size = self._max_buffer_len or UDS_OPTIMAL_PAYLOAD_LENGTH

@property
def socket(self):
    """Return the socket currently assigned to the client (``self._socket``)."""
    return self._socket

@socket.setter
def socket(self, new_socket):
    """
    Assign the socket and cache its kind in ``self._socket_kind``.

    The kind is the socket's ``SO_TYPE`` option. It is ``None`` when
    ``new_socket`` is falsy or does not expose ``getsockopt``.
    """
    self._socket = new_socket
    detected_kind = None
    if new_socket:
        try:
            detected_kind = new_socket.getsockopt(socket.SOL_SOCKET, socket.SO_TYPE)
        except AttributeError:
            # An object without sockopts cannot report a type; leave the kind unset.
            log.info("Unexpected socket provided with no support for getsockopt")
    self._socket_kind = detected_kind

@property
def telemetry_socket(self):
    """Return the socket currently assigned for telemetry (``self._telemetry_socket``)."""
    return self._telemetry_socket

@telemetry_socket.setter
def telemetry_socket(self, t_socket):
    """
    Assign the telemetry socket and cache its kind in ``self._telemetry_socket_kind``.

    The kind is the socket's ``SO_TYPE`` option. It is ``None`` when
    ``t_socket`` is falsy or does not expose ``getsockopt``.
    """
    self._telemetry_socket = t_socket
    detected_kind = None
    if t_socket:
        try:
            detected_kind = t_socket.getsockopt(socket.SOL_SOCKET, socket.SO_TYPE)
        except AttributeError:
            # An object without sockopts cannot report a kind; leave it unset.
            log.info("Unexpected telemetry socket provided with no support for getsockopt")
    self._telemetry_socket_kind = detected_kind

def enable_background_sender(self, sender_queue_size=0, sender_queue_timeout=0):
"""
Use a background thread to communicate with the dogstatsd server.
Expand Down Expand Up @@ -738,11 +774,37 @@ def _ensure_min_send_buffer_size(cls, sock, min_size=MIN_SEND_BUFFER_SIZE):

@classmethod
def _get_uds_socket(cls, socket_path, timeout):
    """
    Connect to a Unix domain socket and return the connected socket.

    An optional scheme prefix on ``socket_path`` selects the socket kinds
    that are attempted:

    - ``unixgram://``: ``SOCK_DGRAM`` only
    - ``unixstream://``: ``SOCK_STREAM`` only
    - ``unix://`` or no prefix: ``SOCK_DGRAM`` first, then ``SOCK_STREAM``

    :param socket_path: filesystem path of the socket, optionally prefixed
        with one of the schemes above
    :param timeout: value passed to ``settimeout`` on the new socket
    :returns: the connected socket
    :raises Exception: any connection error other than ``EPROTOTYPE`` is
        re-raised immediately; if every allowed kind is rejected with
        ``EPROTOTYPE``, the last such error is raised
    """
    # py2 stores socket kinds differently than py3, determine the name independently from version
    kind_names = {socket.SOCK_STREAM: "stream", socket.SOCK_DGRAM: "datagram"}

    valid_socket_kinds = [socket.SOCK_DGRAM, socket.SOCK_STREAM]
    if socket_path.startswith(UNIX_ADDRESS_DATAGRAM_SCHEME):
        valid_socket_kinds = [socket.SOCK_DGRAM]
        socket_path = socket_path[len(UNIX_ADDRESS_DATAGRAM_SCHEME):]
    elif socket_path.startswith(UNIX_ADDRESS_STREAM_SCHEME):
        valid_socket_kinds = [socket.SOCK_STREAM]
        socket_path = socket_path[len(UNIX_ADDRESS_STREAM_SCHEME):]
    elif socket_path.startswith(UNIX_ADDRESS_SCHEME):
        socket_path = socket_path[len(UNIX_ADDRESS_SCHEME):]

    last_error = ValueError("Invalid socket path")
    for socket_kind in valid_socket_kinds:
        sk_name = kind_names[socket_kind]

        # Reset on every attempt so the handler never touches an unbound
        # name (socket creation failed) or a socket from a previous attempt.
        sock = None
        try:
            sock = socket.socket(socket.AF_UNIX, socket_kind)
            sock.settimeout(timeout)
            cls._ensure_min_send_buffer_size(sock)
            sock.connect(socket_path)
        except Exception as e:
            if sock is not None:
                sock.close()
            log.debug("Failed to connect to %s with kind %s: %s", socket_path, sk_name, e)
            # Not every exception carries an errno; only EPROTOTYPE (the
            # listener is of the other kind) justifies trying the next kind.
            if getattr(e, "errno", None) != errno.EPROTOTYPE:
                raise
            last_error = e
        else:
            log.debug("Connected to socket %s with kind %s", socket_path, sk_name)
            return sock
    raise last_error

@classmethod
def _get_udp_socket(cls, host, port, timeout):
Expand Down Expand Up @@ -1243,14 +1305,23 @@ def _xmit_packet_with_telemetry(self, packet):
self.packets_dropped_writer += 1

def _xmit_packet(self, packet, is_telemetry):
socket_kind = None
try:
if is_telemetry and self._dedicated_telemetry_destination():
mysocket = self.telemetry_socket or self.get_socket(telemetry=True)
socket_kind = self._telemetry_socket_kind
else:
# If set, use socket directly
mysocket = self.socket or self.get_socket()
socket_kind = self._socket_kind

mysocket.send(packet.encode(self.encoding))
encoded_packet = packet.encode(self.encoding)
if socket_kind == socket.SOCK_STREAM:
with self._socket_lock:
mysocket.sendall(struct.pack('<I', len(encoded_packet)))
mysocket.sendall(encoded_packet)
else:
mysocket.send(encoded_packet)

if not is_telemetry and self._telemetry:
self.packets_sent += 1
Expand Down Expand Up @@ -1283,13 +1354,19 @@ def _xmit_packet(self, packet, is_telemetry):
)
self.close_socket()
except Exception as exc:
print("Unexpected error: %s", exc)
print("Unexpected error: ", exc)
log.error("Unexpected error: %s", str(exc))

if not is_telemetry and self._telemetry:
self.bytes_dropped_writer += len(packet)
self.packets_dropped_writer += 1

# if in stream mode we need to shut down the socket; we can't recover from a
# partial send
if socket_kind == socket.SOCK_STREAM:
log.debug("Confirming socket closure after error streaming")
self.close_socket()

return False

def _send_to_buffer(self, packet):
Expand Down
77 changes: 72 additions & 5 deletions tests/integration/dogstatsd/test_statsd_sender.py
Original file line number Diff line number Diff line change
@@ -1,18 +1,24 @@
from contextlib import closing
import itertools
import os
import shutil
import socket
import struct
import tempfile
from threading import Thread
import uuid

import pytest

from datadog.dogstatsd.base import DogStatsd

@pytest.mark.parametrize(
"disable_background_sender, disable_buffering, wait_for_pending, socket_timeout, stop",
list(itertools.product([True, False], [True, False], [True, False], [0, 1], [True, False])),
"disable_background_sender, disable_buffering, wait_for_pending, socket_timeout, stop, socket_kind",
list(itertools.product([True, False], [True, False], [True, False], [0, 1], [True, False], [socket.SOCK_DGRAM, socket.SOCK_STREAM])),
)
def test_sender_mode(disable_background_sender, disable_buffering, wait_for_pending, socket_timeout, stop):
def test_sender_mode(disable_background_sender, disable_buffering, wait_for_pending, socket_timeout, stop, socket_kind):
# Test basic sender operation with an assortment of options
foo, bar = socket.socketpair(socket.AF_UNIX, socket.SOCK_DGRAM, 0)
foo, bar = socket.socketpair(socket.AF_UNIX, socket_kind, 0)
statsd = DogStatsd(
telemetry_min_flush_interval=0,
disable_background_sender=disable_background_sender,
Expand All @@ -24,7 +30,11 @@ def test_sender_mode(disable_background_sender, disable_buffering, wait_for_pend
statsd._reset_telemetry()

def reader_thread():
msg = bar.recv(8192)
if socket_kind == socket.SOCK_DGRAM:
msg = bar.recv(8192)
else:
size = struct.unpack("<I", bar.recv(4))[0]
msg = bar.recv(size)
assert msg == b"test.metric:1|c\n"

t = Thread(target=reader_thread, name="test_sender_mode/reader_thread")
Expand All @@ -49,6 +59,25 @@ def test_set_socket_timeout():
statsd.close_socket()
assert statsd.get_socket().gettimeout() == 1

def test_stream_cleanup():
    """Check that the client drops its stream socket once a send on it fails."""
    # Keep a named reference to the peer so it stays open for the whole test.
    client_sock, peer_sock = socket.socketpair(socket.AF_UNIX, socket.SOCK_STREAM, 0)
    client_sock.settimeout(0)

    statsd = DogStatsd(disable_buffering=True)
    statsd.socket = client_sock

    # Sends succeed while the buffer has room, so the socket is kept.
    for _attempt in range(3):
        statsd.increment("test", 1)
    assert statsd.socket is not None

    # different os's have different mins, e.g. this sets the buffer size to 2304 on certain linux variants
    client_sock.setsockopt(socket.SOL_SOCKET, socket.SO_SNDBUF, 1)

    # pre-emptively clog the buffer
    with pytest.raises(socket.error):
        client_sock.sendall(os.urandom(5000))

    statsd.increment("test", 1)

    assert statsd.socket is None

@pytest.mark.parametrize(
"disable_background_sender, disable_buffering",
Expand Down Expand Up @@ -101,3 +130,41 @@ def test_buffering_with_context():
bar.settimeout(5)
msg = bar.recv(8192)
assert msg == b"first:1|c\n"

@pytest.fixture()
def socket_dir():
    """Yield the path of a fresh temporary directory and remove it afterwards."""
    scratch_dir = tempfile.mkdtemp()
    yield scratch_dir
    shutil.rmtree(scratch_dir)

@pytest.mark.parametrize(
    "socket_prefix, socket_kind, success",
    [
        ("", socket.SOCK_DGRAM, True),
        ("", socket.SOCK_STREAM, True),
        ("unix://", socket.SOCK_DGRAM, True),
        ("unix://", socket.SOCK_STREAM, True),
        ("unixstream://", socket.SOCK_DGRAM, False),
        ("unixstream://", socket.SOCK_STREAM, True),
        ("unixgram://", socket.SOCK_DGRAM, True),
        ("unixgram://", socket.SOCK_STREAM, False),
    ],
)
def test_socket_connection(socket_dir, socket_prefix, socket_kind, success):
    """
    Check which listener kinds each socket-path scheme can connect to.

    A listener of ``socket_kind`` is bound inside ``socket_dir``; the client
    is given the path prefixed with ``socket_prefix``. ``success`` states
    whether ``get_socket()`` is expected to connect or to raise
    ``socket.error``.
    """
    socket_path = os.path.join(socket_dir, str(uuid.uuid1()) + ".sock")

    # Enter the closing() guard before bind/listen so the listener is
    # released even when setting it up fails.
    with closing(socket.socket(socket.AF_UNIX, socket_kind)) as listener_socket:
        listener_socket.bind(socket_path)
        if socket_kind == socket.SOCK_STREAM:
            listener_socket.listen(1)

        statsd = DogStatsd(socket_path=socket_prefix + socket_path)

        if success:
            assert statsd.get_socket() is not None
            # Release the connected client socket instead of leaking it.
            statsd.close_socket()
        else:
            with pytest.raises(socket.error):
                statsd.get_socket()
Loading
Loading