Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[AMLII-2170] fix removed/renamed function flush #868

Merged
merged 3 commits into from
Nov 15, 2024
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 6 additions & 3 deletions datadog/dogstatsd/base.py
Original file line number Diff line number Diff line change
@@ -50,7 +50,7 @@
DEFAULT_PORT = 8125

# Buffering-related values (in seconds)
DEFAULT_FLUSH_INTERVAL = 0.3
DEFAULT_BUFFERING_FLUSH_INTERVAL = 0.3
MIN_FLUSH_INTERVAL = 0.0001

# Env var to enable/disable sending the container ID field
@@ -145,7 +145,7 @@ def __init__(
host=DEFAULT_HOST, # type: Text
port=DEFAULT_PORT, # type: int
max_buffer_size=None, # type: None
flush_interval=DEFAULT_FLUSH_INTERVAL, # type: float
flush_interval=DEFAULT_BUFFERING_FLUSH_INTERVAL, # type: float
disable_aggregation=True, # type: bool
disable_buffering=True, # type: bool
namespace=None, # type: Optional[Text]
@@ -643,7 +643,7 @@ def disable_aggregation(self):
self._stop_flush_thread()
log.debug("Statsd aggregation is disabled")

def enable_aggregation(self, flush_interval=DEFAULT_FLUSH_INTERVAL):
def enable_aggregation(self, flush_interval=DEFAULT_BUFFERING_FLUSH_INTERVAL):
with self._config_lock:
if not self._disable_aggregation:
return
@@ -805,6 +805,9 @@ def _reset_buffer(self):
self._current_buffer_total_size = 0
self._buffer = []

def flush(self):
self.flush_buffered_metrics()

def flush_buffered_metrics(self):
"""
Flush the metrics buffer by sending the data to the server.
60 changes: 30 additions & 30 deletions tests/unit/dogstatsd/test_statsd.py
Original file line number Diff line number Diff line change
@@ -29,7 +29,7 @@
# Datadog libraries
from datadog import initialize, statsd
from datadog import __version__ as version
from datadog.dogstatsd.base import DEFAULT_FLUSH_INTERVAL, DogStatsd, MIN_SEND_BUFFER_SIZE, UDP_OPTIMAL_PAYLOAD_LENGTH, UDS_OPTIMAL_PAYLOAD_LENGTH
from datadog.dogstatsd.base import DEFAULT_BUFFERING_FLUSH_INTERVAL, DogStatsd, MIN_SEND_BUFFER_SIZE, UDP_OPTIMAL_PAYLOAD_LENGTH, UDS_OPTIMAL_PAYLOAD_LENGTH
from datadog.dogstatsd.context import TimedContextManagerDecorator
from datadog.util.compat import is_higher_py35, is_p3k
from tests.util.contextmanagers import preserve_environment_variable, EnvVars
@@ -41,7 +41,7 @@ class FakeSocket(object):

FLUSH_GRACE_PERIOD = 0.2

def __init__(self, flush_interval=DEFAULT_FLUSH_INTERVAL):
def __init__(self, flush_interval=DEFAULT_BUFFERING_FLUSH_INTERVAL):
self.payloads = deque()

self._flush_interval = flush_interval
@@ -331,42 +331,42 @@ def test_gauge_with_invalid_ts_should_be_ignored(self):

def test_counter(self):
self.statsd.increment('page.views')
self.statsd.flush_buffered_metrics()
self.statsd.flush()
self.assert_equal_telemetry('page.views:1|c\n', self.recv(2))

self.statsd._reset_telemetry()
self.statsd.increment('page.views', 11)
self.statsd.flush_buffered_metrics()
self.statsd.flush()
self.assert_equal_telemetry('page.views:11|c\n', self.recv(2))

self.statsd._reset_telemetry()
self.statsd.decrement('page.views')
self.statsd.flush_buffered_metrics()
self.statsd.flush()
self.assert_equal_telemetry('page.views:-1|c\n', self.recv(2))

self.statsd._reset_telemetry()
self.statsd.decrement('page.views', 12)
self.statsd.flush_buffered_metrics()
self.statsd.flush()
self.assert_equal_telemetry('page.views:-12|c\n', self.recv(2))

def test_count(self):
self.statsd.count('page.views', 11)
self.statsd.flush_buffered_metrics()
self.statsd.flush()
self.assert_equal_telemetry('page.views:11|c\n', self.recv(2))

def test_count_with_ts(self):
self.statsd.count_with_timestamp("page.views", 1, timestamp=1066)
self.statsd.flush_buffered_metrics()
self.statsd.flush()
self.assert_equal_telemetry("page.views:1|c|T1066\n", self.recv(2))

self.statsd._reset_telemetry()
self.statsd.count_with_timestamp("page.views", 11, timestamp=2121)
self.statsd.flush_buffered_metrics()
self.statsd.flush()
self.assert_equal_telemetry("page.views:11|c|T2121\n", self.recv(2))

def test_count_with_invalid_ts_should_be_ignored(self):
self.statsd.count_with_timestamp("page.views", 1, timestamp=-1066)
self.statsd.flush_buffered_metrics()
self.statsd.flush()
self.assert_equal_telemetry("page.views:1|c\n", self.recv(2))

def test_histogram(self):
@@ -399,7 +399,7 @@ def test_sample_rate(self):
for _ in range(10000):
self.statsd.increment('sampled_counter', sample_rate=0.3)

self.statsd.flush_buffered_metrics()
self.statsd.flush()

total_metrics = 0
payload = self.recv()
@@ -667,7 +667,7 @@ def test_socket_error(self):
self.statsd.socket = BrokenSocket()
with mock.patch("datadog.dogstatsd.base.log") as mock_log:
self.statsd.gauge('no error', 1)
self.statsd.flush_buffered_metrics()
self.statsd.flush()

mock_log.error.assert_not_called()
mock_log.warning.assert_called_once_with(
@@ -679,7 +679,7 @@ def test_socket_overflown(self):
self.statsd.socket = OverflownSocket()
with mock.patch("datadog.dogstatsd.base.log") as mock_log:
self.statsd.gauge('no error', 1)
self.statsd.flush_buffered_metrics()
self.statsd.flush()

mock_log.error.assert_not_called()
calls = [call("Socket send would block: %s, dropping the packet", mock.ANY)]
@@ -689,7 +689,7 @@ def test_socket_message_too_long(self):
self.statsd.socket = BrokenSocket(error_number=errno.EMSGSIZE)
with mock.patch("datadog.dogstatsd.base.log") as mock_log:
self.statsd.gauge('no error', 1)
self.statsd.flush_buffered_metrics()
self.statsd.flush()

mock_log.error.assert_not_called()
calls = [
@@ -705,7 +705,7 @@ def test_socket_no_buffer_space(self):
self.statsd.socket = BrokenSocket(error_number=errno.ENOBUFS)
with mock.patch("datadog.dogstatsd.base.log") as mock_log:
self.statsd.gauge('no error', 1)
self.statsd.flush_buffered_metrics()
self.statsd.flush()

mock_log.error.assert_not_called()
calls = [call("Socket buffer full: %s, dropping the packet", mock.ANY)]
@@ -720,7 +720,7 @@ def test_uds_socket_ensures_min_receive_buffer(self, mock_socket_create):

datadog = DogStatsd(socket_path="/fake/uds/socket/path")
datadog.gauge('some value', 1)
datadog.flush_buffered_metrics()
datadog.flush()

# Sanity check
mock_socket_create.assert_called_once_with(socket.AF_UNIX, socket.SOCK_DGRAM)
@@ -740,7 +740,7 @@ def test_udp_socket_ensures_min_receive_buffer(self, mock_socket_create):

datadog = DogStatsd()
datadog.gauge('some value', 1)
datadog.flush_buffered_metrics()
datadog.flush()

# Sanity check
mock_socket_create.assert_called_once_with(socket.AF_INET, socket.SOCK_DGRAM, socket.IPPROTO_UDP)
@@ -837,7 +837,7 @@ def func(arg1, arg2, kwarg1=1, kwarg2=1):
return (arg1, arg2, kwarg1, kwarg2)

func(1, 2, kwarg2=3)
self.statsd.flush_buffered_metrics()
self.statsd.flush()

# Ignore telemetry packet
packet = self.recv(2).split("\n")[0]
@@ -881,7 +881,7 @@ def func(arg1, arg2, kwarg1=1, kwarg2=1):
return (arg1, arg2, kwarg1, kwarg2)

func(1, 2, kwarg2=3)
self.statsd.flush_buffered_metrics()
self.statsd.flush()

packet = self.recv()
name_value, type_ = packet.rstrip('\n').split('|')
@@ -1068,7 +1068,7 @@ def test_flush(self):

dogstatsd.increment('page.views')
self.assertIsNone(fake_socket.recv(no_wait=True))
dogstatsd.flush_buffered_metrics()
dogstatsd.flush()
self.assert_equal_telemetry('page.views:1|c\n', fake_socket.recv(2))

def test_flush_interval(self):
@@ -1096,7 +1096,7 @@ def test_aggregation_buffering_simultaneously(self):
dogstatsd.increment('test.aggregation_and_buffering')
self.assertIsNone(fake_socket.recv(no_wait=True))
dogstatsd.flush_aggregated_metrics()
dogstatsd.flush_buffered_metrics()
dogstatsd.flush()
self.assert_equal_telemetry('test.aggregation_and_buffering:10|c\n', fake_socket.recv(2))

def test_aggregation_buffering_simultaneously_with_interval(self):
@@ -1139,7 +1139,7 @@ def test_flush_disable(self):
dogstatsd.increment('page.views')
self.assertIsNone(fake_socket.recv(no_wait=True))

time.sleep(DEFAULT_FLUSH_INTERVAL)
time.sleep(DEFAULT_BUFFERING_FLUSH_INTERVAL)
self.assertIsNone(fake_socket.recv(no_wait=True))

time.sleep(0.3)
@@ -1697,7 +1697,7 @@ def test_entity_id_and_container_id(self):
dogstatsd._container_id = "ci-fake-container-id"

dogstatsd.increment("page.views")
dogstatsd.flush_buffered_metrics()
dogstatsd.flush()
tags = "dd.internal.entity_id:04652bb7-19b7-11e9-9cc6-42010a9c016d"
metric = 'page.views:1|c|#' + tags + '|c:ci-fake-container-id\n'
self.assertEqual(metric, dogstatsd.socket.recv())
@@ -1712,7 +1712,7 @@ def test_entity_id_and_container_id_and_external_env(self):
dogstatsd._container_id = "ci-fake-container-id"

dogstatsd.increment("page.views")
dogstatsd.flush_buffered_metrics()
dogstatsd.flush()
tags = "dd.internal.entity_id:04652bb7-19b7-11e9-9cc6-42010a9c016d"
metric = 'page.views:1|c|#' + tags + '|c:ci-fake-container-id' + '|e:it-false,cn-container-name,pu-04652bb7-19b7-11e9-9cc6-42010a9c016d' + '\n'
self.assertEqual(metric, dogstatsd.socket.recv())
@@ -1795,7 +1795,7 @@ def test_dogstatsd_initialization_with_dd_env_service_version(self):
# Make call with no tags passed; only the globally configured tags will be used.
global_tags_str = ','.join([t for t in global_tags])
dogstatsd.gauge('gt', 123.4)
dogstatsd.flush_buffered_metrics()
dogstatsd.flush()

# Protect against the no tags case.
metric = 'gt:123.4|g|#{}\n'.format(global_tags_str) if global_tags_str else 'gt:123.4|g\n'
@@ -1813,7 +1813,7 @@ def test_dogstatsd_initialization_with_dd_env_service_version(self):
passed_tags = ['env:prod', 'version:def456', 'custom_tag:toad']
all_tags_str = ','.join([t for t in passed_tags + global_tags])
dogstatsd.gauge('gt', 123.4, tags=passed_tags)
dogstatsd.flush_buffered_metrics()
dogstatsd.flush()

metric = 'gt:123.4|g|#{}\n'.format(all_tags_str)
self.assertEqual(metric, dogstatsd.socket.recv())
@@ -1919,22 +1919,22 @@ def test_counter_with_container_field(self):
self.statsd._container_id = "ci-fake-container-id"

self.statsd.increment("page.views")
self.statsd.flush_buffered_metrics()
self.statsd.flush()
self.assert_equal_telemetry("page.views:1|c|c:ci-fake-container-id\n", self.recv(2))

self.statsd._reset_telemetry()
self.statsd.increment("page.views", 11)
self.statsd.flush_buffered_metrics()
self.statsd.flush()
self.assert_equal_telemetry("page.views:11|c|c:ci-fake-container-id\n", self.recv(2))

self.statsd._reset_telemetry()
self.statsd.decrement("page.views")
self.statsd.flush_buffered_metrics()
self.statsd.flush()
self.assert_equal_telemetry("page.views:-1|c|c:ci-fake-container-id\n", self.recv(2))

self.statsd._reset_telemetry()
self.statsd.decrement("page.views", 12)
self.statsd.flush_buffered_metrics()
self.statsd.flush()
self.assert_equal_telemetry("page.views:-12|c|c:ci-fake-container-id\n", self.recv(2))

self.statsd._container_id = None