Skip to content

Commit 719c697

Browse files
andrewqian2001datadogddrthall
authored andcommitted
[AMLII-2170] fix removed/renamed function flush (#868)
* fix missing function that was renamed * fix lint * change variable name back to original
1 parent ab20e29 commit 719c697

File tree

3 files changed

+205
-55
lines changed

3 files changed

+205
-55
lines changed

datadog/dogstatsd/base.py

+72-9
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@
1313
import os
1414
import socket
1515
import errno
16+
import struct
1617
import threading
1718
import time
1819
from threading import Lock, RLock
@@ -49,8 +50,13 @@
4950
DEFAULT_HOST = "localhost"
5051
DEFAULT_PORT = 8125
5152

53+
# Socket prefixes
54+
UNIX_ADDRESS_SCHEME = "unix://"
55+
UNIX_ADDRESS_DATAGRAM_SCHEME = "unixgram://"
56+
UNIX_ADDRESS_STREAM_SCHEME = "unixstream://"
57+
5258
# Buffering-related values (in seconds)
53-
DEFAULT_FLUSH_INTERVAL = 0.3
59+
DEFAULT_BUFFERING_FLUSH_INTERVAL = 0.3
5460
MIN_FLUSH_INTERVAL = 0.0001
5561

5662
# Env var to enable/disable sending the container ID field
@@ -145,7 +151,7 @@ def __init__(
145151
host=DEFAULT_HOST, # type: Text
146152
port=DEFAULT_PORT, # type: int
147153
max_buffer_size=None, # type: None
148-
flush_interval=DEFAULT_FLUSH_INTERVAL, # type: float
154+
flush_interval=DEFAULT_BUFFERING_FLUSH_INTERVAL, # type: float
149155
disable_aggregation=True, # type: bool
150156
disable_buffering=True, # type: bool
151157
namespace=None, # type: Optional[Text]
@@ -489,6 +495,30 @@ def socket_path(self, path):
489495
self._transport = "uds"
490496
self._max_payload_size = self._max_buffer_len or UDS_OPTIMAL_PAYLOAD_LENGTH
491497

498+
@property
499+
def socket(self):
500+
return self._socket
501+
502+
@socket.setter
503+
def socket(self, new_socket):
504+
self._socket = new_socket
505+
if new_socket:
506+
self._socket_kind = new_socket.getsockopt(socket.SOL_SOCKET, socket.SO_TYPE)
507+
else:
508+
self._socket_kind = None
509+
510+
@property
511+
def telemetry_socket(self):
512+
return self._telemetry_socket
513+
514+
@telemetry_socket.setter
515+
def telemetry_socket(self, t_socket):
516+
self._telemetry_socket = t_socket
517+
if t_socket:
518+
self._telemetry_socket_kind = t_socket.getsockopt(socket.SOL_SOCKET, socket.SO_TYPE)
519+
else:
520+
self._telemetry_socket_kind = None
521+
492522
def enable_background_sender(self, sender_queue_size=0, sender_queue_timeout=0):
493523
"""
494524
Use a background thread to communicate with the dogstatsd server.
@@ -643,7 +673,7 @@ def disable_aggregation(self):
643673
self._stop_flush_thread()
644674
log.debug("Statsd aggregation is disabled")
645675

646-
def enable_aggregation(self, flush_interval=DEFAULT_FLUSH_INTERVAL):
676+
def enable_aggregation(self, flush_interval=DEFAULT_BUFFERING_FLUSH_INTERVAL):
647677
with self._config_lock:
648678
if not self._disable_aggregation:
649679
return
@@ -731,11 +761,37 @@ def _ensure_min_send_buffer_size(cls, sock, min_size=MIN_SEND_BUFFER_SIZE):
731761

732762
@classmethod
733763
def _get_uds_socket(cls, socket_path, timeout):
734-
sock = socket.socket(socket.AF_UNIX, socket.SOCK_DGRAM)
735-
sock.settimeout(timeout)
736-
cls._ensure_min_send_buffer_size(sock)
737-
sock.connect(socket_path)
738-
return sock
764+
valid_socket_kinds = [socket.SOCK_DGRAM, socket.SOCK_STREAM]
765+
if socket_path.startswith(UNIX_ADDRESS_DATAGRAM_SCHEME):
766+
valid_socket_kinds = [socket.SOCK_DGRAM]
767+
socket_path = socket_path[len(UNIX_ADDRESS_DATAGRAM_SCHEME):]
768+
elif socket_path.startswith(UNIX_ADDRESS_STREAM_SCHEME):
769+
valid_socket_kinds = [socket.SOCK_STREAM]
770+
socket_path = socket_path[len(UNIX_ADDRESS_STREAM_SCHEME):]
771+
elif socket_path.startswith(UNIX_ADDRESS_SCHEME):
772+
socket_path = socket_path[len(UNIX_ADDRESS_SCHEME):]
773+
774+
last_error = ValueError("Invalid socket path")
775+
for socket_kind in valid_socket_kinds:
776+
# py2 stores socket kinds differently than py3, determine the name independently from version
777+
sk_name = { socket.SOCK_STREAM: "stream", socket.SOCK_DGRAM: "datagram" }[socket_kind]
778+
779+
try:
780+
sock = socket.socket(socket.AF_UNIX, socket_kind)
781+
sock.settimeout(timeout)
782+
cls._ensure_min_send_buffer_size(sock)
783+
sock.connect(socket_path)
784+
log.debug("Connected to socket %s with kind %s", socket_path, sk_name)
785+
return sock
786+
except Exception as e:
787+
if sock is not None:
788+
sock.close()
789+
log.debug("Failed to connect to %s with kind %s: %s", socket_path, sk_name, e)
790+
if e.errno == errno.EPROTOTYPE:
791+
last_error = e
792+
continue
793+
raise e
794+
raise last_error
739795

740796
@classmethod
741797
def _get_udp_socket(cls, host, port, timeout):
@@ -805,6 +861,9 @@ def _reset_buffer(self):
805861
self._current_buffer_total_size = 0
806862
self._buffer = []
807863

864+
def flush(self):
865+
self.flush_buffered_metrics()
866+
808867
def flush_buffered_metrics(self):
809868
"""
810869
Flush the metrics buffer by sending the data to the server.
@@ -1216,10 +1275,14 @@ def _xmit_packet(self, packet, is_telemetry):
12161275
try:
12171276
if is_telemetry and self._dedicated_telemetry_destination():
12181277
mysocket = self.telemetry_socket or self.get_socket(telemetry=True)
1278+
socket_kind = self._telemetry_socket_kind
12191279
else:
12201280
# If set, use socket directly
12211281
mysocket = self.socket or self.get_socket()
1282+
socket_kind = self._socket_kind
12221283

1284+
if socket_kind == socket.SOCK_STREAM:
1285+
mysocket.send(struct.pack('<I', len(packet)))
12231286
mysocket.send(packet.encode(self.encoding))
12241287

12251288
if not is_telemetry and self._telemetry:
@@ -1253,7 +1316,7 @@ def _xmit_packet(self, packet, is_telemetry):
12531316
)
12541317
self.close_socket()
12551318
except Exception as exc:
1256-
print("Unexpected error: %s", exc)
1319+
print("Unexpected error: ", exc)
12571320
log.error("Unexpected error: %s", str(exc))
12581321

12591322
if not is_telemetry and self._telemetry:

tests/integration/dogstatsd/test_statsd_sender.py

+47-4
Original file line numberDiff line numberDiff line change
@@ -1,18 +1,23 @@
1+
from contextlib import closing
12
import itertools
3+
import os
4+
import shutil
25
import socket
6+
import tempfile
37
from threading import Thread
8+
import uuid
49

510
import pytest
611

712
from datadog.dogstatsd.base import DogStatsd
813

914
@pytest.mark.parametrize(
10-
"disable_background_sender, disable_buffering, wait_for_pending, socket_timeout, stop",
11-
list(itertools.product([True, False], [True, False], [True, False], [0, 1], [True, False])),
15+
"disable_background_sender, disable_buffering, wait_for_pending, socket_timeout, stop, socket_kind",
16+
list(itertools.product([True, False], [True, False], [True, False], [0, 1], [True, False], [socket.SOCK_DGRAM, socket.SOCK_STREAM])),
1217
)
13-
def test_sender_mode(disable_background_sender, disable_buffering, wait_for_pending, socket_timeout, stop):
18+
def test_sender_mode(disable_background_sender, disable_buffering, wait_for_pending, socket_timeout, stop, socket_kind):
1419
# Test basic sender operation with an assortment of options
15-
foo, bar = socket.socketpair(socket.AF_UNIX, socket.SOCK_DGRAM, 0)
20+
foo, bar = socket.socketpair(socket.AF_UNIX, socket_kind, 0)
1621
statsd = DogStatsd(
1722
telemetry_min_flush_interval=0,
1823
disable_background_sender=disable_background_sender,
@@ -101,3 +106,41 @@ def test_buffering_with_context():
101106
bar.settimeout(5)
102107
msg = bar.recv(8192)
103108
assert msg == b"first:1|c\n"
109+
110+
@pytest.fixture()
111+
def socket_dir():
112+
tempdir = tempfile.mkdtemp()
113+
yield tempdir
114+
shutil.rmtree(tempdir)
115+
116+
@pytest.mark.parametrize(
117+
"socket_prefix, socket_kind, success",
118+
[
119+
("", socket.SOCK_DGRAM, True),
120+
("", socket.SOCK_STREAM, True),
121+
("unix://", socket.SOCK_DGRAM, True),
122+
("unix://", socket.SOCK_STREAM, True),
123+
("unixstream://", socket.SOCK_DGRAM, False),
124+
("unixstream://", socket.SOCK_STREAM, True),
125+
("unixgram://", socket.SOCK_DGRAM, True),
126+
("unixgram://", socket.SOCK_STREAM, False)
127+
]
128+
)
129+
def test_socket_connection(socket_dir, socket_prefix, socket_kind, success):
130+
socket_path = os.path.join(socket_dir, str(uuid.uuid1()) + ".sock")
131+
listener_socket = socket.socket(socket.AF_UNIX, socket_kind)
132+
listener_socket.bind(socket_path)
133+
134+
if socket_kind == socket.SOCK_STREAM:
135+
listener_socket.listen(1)
136+
137+
with closing(listener_socket):
138+
statsd = DogStatsd(
139+
socket_path = socket_prefix + socket_path
140+
)
141+
142+
if success:
143+
assert statsd.get_socket() is not None
144+
else:
145+
with pytest.raises(socket.error):
146+
statsd.get_socket()

0 commit comments

Comments
 (0)