[AMLII-2019] Max samples per context for Histogram, Distribution and Timing metrics (Experimental Feature) #863

Open
wants to merge 80 commits into
base: master
6de4f9b
WIP
andrewqian2001datadog Sep 9, 2024
c171911
add buffered_metrics object type (#853)
andrewqian2001datadog Sep 10, 2024
572da5c
Merge branch 'add-extended-aggregation' of github.com:DataDog/datadog…
andrewqian2001datadog Sep 10, 2024
79590b0
Merge branch 'master' into add-extended-aggregation
andrewqian2001datadog Oct 28, 2024
c112d5b
revert test config change
andrewqian2001datadog Oct 28, 2024
890c657
add buffered_metric_context WIP
andrewqian2001datadog Oct 29, 2024
e591bbb
Merge branch 'master' into add-extended-aggregation
andrewqian2001datadog Dec 4, 2024
4c2b238
change naming to sample
andrewqian2001datadog Dec 9, 2024
a84af9d
update tests
andrewqian2001datadog Dec 9, 2024
583e287
fix buffered_metric_context and aggregator, update tests
andrewqian2001datadog Dec 10, 2024
b01ed1d
use snake case
andrewqian2001datadog Dec 10, 2024
bb863c5
histograms, distribution and timing metrics are not aggregated, they …
andrewqian2001datadog Dec 12, 2024
ca4981d
remove max_metric_per_context, not in scope?
andrewqian2001datadog Dec 12, 2024
1091569
lint
andrewqian2001datadog Dec 12, 2024
606b271
fix lint
andrewqian2001datadog Dec 12, 2024
a7f2a56
Merge branch 'master' into add-extended-aggregation
andrewqian2001datadog Dec 12, 2024
a5d4b15
fix syntax
andrewqian2001datadog Dec 12, 2024
7cf00b1
replace secrets with random
andrewqian2001datadog Dec 12, 2024
92481b9
lint
andrewqian2001datadog Dec 12, 2024
6aa0243
update tests
andrewqian2001datadog Dec 12, 2024
58af7c0
lint
andrewqian2001datadog Dec 12, 2024
f5a4cd1
lint
andrewqian2001datadog Dec 12, 2024
3725fca
update test
andrewqian2001datadog Dec 12, 2024
2f8c3fe
test
andrewqian2001datadog Dec 12, 2024
26255de
test
andrewqian2001datadog Dec 13, 2024
44352ac
test2
andrewqian2001datadog Dec 13, 2024
02e3b23
remove comment
andrewqian2001datadog Dec 13, 2024
148ef17
base.py uses aggregator
andrewqian2001datadog Dec 13, 2024
c5d9563
lint
andrewqian2001datadog Dec 13, 2024
08091d5
lint
andrewqian2001datadog Dec 13, 2024
b482e72
timing metric can be aggregated
andrewqian2001datadog Dec 13, 2024
3f9168a
lint
andrewqian2001datadog Dec 13, 2024
a46b5d2
lint
andrewqian2001datadog Dec 13, 2024
1e6d213
remove prints
andrewqian2001datadog Dec 15, 2024
5bbf6c5
add test for testing maybe_keep_sample
andrewqian2001datadog Dec 15, 2024
6809466
fix flushing logic
andrewqian2001datadog Dec 15, 2024
4039b6b
fix tests
andrewqian2001datadog Dec 15, 2024
21b5e08
explictly check if rate is none
andrewqian2001datadog Dec 15, 2024
50337ba
add test for buffered metrics
andrewqian2001datadog Dec 17, 2024
fe0c521
rerun test
andrewqian2001datadog Dec 17, 2024
4dcee39
rerun tests 3x
andrewqian2001datadog Dec 17, 2024
0444e99
rerun tests x4
andrewqian2001datadog Dec 17, 2024
fc87fa5
what
andrewqian2001datadog Dec 17, 2024
f0c4db0
???
andrewqian2001datadog Dec 17, 2024
e7b62d2
rerun tests
andrewqian2001datadog Dec 17, 2024
9d4f24a
rerun tests
andrewqian2001datadog Dec 18, 2024
620ea1e
remove unused function
andrewqian2001datadog Dec 18, 2024
a54c136
Merge branch 'master' into add-extended-aggregation
andrewqian2001datadog Jan 2, 2025
7d78065
add flag for enabling/disabling extended aggregation
andrewqian2001datadog Jan 5, 2025
393201b
make max_metric_sampels configurable
andrewqian2001datadog Jan 5, 2025
dd7e743
remove unecessary lock
andrewqian2001datadog Jan 5, 2025
a442722
remove extended aggregation
andrewqian2001datadog Jan 6, 2025
5d872a3
remove extended aggregation
andrewqian2001datadog Jan 6, 2025
0e06501
remove test, not in scope
andrewqian2001datadog Jan 7, 2025
10e9ebe
lint
andrewqian2001datadog Jan 7, 2025
1e2eed8
rename max_metric_samples to max_metric_samples_per_context
andrewqian2001datadog Jan 7, 2025
bad35ba
rename buffered_metrics to max_sample_metric, change base.py so that …
andrewqian2001datadog Jan 13, 2025
bca7448
more renaming
andrewqian2001datadog Jan 13, 2025
90b083d
lint
andrewqian2001datadog Jan 13, 2025
22c2ac9
more renaming
andrewqian2001datadog Jan 13, 2025
08d150a
lint
andrewqian2001datadog Jan 13, 2025
dfd1a29
lint
andrewqian2001datadog Jan 13, 2025
5c86590
use statsd_max_samples_per_context to set the max samples per context…
andrewqian2001datadog Jan 13, 2025
0276bfa
lint
andrewqian2001datadog Jan 13, 2025
e50a85f
set max_samples_per_context through Aggregator constructor
andrewqian2001datadog Jan 14, 2025
f6d963d
add comments
andrewqian2001datadog Jan 14, 2025
4dbaba5
lint
andrewqian2001datadog Jan 14, 2025
b2f714b
remove print
andrewqian2001datadog Jan 14, 2025
b46b539
update base.py to not use new feature unless max_samples_per_context …
andrewqian2001datadog Jan 15, 2025
b80da4c
remove comment
andrewqian2001datadog Jan 15, 2025
c27bf8d
add flag for _report function to enable/disable sampling
andrewqian2001datadog Jan 15, 2025
52fd25f
fix one off error
andrewqian2001datadog Jan 15, 2025
ec9a487
remove unused code
andrewqian2001datadog Jan 15, 2025
363d940
use specified rate
andrewqian2001datadog Jan 16, 2025
f274b9e
prelocate data in array
andrewqian2001datadog Jan 16, 2025
810353a
change append
andrewqian2001datadog Jan 17, 2025
351a8cc
lint
andrewqian2001datadog Jan 17, 2025
a94afa5
lint x2
andrewqian2001datadog Jan 17, 2025
d334346
modify test
andrewqian2001datadog Jan 17, 2025
ac7c86d
rerun tests
andrewqian2001datadog Jan 17, 2025
9 changes: 7 additions & 2 deletions datadog/__init__.py
@@ -43,6 +43,7 @@ def initialize(
statsd_use_default_route=False, # type: bool
statsd_socket_path=None, # type: Optional[str]
statsd_namespace=None, # type: Optional[str]
statsd_max_samples_per_context=0, # type: Optional[int]
statsd_constant_tags=None, # type: Optional[List[str]]
return_raw_response=False, # type: bool
hostname_from_config=True, # type: bool
@@ -82,8 +83,12 @@ def initialize(
(default: True).
:type statsd_disable_aggregation: boolean

:param statsd_max_samples_per_context: Set the max samples per context for Histogram,
Distribution and Timing metrics. Use with the statsd_disable_aggregation set to False.
:type statsd_max_samples_per_context: int

:param statsd_aggregation_flush_interval: If aggregation is enabled, set the flush interval for
aggregation/buffering
aggregation/buffering (This feature is experimental)
(default: 0.3 seconds)
:type statsd_aggregation_flush_interval: float

@@ -142,7 +147,7 @@ def initialize(
if statsd_disable_aggregation:
statsd.disable_aggregation()
else:
statsd.enable_aggregation(statsd_aggregation_flush_interval)
statsd.enable_aggregation(statsd_aggregation_flush_interval, statsd_max_samples_per_context)
statsd.disable_buffering = statsd_disable_buffering
api._return_raw_response = return_raw_response

49 changes: 48 additions & 1 deletion datadog/dogstatsd/aggregator.py
@@ -4,16 +4,28 @@
GaugeMetric,
SetMetric,
)
from datadog.dogstatsd.max_sample_metric import (
HistogramMetric,
DistributionMetric,
TimingMetric
)
from datadog.dogstatsd.metric_types import MetricType
from datadog.dogstatsd.max_sample_metric_context import MaxSampleMetricContexts


class Aggregator(object):
def __init__(self):
def __init__(self, max_samples_per_context=0):
self.max_samples_per_context = max_samples_per_context
self.metrics_map = {
MetricType.COUNT: {},
MetricType.GAUGE: {},
MetricType.SET: {},
}
self.max_sample_metric_map = {
MetricType.HISTOGRAM: MaxSampleMetricContexts(HistogramMetric),
MetricType.DISTRIBUTION: MaxSampleMetricContexts(DistributionMetric),
MetricType.TIMING: MaxSampleMetricContexts(TimingMetric)
}
self._locks = {
MetricType.COUNT: threading.RLock(),
MetricType.GAUGE: threading.RLock(),
@@ -30,6 +42,17 @@ def flush_aggregated_metrics(self):
metrics.extend(metric.get_data() if isinstance(metric, SetMetric) else [metric])
return metrics

def set_max_samples_per_context(self, max_samples_per_context=0):
self.max_samples_per_context = max_samples_per_context

def flush_aggregated_sampled_metrics(self):
metrics = []
for metric_type in self.max_sample_metric_map.keys():
metric_context = self.max_sample_metric_map[metric_type]
for metricList in metric_context.flush():
metrics.extend(metricList)
return metrics

def get_context(self, name, tags):
tags_str = ",".join(tags) if tags is not None else ""
return "{}:{}".format(name, tags_str)
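
To make the context key concrete, here is a standalone sketch of the keying logic from `get_context`. It copies the two lines shown in the diff. `get_sorted_context` is a hypothetical helper that is not part of the PR; it is included only to illustrate that, as written, the same tags in a different order produce a different context.

```python
def get_context(name, tags):
    """Build the per-context key as in the diff: name, colon, comma-joined tags."""
    tags_str = ",".join(tags) if tags is not None else ""
    return "{}:{}".format(name, tags_str)


def get_sorted_context(name, tags):
    """Hypothetical variant (not in the PR): sort tags so order does not split a context."""
    return get_context(name, sorted(tags) if tags is not None else None)


key_a = get_context("request.latency", ["env:prod", "service:web"])
key_b = get_context("request.latency", ["service:web", "env:prod"])
key_none = get_context("request.latency", None)
```

Since `max_samples_per_context` is applied per key, two tag orderings of the same metric would each get their own sample budget under this keying.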
@@ -60,3 +83,27 @@ def add_metric(
self.metrics_map[metric_type][context] = metric_class(
name, value, tags, rate, timestamp
)

def histogram(self, name, value, tags, rate):
return self.add_max_sample_metric(
MetricType.HISTOGRAM, name, value, tags, rate
)

def distribution(self, name, value, tags, rate):
return self.add_max_sample_metric(
MetricType.DISTRIBUTION, name, value, tags, rate
)

def timing(self, name, value, tags, rate):
return self.add_max_sample_metric(
MetricType.TIMING, name, value, tags, rate
)

def add_max_sample_metric(
self, metric_type, name, value, tags, rate
):
if rate is None:
rate = 1
context_key = self.get_context(name, tags)
metric_context = self.max_sample_metric_map[metric_type]
return metric_context.sample(name, value, tags, rate, context_key, self.max_samples_per_context)
45 changes: 34 additions & 11 deletions datadog/dogstatsd/base.py
@@ -160,6 +160,7 @@ def __init__(
telemetry_port=None, # type: Union[str, int]
telemetry_socket_path=None, # type: Text
max_buffer_len=0, # type: int
max_metric_samples_per_context=0, # type: int
container_id=None, # type: Optional[Text]
origin_detection_enabled=True, # type: bool
socket_timeout=0, # type: Optional[float]
@@ -236,9 +237,14 @@ def __init__(
it overrides the default value.
:type flush_interval: float

:disable_aggregation: If true, metrics (Count, Gauge, Set) are no longered aggregated by the client
:disable_aggregation: If true, metrics (Count, Gauge, Set) are no longer aggregated by the client
:type disable_aggregation: bool

:max_metric_samples_per_context: Sets the maximum amount of samples for Histogram, Distribution
and Timings metrics (default 0). This feature should be used alongside aggregation. This feature
is experimental.
:type max_metric_samples_per_context: int

:disable_buffering: If set, metrics are no longered buffered by the client and
all data is sent synchronously to the server
:type disable_buffering: bool
@@ -450,7 +456,7 @@ def __init__(
self._flush_interval = flush_interval
self._flush_thread = None
self._flush_thread_stop = threading.Event()
self.aggregator = Aggregator()
self.aggregator = Aggregator(max_metric_samples_per_context)
# Indicates if the process is about to fork, so we shouldn't start any new threads yet.
self._forking = False

@@ -643,10 +649,11 @@ def disable_aggregation(self):
self._stop_flush_thread()
log.debug("Statsd aggregation is disabled")

def enable_aggregation(self, flush_interval=DEFAULT_BUFFERING_FLUSH_INTERVAL):
def enable_aggregation(self, flush_interval=DEFAULT_BUFFERING_FLUSH_INTERVAL, max_samples_per_context=0):
with self._config_lock:
if not self._disable_aggregation:
return
self.aggregator.set_max_samples_per_context(max_samples_per_context)
self._disable_aggregation = False
self._flush_interval = flush_interval
if self._disable_buffering:
@@ -826,6 +833,10 @@ def flush_aggregated_metrics(self):
for m in metrics:
self._report(m.name, m.metric_type, m.value, m.tags, m.rate, m.timestamp)

sampled_metrics = self.aggregator.flush_aggregated_sampled_metrics()
for m in sampled_metrics:
Contributor:

Nit: While this is definitely fully functional, I would be a bit more comfortable if _report was modified to allow the option to not internally sample. That way we only have one location to worry about changes to "are we disabled", "how do we handle constant tags", "how do we handle telemetry", etc.

self._report(m.name, m.metric_type, m.value, m.tags, m.rate, m.timestamp, False)
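
The review note above asks for a way to send through `_report` without sampling a second time. A minimal sketch of that idea follows, assuming a standalone function `report` and an injectable `rng` callable. Both are illustrative stand-ins and not the library's `_report`.

```python
import random


def report(value, sample_rate, sampling=True, rng=random.random):
    """Return the value to send, or None if client-side sampling drops it.

    `report` and `rng` are hypothetical names used only for this sketch.
    """
    if sampling and sample_rate != 1 and rng() > sample_rate:
        return None  # dropped by client-side sampling
    return value


# A value the aggregator already sampled is passed through untouched.
already_sampled = report(5, 0.1, sampling=False, rng=lambda: 0.99)
# The same draw with sampling enabled drops the value.
dropped = report(5, 0.1, sampling=True, rng=lambda: 0.99)
```

Keeping the flag on one function means the "is the client disabled", constant-tag, and telemetry handling stay in a single code path, which is the concern the comment raises.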

def gauge(
self,
metric, # type: Text
@@ -960,7 +971,12 @@ def histogram(
>>> statsd.histogram("uploaded.file.size", 1445)
>>> statsd.histogram("album.photo.count", 26, tags=["gender:female"])
"""
self._report(metric, "h", value, tags, sample_rate)
if not self._disable_aggregation and self.aggregator.max_samples_per_context != 0:
print("Aggregated histogram")
self.aggregator.histogram(metric, value, tags, sample_rate)
else:
print("Regular histogram")
self._report(metric, "h", value, tags, sample_rate)

def distribution(
self,
@@ -975,7 +991,10 @@ def distribution(
>>> statsd.distribution("uploaded.file.size", 1445)
>>> statsd.distribution("album.photo.count", 26, tags=["gender:female"])
"""
self._report(metric, "d", value, tags, sample_rate)
if not self._disable_aggregation and self.aggregator.max_samples_per_context != 0:
self.aggregator.distribution(metric, value, tags, sample_rate)
else:
self._report(metric, "d", value, tags, sample_rate)

def timing(
self,
@@ -989,7 +1008,10 @@ def timing(

>>> statsd.timing("query.response.time", 1234)
"""
self._report(metric, "ms", value, tags, sample_rate)
if not self._disable_aggregation and self.aggregator.max_samples_per_context != 0:
self.aggregator.timing(metric, value, tags, sample_rate)
else:
self._report(metric, "ms", value, tags, sample_rate)

def timed(self, metric=None, tags=None, sample_rate=None, use_ms=None):
"""
@@ -1093,7 +1115,7 @@ def _serialize_metric(
("|T" + text(timestamp)) if timestamp > 0 else "",
)

def _report(self, metric, metric_type, value, tags, sample_rate, timestamp=0):
def _report(self, metric, metric_type, value, tags, sample_rate, timestamp=0, sampling=True):
"""
Create a metric packet and send it.

@@ -1109,11 +1131,12 @@ def _report(self, metric, metric_type, value, tags, sample_rate, timestamp=0):
if self._telemetry:
self.metrics_count += 1

if sample_rate is None:
sample_rate = self.default_sample_rate
if sampling:
if sample_rate is None:
sample_rate = self.default_sample_rate

if sample_rate != 1 and random() > sample_rate:
return
if sample_rate != 1 and random() > sample_rate:

🔴 Code Vulnerability

do not use random (...read more)

Make sure to use values that are actually random. The random module in Python should generally not be used and replaced with the secrets module, as noted in the official Python documentation.

return
# timestamps (protocol v1.3) only allowed on gauges and counts
allows_timestamp = metric_type == MetricType.GAUGE or metric_type == MetricType.COUNT

60 changes: 60 additions & 0 deletions datadog/dogstatsd/max_sample_metric.py
@@ -0,0 +1,60 @@
import random
from datadog.dogstatsd.metric_types import MetricType
from datadog.dogstatsd.metrics import MetricAggregator


class MaxSampleMetric(object):
    def __init__(self, name, tags, metric_type, specified_rate=1.0, max_metric_samples=0):
        self.name = name
        self.tags = tags
        self.metric_type = metric_type
        self.max_metric_samples = max_metric_samples
        self.specified_rate = specified_rate
        self.data = [None] * max_metric_samples if max_metric_samples > 0 else []
        self.stored_metric_samples = 0
        self.total_metric_samples = 0

    def sample(self, value):
        if self.max_metric_samples == 0:
            self.data.append(value)
        else:
            self.data[self.stored_metric_samples] = value
        self.stored_metric_samples += 1
        self.total_metric_samples += 1

    def maybe_keep_sample(self, value):
        if self.max_metric_samples > 0:
Contributor:

All or most of this function will need lock protection: a number of internal variables, such as the stored-sample count and the data array, are potentially being updated by multiple threads.

            self.total_metric_samples += 1
            if self.stored_metric_samples < self.max_metric_samples:
                self.data[self.stored_metric_samples] = value
                self.stored_metric_samples += 1
            else:
                i = random.randint(0, self.total_metric_samples - 1)
Contributor:

This looks like an off-by-one - if max_metric_samples is five, then the sixth sample should have a chance to not be persisted to the data array. It looks like we're persisting it 100% of the time here.

                if i < self.max_metric_samples:
                    self.data[i] = value
        else:
            self.sample(value)
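
The two review notes above (lock protection and the off-by-one question) both concern the reservoir update. The sketch below is one way to write it under a single lock, assuming a hypothetical `Reservoir` class with a positive `max_samples` and an injectable `rng`. It is not the PR's `MaxSampleMetric`. Because the counter is incremented before the draw, `randint(0, total - 1)` spans `total` values, so with a limit of five the sixth sample is kept with probability 5/6 rather than always.

```python
import random
import threading


class Reservoir(object):
    """Hypothetical sketch: keep at most `max_samples` values, chosen uniformly."""

    def __init__(self, max_samples, rng=None):
        self.max_samples = max_samples
        self.data = []
        self.total = 0
        self._rng = rng or random.Random()
        self._lock = threading.Lock()

    def add(self, value):
        with self._lock:
            self.total += 1
            if len(self.data) < self.max_samples:
                self.data.append(value)
                return
            # Draw a slot among all values seen so far; replace only when the
            # slot falls inside the reservoir (probability max_samples / total).
            i = self._rng.randint(0, self.total - 1)
            if i < self.max_samples:
                self.data[i] = value

    def flush(self):
        with self._lock:
            values, self.data, self.total = self.data, [], 0
            return values


reservoir = Reservoir(max_samples=5, rng=random.Random(0))
for v in range(1000):
    reservoir.add(v)
kept = reservoir.flush()
```

Appending instead of preallocating also means a flush only ever returns values that were actually stored, at the cost of a few list growths per context.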

    def skip_sample(self):
        self.total_metric_samples += 1

    def flush(self):
        return [
            MetricAggregator(self.name, self.tags, self.specified_rate, self.metric_type, value)
            for value in self.data
        ]


class HistogramMetric(MaxSampleMetric):
    def __init__(self, name, tags, rate=1.0, max_metric_samples=0):
        super(HistogramMetric, self).__init__(name, tags, MetricType.HISTOGRAM, rate, max_metric_samples)


class DistributionMetric(MaxSampleMetric):
    def __init__(self, name, tags, rate=1.0, max_metric_samples=0):
        super(DistributionMetric, self).__init__(name, tags, MetricType.DISTRIBUTION, rate, max_metric_samples)


class TimingMetric(MaxSampleMetric):
    def __init__(self, name, tags, rate=1.0, max_metric_samples=0):
        super(TimingMetric, self).__init__(name, tags, MetricType.TIMING, rate, max_metric_samples)
40 changes: 40 additions & 0 deletions datadog/dogstatsd/max_sample_metric_context.py
@@ -0,0 +1,40 @@
from threading import Lock
import random


class MaxSampleMetricContexts:
    def __init__(self, max_sample_metric_type):
        self.lock = Lock()
        self.values = {}
        self.max_sample_metric_type = max_sample_metric_type

    def flush(self):
        metrics = []
        """Flush the metrics and reset the stored values."""
        with self.lock:
            copiedValues = self.values.copy()
Contributor:

It doesn't look to me like the copy and the clear are necessary - the subsequent assignment of values to an empty dict should be generally equivalent to the clear for our use case, and without a clear call there's no need to shallow copy the dict.

            self.values.clear()
            self.values = {}
        for _, metric in copiedValues.items():
            metrics.append(metric.flush())

        return metrics
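
The review note on `flush` suggests that rebinding `values` to a fresh dict makes the copy and the `clear()` unnecessary. A minimal sketch of that swap follows, assuming a hypothetical `Contexts` class that stores plain lists instead of metric objects.

```python
import threading


class Contexts(object):
    """Hypothetical stand-in for the PR's MaxSampleMetricContexts."""

    def __init__(self):
        self.lock = threading.Lock()
        self.values = {}

    def flush(self):
        """Detach the current dict under the lock, then drain it outside the lock."""
        with self.lock:
            flushed, self.values = self.values, {}
        return list(flushed.values())


contexts = Contexts()
contexts.values["a:"] = [1, 2]
contexts.values["b:"] = [3]
first = contexts.flush()
second = contexts.flush()
```

After the swap the old dict is reachable only through the local name `flushed`, so no copy is needed to iterate it safely.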

    def sample(self, name, value, tags, rate, context_key, max_samples_per_context):
        """Sample a metric and store it if it meets the criteria."""
        keeping_sample = self.should_sample(rate)
        with self.lock:
            if context_key not in self.values:
                # Create a new metric if it doesn't exist
                self.values[context_key] = self.max_sample_metric_type(name, tags, rate, max_samples_per_context)
            metric = self.values[context_key]
        if keeping_sample:
Contributor:

It looks like we're likely racing the flush to persist this correctly. Operating flow:
1. Thread in sample pulls a metric from the values dict and gets pre-empted after dropping the metric_context's lock.
2. Separate thread enters flush, pulls the metric from values and removes further reference to it, then flushes the metric.
3. Original thread wakes back up and continues operating on the metric object that it had pulled, unaware that the metric had already been flushed and any additional samples added will be ignored.

            metric.maybe_keep_sample(value)
        else:
            metric.skip_sample()
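
One way to close the race described in the review note above is to do the lookup and the update under the same lock that `flush` takes. The sketch below assumes a hypothetical `SampledContexts` class that stores raw values per context. It trades some contention for the guarantee that a sample lands either in the flushed batch or in the next one.

```python
import threading


class SampledContexts(object):
    """Hypothetical sketch: lookup and update happen under one lock."""

    def __init__(self):
        self.lock = threading.Lock()
        self.values = {}

    def sample(self, context_key, value):
        with self.lock:
            # Appending inside the lock means flush() cannot detach the
            # list between the lookup and the update.
            self.values.setdefault(context_key, []).append(value)

    def flush(self):
        with self.lock:
            flushed, self.values = self.values, {}
        return flushed


contexts = SampledContexts()
KEY = "latency:env:prod"


def worker():
    for i in range(1000):
        contexts.sample(KEY, i)


threads = [threading.Thread(target=worker) for _ in range(4)]
collected = []
for t in threads:
    t.start()
# Flush repeatedly while the writers are still running.
while any(t.is_alive() for t in threads):
    collected.extend(contexts.flush().get(KEY, []))
for t in threads:
    t.join()
collected.extend(contexts.flush().get(KEY, []))
```

A per-metric lock acquired before the context lock is released would be a finer-grained alternative. This sketch uses the single lock only because it is the simplest form that cannot lose samples.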

    def should_sample(self, rate):
        """Determine if a sample should be kept based on the specified rate."""
        if rate >= 1:
            return True
        return random.random() < rate
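
As a quick check of the rate logic in `should_sample`, the standalone sketch below keeps roughly `rate` of the values. The `rng` parameter is an assumption added here so the draw can be seeded or stubbed; the PR calls `random.random()` directly. Python's documentation reserves the `secrets` module for security and cryptographic uses, and statistical sampling of metrics is arguably not one of them, so a seeded pseudo-random generator is used here.

```python
import random


def should_sample(rate, rng=random.random):
    """Keep everything at rate >= 1, otherwise keep with probability `rate`."""
    if rate >= 1:
        return True
    return rng() < rate


# Seeded generator so the run is repeatable.
seeded = random.Random(42)
kept = sum(should_sample(0.25, seeded.random) for _ in range(10000))
```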
andrewqian2001datadog marked this conversation as resolved.
3 changes: 3 additions & 0 deletions datadog/dogstatsd/metric_types.py
@@ -2,3 +2,6 @@ class MetricType:
COUNT = "c"
GAUGE = "g"
SET = "s"
HISTOGRAM = "h"
DISTRIBUTION = "d"
TIMING = "ms"