Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[AMLII-2019] Max samples per context for Histogram, Distribution and Timing metrics (Experimental Feature) #863

Open
wants to merge 80 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 18 commits
Commits
Show all changes
80 commits
Select commit Hold shift + click to select a range
6de4f9b
WIP
andrewqian2001datadog Sep 9, 2024
c171911
add buffered_metrics object type (#853)
andrewqian2001datadog Sep 10, 2024
572da5c
Merge branch 'add-extended-aggregation' of github.com:DataDog/datadog…
andrewqian2001datadog Sep 10, 2024
79590b0
Merge branch 'master' into add-extended-aggregation
andrewqian2001datadog Oct 28, 2024
c112d5b
revert test config change
andrewqian2001datadog Oct 28, 2024
890c657
add buffered_metric_context WIP
andrewqian2001datadog Oct 29, 2024
e591bbb
Merge branch 'master' into add-extended-aggregation
andrewqian2001datadog Dec 4, 2024
4c2b238
change naming to sample
andrewqian2001datadog Dec 9, 2024
a84af9d
update tests
andrewqian2001datadog Dec 9, 2024
583e287
fix buffered_metric_context and aggregator, update tests
andrewqian2001datadog Dec 10, 2024
b01ed1d
use snake case
andrewqian2001datadog Dec 10, 2024
bb863c5
histograms, distribution and timing metrics are not aggregated, they …
andrewqian2001datadog Dec 12, 2024
ca4981d
remove max_metric_per_context, not in scope?
andrewqian2001datadog Dec 12, 2024
1091569
lint
andrewqian2001datadog Dec 12, 2024
606b271
fix lint
andrewqian2001datadog Dec 12, 2024
a7f2a56
Merge branch 'master' into add-extended-aggregation
andrewqian2001datadog Dec 12, 2024
a5d4b15
fix syntax
andrewqian2001datadog Dec 12, 2024
7cf00b1
replace secrets with random
andrewqian2001datadog Dec 12, 2024
92481b9
lint
andrewqian2001datadog Dec 12, 2024
6aa0243
update tests
andrewqian2001datadog Dec 12, 2024
58af7c0
lint
andrewqian2001datadog Dec 12, 2024
f5a4cd1
lint
andrewqian2001datadog Dec 12, 2024
3725fca
update test
andrewqian2001datadog Dec 12, 2024
2f8c3fe
test
andrewqian2001datadog Dec 12, 2024
26255de
test
andrewqian2001datadog Dec 13, 2024
44352ac
test2
andrewqian2001datadog Dec 13, 2024
02e3b23
remove comment
andrewqian2001datadog Dec 13, 2024
148ef17
base.py uses aggregator
andrewqian2001datadog Dec 13, 2024
c5d9563
lint
andrewqian2001datadog Dec 13, 2024
08091d5
lint
andrewqian2001datadog Dec 13, 2024
b482e72
timing metric can be aggregated
andrewqian2001datadog Dec 13, 2024
3f9168a
lint
andrewqian2001datadog Dec 13, 2024
a46b5d2
lint
andrewqian2001datadog Dec 13, 2024
1e6d213
remove prints
andrewqian2001datadog Dec 15, 2024
5bbf6c5
add test for testing maybe_keep_sample
andrewqian2001datadog Dec 15, 2024
6809466
fix flushing logic
andrewqian2001datadog Dec 15, 2024
4039b6b
fix tests
andrewqian2001datadog Dec 15, 2024
21b5e08
explictly check if rate is none
andrewqian2001datadog Dec 15, 2024
50337ba
add test for buffered metrics
andrewqian2001datadog Dec 17, 2024
fe0c521
rerun test
andrewqian2001datadog Dec 17, 2024
4dcee39
rerun tests 3x
andrewqian2001datadog Dec 17, 2024
0444e99
rerun tests x4
andrewqian2001datadog Dec 17, 2024
fc87fa5
what
andrewqian2001datadog Dec 17, 2024
f0c4db0
???
andrewqian2001datadog Dec 17, 2024
e7b62d2
rerun tests
andrewqian2001datadog Dec 17, 2024
9d4f24a
rerun tests
andrewqian2001datadog Dec 18, 2024
620ea1e
remove unused function
andrewqian2001datadog Dec 18, 2024
a54c136
Merge branch 'master' into add-extended-aggregation
andrewqian2001datadog Jan 2, 2025
7d78065
add flag for enabling/disabling extended aggregation
andrewqian2001datadog Jan 5, 2025
393201b
make max_metric_sampels configurable
andrewqian2001datadog Jan 5, 2025
dd7e743
remove unecessary lock
andrewqian2001datadog Jan 5, 2025
a442722
remove extended aggregation
andrewqian2001datadog Jan 6, 2025
5d872a3
remove extended aggregation
andrewqian2001datadog Jan 6, 2025
0e06501
remove test, not in scope
andrewqian2001datadog Jan 7, 2025
10e9ebe
lint
andrewqian2001datadog Jan 7, 2025
1e2eed8
rename max_metric_samples to max_metric_samples_per_context
andrewqian2001datadog Jan 7, 2025
bad35ba
rename buffered_metrics to max_sample_metric, change base.py so that …
andrewqian2001datadog Jan 13, 2025
bca7448
more renaming
andrewqian2001datadog Jan 13, 2025
90b083d
lint
andrewqian2001datadog Jan 13, 2025
22c2ac9
more renaming
andrewqian2001datadog Jan 13, 2025
08d150a
lint
andrewqian2001datadog Jan 13, 2025
dfd1a29
lint
andrewqian2001datadog Jan 13, 2025
5c86590
use statsd_max_samples_per_context to set the max samples per context…
andrewqian2001datadog Jan 13, 2025
0276bfa
lint
andrewqian2001datadog Jan 13, 2025
e50a85f
set max_samples_per_context through Aggregator constructor
andrewqian2001datadog Jan 14, 2025
f6d963d
add comments
andrewqian2001datadog Jan 14, 2025
4dbaba5
lint
andrewqian2001datadog Jan 14, 2025
b2f714b
remove print
andrewqian2001datadog Jan 14, 2025
b46b539
update base.py to not use new feature unless max_samples_per_context …
andrewqian2001datadog Jan 15, 2025
b80da4c
remove comment
andrewqian2001datadog Jan 15, 2025
c27bf8d
add flag for _report function to enable/disable sampling
andrewqian2001datadog Jan 15, 2025
52fd25f
fix one off error
andrewqian2001datadog Jan 15, 2025
ec9a487
remove unused code
andrewqian2001datadog Jan 15, 2025
363d940
use specified rate
andrewqian2001datadog Jan 16, 2025
f274b9e
prelocate data in array
andrewqian2001datadog Jan 16, 2025
810353a
change append
andrewqian2001datadog Jan 17, 2025
351a8cc
lint
andrewqian2001datadog Jan 17, 2025
a94afa5
lint x2
andrewqian2001datadog Jan 17, 2025
d334346
modify test
andrewqian2001datadog Jan 17, 2025
ac7c86d
rerun tests
andrewqian2001datadog Jan 17, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
55 changes: 55 additions & 0 deletions datadog/dogstatsd/aggregator.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,13 @@
GaugeMetric,
SetMetric,
)
from datadog.dogstatsd.buffered_metrics import (
HistogramMetric,
DistributionMetric,
TimingMetric
)
from datadog.dogstatsd.metric_types import MetricType
from datadog.dogstatsd.buffered_metrics_context import BufferedMetricContexts


