Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[AMLII-2170] fix removed/renamed function flush #868

Merged
merged 3 commits into from
Nov 15, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 6 additions & 3 deletions datadog/dogstatsd/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@
DEFAULT_PORT = 8125

# Buffering-related values (in seconds)
DEFAULT_FLUSH_INTERVAL = 0.3
DEFAULT_BUFFERING_FLUSH_INTERVAL = 0.3
MIN_FLUSH_INTERVAL = 0.0001

# Env var to enable/disable sending the container ID field
Expand Down Expand Up @@ -145,7 +145,7 @@ def __init__(
host=DEFAULT_HOST, # type: Text
port=DEFAULT_PORT, # type: int
max_buffer_size=None, # type: None
flush_interval=DEFAULT_FLUSH_INTERVAL, # type: float
flush_interval=DEFAULT_BUFFERING_FLUSH_INTERVAL, # type: float
disable_aggregation=True, # type: bool
disable_buffering=True, # type: bool
namespace=None, # type: Optional[Text]
Expand Down Expand Up @@ -643,7 +643,7 @@ def disable_aggregation(self):
self._stop_flush_thread()
log.debug("Statsd aggregation is disabled")

def enable_aggregation(self, flush_interval=DEFAULT_FLUSH_INTERVAL):
def enable_aggregation(self, flush_interval=DEFAULT_BUFFERING_FLUSH_INTERVAL):
with self._config_lock:
if not self._disable_aggregation:
return
Expand Down Expand Up @@ -805,6 +805,9 @@ def _reset_buffer(self):
self._current_buffer_total_size = 0
self._buffer = []

def flush(self):
self.flush_buffered_metrics()

def flush_buffered_metrics(self):
"""
Flush the metrics buffer by sending the data to the server.
Expand Down
60 changes: 30 additions & 30 deletions tests/unit/dogstatsd/test_statsd.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@
# Datadog libraries
from datadog import initialize, statsd
from datadog import __version__ as version
from datadog.dogstatsd.base import DEFAULT_FLUSH_INTERVAL, DogStatsd, MIN_SEND_BUFFER_SIZE, UDP_OPTIMAL_PAYLOAD_LENGTH, UDS_OPTIMAL_PAYLOAD_LENGTH
from datadog.dogstatsd.base import DEFAULT_BUFFERING_FLUSH_INTERVAL, DogStatsd, MIN_SEND_BUFFER_SIZE, UDP_OPTIMAL_PAYLOAD_LENGTH, UDS_OPTIMAL_PAYLOAD_LENGTH
from datadog.dogstatsd.context import TimedContextManagerDecorator
from datadog.util.compat import is_higher_py35, is_p3k
from tests.util.contextmanagers import preserve_environment_variable, EnvVars
Expand All @@ -41,7 +41,7 @@ class FakeSocket(object):

FLUSH_GRACE_PERIOD = 0.2

def __init__(self, flush_interval=DEFAULT_FLUSH_INTERVAL):
def __init__(self, flush_interval=DEFAULT_BUFFERING_FLUSH_INTERVAL):
self.payloads = deque()

self._flush_interval = flush_interval
Expand Down Expand Up @@ -331,42 +331,42 @@ def test_gauge_with_invalid_ts_should_be_ignored(self):

def test_counter(self):
self.statsd.increment('page.views')
self.statsd.flush_buffered_metrics()
self.statsd.flush()
self.assert_equal_telemetry('page.views:1|c\n', self.recv(2))

self.statsd._reset_telemetry()
self.statsd.increment('page.views', 11)
self.statsd.flush_buffered_metrics()
self.statsd.flush()
self.assert_equal_telemetry('page.views:11|c\n', self.recv(2))

self.statsd._reset_telemetry()
self.statsd.decrement('page.views')
self.statsd.flush_buffered_metrics()
self.statsd.flush()
self.assert_equal_telemetry('page.views:-1|c\n', self.recv(2))

self.statsd._reset_telemetry()
self.statsd.decrement('page.views', 12)
self.statsd.flush_buffered_metrics()
self.statsd.flush()
self.assert_equal_telemetry('page.views:-12|c\n', self.recv(2))

def test_count(self):
self.statsd.count('page.views', 11)
self.statsd.flush_buffered_metrics()
self.statsd.flush()
self.assert_equal_telemetry('page.views:11|c\n', self.recv(2))

def test_count_with_ts(self):
self.statsd.count_with_timestamp("page.views", 1, timestamp=1066)
self.statsd.flush_buffered_metrics()
self.statsd.flush()
self.assert_equal_telemetry("page.views:1|c|T1066\n", self.recv(2))

self.statsd._reset_telemetry()
self.statsd.count_with_timestamp("page.views", 11, timestamp=2121)
self.statsd.flush_buffered_metrics()
self.statsd.flush()
self.assert_equal_telemetry("page.views:11|c|T2121\n", self.recv(2))

def test_count_with_invalid_ts_should_be_ignored(self):
self.statsd.count_with_timestamp("page.views", 1, timestamp=-1066)
self.statsd.flush_buffered_metrics()
self.statsd.flush()
self.assert_equal_telemetry("page.views:1|c\n", self.recv(2))

def test_histogram(self):
Expand Down Expand Up @@ -399,7 +399,7 @@ def test_sample_rate(self):
for _ in range(10000):
self.statsd.increment('sampled_counter', sample_rate=0.3)

self.statsd.flush_buffered_metrics()
self.statsd.flush()

total_metrics = 0
payload = self.recv()
Expand Down Expand Up @@ -667,7 +667,7 @@ def test_socket_error(self):
self.statsd.socket = BrokenSocket()
with mock.patch("datadog.dogstatsd.base.log") as mock_log:
self.statsd.gauge('no error', 1)
self.statsd.flush_buffered_metrics()
self.statsd.flush()

mock_log.error.assert_not_called()
mock_log.warning.assert_called_once_with(
Expand All @@ -679,7 +679,7 @@ def test_socket_overflown(self):
self.statsd.socket = OverflownSocket()
with mock.patch("datadog.dogstatsd.base.log") as mock_log:
self.statsd.gauge('no error', 1)
self.statsd.flush_buffered_metrics()
self.statsd.flush()

mock_log.error.assert_not_called()
calls = [call("Socket send would block: %s, dropping the packet", mock.ANY)]
Expand All @@ -689,7 +689,7 @@ def test_socket_message_too_long(self):
self.statsd.socket = BrokenSocket(error_number=errno.EMSGSIZE)
with mock.patch("datadog.dogstatsd.base.log") as mock_log:
self.statsd.gauge('no error', 1)
self.statsd.flush_buffered_metrics()
self.statsd.flush()

mock_log.error.assert_not_called()
calls = [
Expand All @@ -705,7 +705,7 @@ def test_socket_no_buffer_space(self):
self.statsd.socket = BrokenSocket(error_number=errno.ENOBUFS)
with mock.patch("datadog.dogstatsd.base.log") as mock_log:
self.statsd.gauge('no error', 1)
self.statsd.flush_buffered_metrics()
self.statsd.flush()

mock_log.error.assert_not_called()
calls = [call("Socket buffer full: %s, dropping the packet", mock.ANY)]
Expand All @@ -720,7 +720,7 @@ def test_uds_socket_ensures_min_receive_buffer(self, mock_socket_create):

datadog = DogStatsd(socket_path="/fake/uds/socket/path")
datadog.gauge('some value', 1)
datadog.flush_buffered_metrics()
datadog.flush()

# Sanity check
mock_socket_create.assert_called_once_with(socket.AF_UNIX, socket.SOCK_DGRAM)
Expand All @@ -740,7 +740,7 @@ def test_udp_socket_ensures_min_receive_buffer(self, mock_socket_create):

datadog = DogStatsd()
datadog.gauge('some value', 1)
datadog.flush_buffered_metrics()
datadog.flush()

# Sanity check
mock_socket_create.assert_called_once_with(socket.AF_INET, socket.SOCK_DGRAM, socket.IPPROTO_UDP)
Expand Down Expand Up @@ -837,7 +837,7 @@ def func(arg1, arg2, kwarg1=1, kwarg2=1):
return (arg1, arg2, kwarg1, kwarg2)

func(1, 2, kwarg2=3)
self.statsd.flush_buffered_metrics()
self.statsd.flush()

# Ignore telemetry packet
packet = self.recv(2).split("\n")[0]
Expand Down Expand Up @@ -881,7 +881,7 @@ def func(arg1, arg2, kwarg1=1, kwarg2=1):
return (arg1, arg2, kwarg1, kwarg2)

func(1, 2, kwarg2=3)
self.statsd.flush_buffered_metrics()
self.statsd.flush()

packet = self.recv()
name_value, type_ = packet.rstrip('\n').split('|')
Expand Down Expand Up @@ -1068,7 +1068,7 @@ def test_flush(self):

dogstatsd.increment('page.views')
self.assertIsNone(fake_socket.recv(no_wait=True))
dogstatsd.flush_buffered_metrics()
dogstatsd.flush()
self.assert_equal_telemetry('page.views:1|c\n', fake_socket.recv(2))

def test_flush_interval(self):
Expand Down Expand Up @@ -1096,7 +1096,7 @@ def test_aggregation_buffering_simultaneously(self):
dogstatsd.increment('test.aggregation_and_buffering')
self.assertIsNone(fake_socket.recv(no_wait=True))
dogstatsd.flush_aggregated_metrics()
dogstatsd.flush_buffered_metrics()
dogstatsd.flush()
self.assert_equal_telemetry('test.aggregation_and_buffering:10|c\n', fake_socket.recv(2))

def test_aggregation_buffering_simultaneously_with_interval(self):
Expand Down Expand Up @@ -1139,7 +1139,7 @@ def test_flush_disable(self):
dogstatsd.increment('page.views')
self.assertIsNone(fake_socket.recv(no_wait=True))

time.sleep(DEFAULT_FLUSH_INTERVAL)
time.sleep(DEFAULT_BUFFERING_FLUSH_INTERVAL)
self.assertIsNone(fake_socket.recv(no_wait=True))

time.sleep(0.3)
Expand Down Expand Up @@ -1697,7 +1697,7 @@ def test_entity_id_and_container_id(self):
dogstatsd._container_id = "ci-fake-container-id"

dogstatsd.increment("page.views")
dogstatsd.flush_buffered_metrics()
dogstatsd.flush()
tags = "dd.internal.entity_id:04652bb7-19b7-11e9-9cc6-42010a9c016d"
metric = 'page.views:1|c|#' + tags + '|c:ci-fake-container-id\n'
self.assertEqual(metric, dogstatsd.socket.recv())
Expand All @@ -1712,7 +1712,7 @@ def test_entity_id_and_container_id_and_external_env(self):
dogstatsd._container_id = "ci-fake-container-id"

dogstatsd.increment("page.views")
dogstatsd.flush_buffered_metrics()
dogstatsd.flush()
tags = "dd.internal.entity_id:04652bb7-19b7-11e9-9cc6-42010a9c016d"
metric = 'page.views:1|c|#' + tags + '|c:ci-fake-container-id' + '|e:it-false,cn-container-name,pu-04652bb7-19b7-11e9-9cc6-42010a9c016d' + '\n'
self.assertEqual(metric, dogstatsd.socket.recv())
Expand Down Expand Up @@ -1795,7 +1795,7 @@ def test_dogstatsd_initialization_with_dd_env_service_version(self):
# Make call with no tags passed; only the globally configured tags will be used.
global_tags_str = ','.join([t for t in global_tags])
dogstatsd.gauge('gt', 123.4)
dogstatsd.flush_buffered_metrics()
dogstatsd.flush()

# Protect against the no tags case.
metric = 'gt:123.4|g|#{}\n'.format(global_tags_str) if global_tags_str else 'gt:123.4|g\n'
Expand All @@ -1813,7 +1813,7 @@ def test_dogstatsd_initialization_with_dd_env_service_version(self):
passed_tags = ['env:prod', 'version:def456', 'custom_tag:toad']
all_tags_str = ','.join([t for t in passed_tags + global_tags])
dogstatsd.gauge('gt', 123.4, tags=passed_tags)
dogstatsd.flush_buffered_metrics()
dogstatsd.flush()

metric = 'gt:123.4|g|#{}\n'.format(all_tags_str)
self.assertEqual(metric, dogstatsd.socket.recv())
Expand Down Expand Up @@ -1919,22 +1919,22 @@ def test_counter_with_container_field(self):
self.statsd._container_id = "ci-fake-container-id"

self.statsd.increment("page.views")
self.statsd.flush_buffered_metrics()
self.statsd.flush()
self.assert_equal_telemetry("page.views:1|c|c:ci-fake-container-id\n", self.recv(2))

self.statsd._reset_telemetry()
self.statsd.increment("page.views", 11)
self.statsd.flush_buffered_metrics()
self.statsd.flush()
self.assert_equal_telemetry("page.views:11|c|c:ci-fake-container-id\n", self.recv(2))

self.statsd._reset_telemetry()
self.statsd.decrement("page.views")
self.statsd.flush_buffered_metrics()
self.statsd.flush()
self.assert_equal_telemetry("page.views:-1|c|c:ci-fake-container-id\n", self.recv(2))

self.statsd._reset_telemetry()
self.statsd.decrement("page.views", 12)
self.statsd.flush_buffered_metrics()
self.statsd.flush()
self.assert_equal_telemetry("page.views:-12|c|c:ci-fake-container-id\n", self.recv(2))

self.statsd._container_id = None
Expand Down
Loading