class Aggregator(object):
Expand All @@ -14,10 +20,18 @@ def __init__(self):
MetricType.GAUGE: {},
MetricType.SET: {},
}
self.buffered_metrics_map = {
MetricType.HISTOGRAM: BufferedMetricContexts(HistogramMetric),
MetricType.DISTRIBUTION: BufferedMetricContexts(DistributionMetric),
MetricType.TIMING: BufferedMetricContexts(TimingMetric)
}
self._locks = {
MetricType.COUNT: threading.RLock(),
MetricType.GAUGE: threading.RLock(),
MetricType.SET: threading.RLock(),
MetricType.HISTOGRAM: threading.RLock(),
MetricType.DISTRIBUTION: threading.RLock(),
MetricType.TIMING: threading.RLock(),
}

def flush_aggregated_metrics(self):
Expand All @@ -28,6 +42,23 @@ def flush_aggregated_metrics(self):
self.metrics_map[metric_type] = {}
for metric in current_metrics.values():
metrics.extend(metric.get_data() if isinstance(metric, SetMetric) else [metric])

for metric_type in self.buffered_metrics_map.keys():
with self._locks[metric_type]:
andrewqian2001datadog marked this conversation as resolved.
Show resolved Hide resolved
metric_context = self.buffered_metrics_map[metric_type]
self.buffered_metrics_map[metric_type] = {}
for metricList in metric_context.flush():
metrics.extend(metricList)
return metrics

def flush_aggregated_buffered_metrics(self):
metrics = []
for metric_type in self.buffered_metrics_map.keys():
with self._locks[metric_type]:
current_metrics = self.buffered_metrics_map[metric_type]
self.buffered_metrics_map[metric_type] = {}
for metric in current_metrics.values():
metrics.append(metric)
return metrics

def get_context(self, name, tags):
Expand Down Expand Up @@ -60,3 +91,27 @@ def add_metric(
self.metrics_map[metric_type][context] = metric_class(
name, value, tags, rate, timestamp
)

def histogram(self, name, value, tags, rate):
return self.add_buffered_metric(
MetricType.HISTOGRAM, name, value, tags, rate
)

def distribution(self, name, value, tags, rate):
return self.add_buffered_metric(
MetricType.DISTRIBUTION, name, value, tags, rate
)

def timing(self, name, value, tags, rate):
return self.add_buffered_metric(
MetricType.TIMING, name, value, tags, rate
)

def add_buffered_metric(
self, metric_type, name, value, tags, rate
):
context_key = self.get_context(name, tags)
metric_context = self.buffered_metrics_map[metric_type]
return metric_context.sample(name, value, tags, rate, context_key)


7 changes: 6 additions & 1 deletion datadog/dogstatsd/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -561,6 +561,8 @@ def _flush_thread_loop(self, flush_interval):
time.sleep(flush_interval)
if not self._disable_aggregation:
self.flush_aggregated_metrics()
# Histograms, Distribution and Timing metrics are not aggregated
self.flush_buffered_metrics()
if not self._disable_buffering:
self.flush_buffered_metrics()
self._flush_thread = threading.Thread(
Expand Down Expand Up @@ -1127,7 +1129,10 @@ def _report(self, metric, metric_type, value, tags, sample_rate, timestamp=0):
)

# Send it
self._send(payload)
if metric_type == MetricType.DISTRIBUTION or metric_type == MetricType.HISTOGRAM or metric_type == MetricType.TIMING:
self._send_to_buffer(payload)
else:
self._send(payload)

def _reset_telemetry(self):
self.metrics_count = 0
Expand Down
68 changes: 68 additions & 0 deletions datadog/dogstatsd/buffered_metrics.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
import random
from datadog.dogstatsd.metric_types import MetricType
from datadog.dogstatsd.metrics import MetricAggregator


class BufferedMetric(object):
def __init__(self, name, tags, metric_type, specified_rate=1.0, max_metric_samples=0):
andrewqian2001datadog marked this conversation as resolved.
Show resolved Hide resolved
self.name = name
self.tags = tags
self.metric_type = metric_type
self.max_metric_samples = max_metric_samples
self.specified_rate = specified_rate
self.data = []
self.stored_metric_samples = 1
self.total_metric_samples = 1

def sample(self, value):
self.data.append(value)
self.stored_metric_samples += 1
self.total_metric_samples += 1

def maybe_keep_sample(self, value):
andrewqian2001datadog marked this conversation as resolved.
Show resolved Hide resolved
print("max metric samples is ", self.max_metric_samples)
print("stored metric samples is ", self.stored_metric_samples)
if self.max_metric_samples > 0:
if self.stored_metric_samples >= self.max_metric_samples:
i = random.randint(0, self.total_metric_samples - 1)
if i < self.max_metric_samples:
print("REPLACE")
self.data[i] = value
else:
print("APPEND")
self.data.append(value)
self.stored_metric_samples += 1
self.total_metric_samples += 1
else:
print("APPEND2")
self.sample(value)

def skip_sample(self):
self.total_metric_samples += 1

def flush(self):
total_metric_samples = self.total_metric_samples
if self.specified_rate != 1.0:
rate = self.specified_rate
else:
rate = self.stored_metric_samples / total_metric_samples

return [
MetricAggregator(self.name, self.tags, rate, self.metric_type, value)
for value in self.data
]


class HistogramMetric(BufferedMetric):
def __init__(self, name, tags, rate=1.0, max_metric_samples=0):
super(HistogramMetric, self).__init__(name, tags, MetricType.HISTOGRAM, rate, max_metric_samples)


class DistributionMetric(BufferedMetric):
def __init__(self, name, tags, rate=1.0, max_metric_samples=0):
super(DistributionMetric, self).__init__(name, tags, MetricType.DISTRIBUTION, rate, max_metric_samples)


class TimingMetric(BufferedMetric):
def __init__(self, name, tags, rate=1.0, max_metric_samples=0):
super(TimingMetric, self).__init__(name, tags, MetricType.TIMING, rate, max_metric_samples)
46 changes: 46 additions & 0 deletions datadog/dogstatsd/buffered_metrics_context.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
from threading import Lock
import random


class BufferedMetricContexts:
def __init__(self, buffered_metric_type):
self.nb_context = 0
self.lock = Lock()
self.values = {}
self.buffered_metric_type = buffered_metric_type

def flush(self):
metrics = []
"""Flush the metrics and reset the stored values."""
with self.lock:
values = self.values.copy()
self.values.clear()

for _, metric in values.items():
metrics.append(metric.flush())

self.nb_context += len(values)
return metrics

def sample(self, name, value, tags, rate, context_key):
"""Sample a metric and store it if it meets the criteria."""
keeping_sample = self.should_sample(rate)
with self.lock:
if context_key not in self.values:
# Create a new metric if it doesn't exist
self.values[context_key] = self.buffered_metric_type(name, tags, rate)
metric = self.values[context_key]
if keeping_sample:
metric.maybe_keep_sample(value)
else:
metric.skip_sample()

def should_sample(self, rate):
"""Determine if a sample should be kept based on the specified rate."""
if rate >= 1:
return True
return random.random() < rate # Replace `secrets` with `random`
andrewqian2001datadog marked this conversation as resolved.
Show resolved Hide resolved

def get_nb_context(self):
"""Return the number of contexts."""
return self.nb_context
3 changes: 3 additions & 0 deletions datadog/dogstatsd/metric_types.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,3 +2,6 @@ class MetricType:
COUNT = "c"
GAUGE = "g"
SET = "s"
HISTOGRAM = "h"
DISTRIBUTION = "d"
TIMING = "ms"
71 changes: 47 additions & 24 deletions tests/unit/dogstatsd/test_aggregator.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,30 +9,30 @@ def setUp(self):

def test_aggregator_sample(self):
tags = ["tag1", "tag2"]
for _ in range(2):
self.aggregator.gauge("gaugeTest", 21, tags, 1)
self.assertEqual(len(self.aggregator.metrics_map[MetricType.GAUGE]), 1)
self.assertIn("gaugeTest:tag1,tag2", self.aggregator.metrics_map[MetricType.GAUGE])

self.aggregator.gauge("gaugeTest", 21, tags, 1)
self.assertEqual(len(self.aggregator.metrics_map[MetricType.GAUGE]), 1)
self.assertIn("gaugeTest:tag1,tag2", self.aggregator.metrics_map[MetricType.GAUGE])
self.aggregator.count("countTest", 21, tags, 1)
self.assertEqual(len(self.aggregator.metrics_map[MetricType.COUNT]), 1)
self.assertIn("countTest:tag1,tag2", self.aggregator.metrics_map[MetricType.COUNT])

self.aggregator.count("countTest", 21, tags, 1)
self.assertEqual(len(self.aggregator.metrics_map[MetricType.COUNT]), 1)
self.assertIn("countTest:tag1,tag2", self.aggregator.metrics_map[MetricType.COUNT])
self.aggregator.set("setTest", "value1", tags, 1)
self.assertEqual(len(self.aggregator.metrics_map[MetricType.SET]), 1)
self.assertIn("setTest:tag1,tag2", self.aggregator.metrics_map[MetricType.SET])

self.aggregator.set("setTest", "value1", tags, 1)
self.assertEqual(len(self.aggregator.metrics_map[MetricType.SET]), 1)
self.assertIn("setTest:tag1,tag2", self.aggregator.metrics_map[MetricType.SET])
self.aggregator.histogram("histogramTest", 21, tags, 1)
self.assertEqual(len(self.aggregator.buffered_metrics_map[MetricType.HISTOGRAM].values), 1)
self.assertIn("histogramTest:tag1,tag2", self.aggregator.buffered_metrics_map[MetricType.HISTOGRAM].values)

self.aggregator.gauge("gaugeTest", 123, tags, 1)
self.assertEqual(len(self.aggregator.metrics_map[MetricType.GAUGE]), 1)
self.assertIn("gaugeTest:tag1,tag2", self.aggregator.metrics_map[MetricType.GAUGE])
self.aggregator.distribution("distributionTest", 21, tags, 1)
self.assertEqual(len(self.aggregator.buffered_metrics_map[MetricType.DISTRIBUTION].values), 1)
self.assertIn("distributionTest:tag1,tag2", self.aggregator.buffered_metrics_map[MetricType.DISTRIBUTION].values)

self.aggregator.count("countTest", 10, tags, 1)
self.assertEqual(len(self.aggregator.metrics_map[MetricType.COUNT]), 1)
self.assertIn("countTest:tag1,tag2", self.aggregator.metrics_map[MetricType.COUNT])

self.aggregator.set("setTest", "value1", tags, 1)
self.assertEqual(len(self.aggregator.metrics_map[MetricType.SET]), 1)
self.assertIn("setTest:tag1,tag2", self.aggregator.metrics_map[MetricType.SET])
self.aggregator.timing("timingTest", 21, tags, 1)
self.assertEqual(len(self.aggregator.buffered_metrics_map[MetricType.TIMING].values), 1)
self.assertIn("timingTest:tag1,tag2", self.aggregator.buffered_metrics_map[MetricType.TIMING].values)

def test_aggregator_flush(self):
tags = ["tag1", "tag2"]
Expand All @@ -50,29 +50,52 @@ def test_aggregator_flush(self):
self.aggregator.set("setTest1", "value2", tags, 1)
self.aggregator.set("setTest2", "value1", tags, 1)

self.aggregator.histogram("histogramTest1", 21, tags, 1)
self.aggregator.histogram("histogramTest1", 22, tags, 1)
self.aggregator.histogram("histogramTest2", 23, tags, 1)

self.aggregator.distribution("distributionTest1", 21, tags, 1)
self.aggregator.distribution("distributionTest1", 22, tags, 1)
self.aggregator.distribution("distributionTest2", 23, tags, 1)

self.aggregator.timing("timingTest1", 21, tags, 1)
self.aggregator.timing("timingTest1", 22, tags, 1)
self.aggregator.timing("timingTest2", 23, tags, 1)

metrics = self.aggregator.flush_aggregated_metrics()
self.assertEqual(len(self.aggregator.metrics_map[MetricType.GAUGE]), 0)
self.assertEqual(len(self.aggregator.metrics_map[MetricType.COUNT]), 0)
self.assertEqual(len(self.aggregator.metrics_map[MetricType.SET]), 0)

self.assertEqual(len(metrics), 7)
self.assertEqual(len(self.aggregator.buffered_metrics_map[MetricType.HISTOGRAM]), 0)
self.assertEqual(len(self.aggregator.buffered_metrics_map[MetricType.DISTRIBUTION]), 0)
self.assertEqual(len(self.aggregator.buffered_metrics_map[MetricType.TIMING]), 0)
self.assertEqual(len(metrics), 16)
metrics.sort(key=lambda m: (m.metric_type, m.name, m.value))

expected_metrics = [
{"metric_type": MetricType.COUNT, "name": "countTest1", "tags": tags, "rate": 1, "value": 31, "timestamp": 0},
{"metric_type": MetricType.COUNT, "name": "countTest2", "tags": tags, "rate": 1, "value": 1, "timestamp": 0},
{"metric_type": MetricType.DISTRIBUTION, "name": "distributionTest1", "tags": tags, "rate": 1, "value": 21},
{"metric_type": MetricType.DISTRIBUTION, "name": "distributionTest1", "tags": tags, "rate": 1, "value": 22},
{"metric_type": MetricType.DISTRIBUTION, "name": "distributionTest2", "tags": tags, "rate": 1, "value": 23},
{"metric_type": MetricType.GAUGE, "name": "gaugeTest1", "tags": tags, "rate": 1, "value": 10, "timestamp": 0},
{"metric_type": MetricType.GAUGE, "name": "gaugeTest2", "tags": tags, "rate": 1, "value": 15, "timestamp": 0},
{"metric_type": MetricType.HISTOGRAM, "name": "histogramTest1", "tags": tags, "rate": 1, "value": 21},
{"metric_type": MetricType.HISTOGRAM, "name": "histogramTest1", "tags": tags, "rate": 1, "value": 22},
{"metric_type": MetricType.HISTOGRAM, "name": "histogramTest2", "tags": tags, "rate": 1, "value": 23},
{"metric_type": MetricType.TIMING, "name": "timingTest1", "tags": tags, "rate": 1, "value": 21},
{"metric_type": MetricType.TIMING, "name": "timingTest1", "tags": tags, "rate": 1, "value": 22},
{"metric_type": MetricType.TIMING, "name": "timingTest2", "tags": tags, "rate": 1, "value": 23},
{"metric_type": MetricType.SET, "name": "setTest1", "tags": tags, "rate": 1, "value": "value1", "timestamp": 0},
{"metric_type": MetricType.SET, "name": "setTest1", "tags": tags, "rate": 1, "value": "value2", "timestamp": 0},
{"metric_type": MetricType.SET, "name": "setTest2", "tags": tags, "rate": 1, "value": "value1", "timestamp": 0},
]

for metric, expected in zip(metrics, expected_metrics):
self.assertEqual(metric.name, expected["name"])
self.assertEqual(metric.tags, expected["tags"])
self.assertEqual(metric.rate, expected["rate"])
self.assertEqual(metric.value, expected["value"])



if __name__ == '__main__':
unittest.main()
Loading
Loading