// Unless explicitly stated otherwise all files in this repository are licensed
// under the Apache License Version 2.0.
// This product includes software developed at Datadog (https://www.datadoghq.com/).
// Copyright 2016-present Datadog, Inc.

package aggregator

import (
	"time"

	"github.com/DataDog/datadog-agent/pkg/aggregator/sender"
	pkgconfigsetup "github.com/DataDog/datadog-agent/pkg/config/setup"
	"github.com/DataDog/datadog-agent/pkg/metrics"
	agentruntime "github.com/DataDog/datadog-agent/pkg/runtime"
	"github.com/DataDog/datadog-agent/pkg/serializer"
	"github.com/DataDog/datadog-agent/pkg/util/log"
)

const (
	// AutoAdjustStrategyMaxThroughput will adapt the number of pipelines for maximum throughput
	AutoAdjustStrategyMaxThroughput = "max_throughput"
	// AutoAdjustStrategyPerOrigin will adapt the number of pipelines for better container isolation
	AutoAdjustStrategyPerOrigin = "per_origin"
)

// Demultiplexer is composed of multiple samplers (check and time/dogstatsd),
// a shared forwarder, the event platform forwarder, orchestrator data buffers
// and other data that needs to be sent to the forwarders.
// AgentDemultiplexerOptions lets you configure which forwarders have to be started.
type Demultiplexer interface {
	// General
	// --

	// Serializer returns the serializer used by the Demultiplexer instance.
	Serializer() serializer.MetricSerializer

	// Samples API
	// --

	// AggregateSample sends a MetricSample to the DogStatsD time sampler.
	// In a sharded implementation, the metric is sent to the first time sampler.
	AggregateSample(sample metrics.MetricSample)

	// AggregateSamples sends a batch of MetricSamples to the given DogStatsD
	// time sampler shard.
	// Implementations not supporting sharding may ignore the `shard` parameter.
	AggregateSamples(shard TimeSamplerID, samples metrics.MetricSampleBatch)

	// SendSamplesWithoutAggregation pushes metrics into the no-aggregation pipeline: a pipeline
	// where the metrics are not sampled and are sent as-is.
	// This is the method to use to send metrics with a valid timestamp attached.
	SendSamplesWithoutAggregation(metrics metrics.MetricSampleBatch)

	// ForceFlushToSerializer flushes all the aggregated data from the different samplers to
	// the serialization/forwarding parts.
	ForceFlushToSerializer(start time.Time, waitForSerializer bool)

	// GetMetricSamplePool returns a shared resource used in the whole DogStatsD
	// pipeline to re-use metric sample slices: the server gets a slice and fills
	// it with samples, the rest of the pipeline processes them, and the end of
	// the line (the time sampler) puts the slice back into the pool.
	// The main idea is to reduce the garbage generated by slice allocations.
	// (An illustrative usage sketch follows the interface.)
	GetMetricSamplePool() *metrics.MetricSamplePool

	// Senders API, mainly used by collectors/checks
	// --

	sender.SenderManager
}
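
// exampleSubmitSamples is an illustrative sketch, not part of the upstream file:
// it shows how a DogStatsD-style caller might borrow a slice from the shared
// sample pool, fill it, and hand the batch to a time sampler shard. The metric
// name, value, and the pool's GetBatch call are assumptions for illustration;
// the slice is returned to the pool further down the pipeline, not here.
func exampleSubmitSamples(demux Demultiplexer) {
	pool := demux.GetMetricSamplePool()
	batch := pool.GetBatch()
	batch[0] = metrics.MetricSample{
		Name:  "example.requests",
		Value: 1,
		Mtype: metrics.GaugeType,
		Tags:  []string{"env:dev"},
	}
	// Hand the filled portion of the batch to the first time sampler shard.
	demux.AggregateSamples(TimeSamplerID(0), batch[:1])
}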

// trigger is used to trigger something in the TimeSampler or the BufferedAggregator.
// If `blockChan` is not nil, a message is expected on this chan when the action is done.
// See `flushTrigger` for how it is used in a flush trigger.
type trigger struct {
	time time.Time

	// if not nil, the flusher will send a message in this chan when the flush is complete.
	blockChan chan struct{}

	// used by the BufferedAggregator to know if serialization of events,
	// service checks and such has to be waited for before returning
	// from Flush()
	waitForSerializer bool
}

// flushTrigger is a trigger used to flush data; results are expected to be written
// to flushedSeries (or seriesSink depending on the implementation) and flushedSketches.
type flushTrigger struct {
	trigger

	sketchesSink metrics.SketchesSink
	seriesSink   metrics.SerieSink
}
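
// exampleBlockingFlush is an illustrative sketch, not part of the upstream file:
// it shows how a caller could build a flushTrigger that blocks until the flush
// is done, using the blockChan contract described above. The flushChan, series
// and sketches parameters are assumptions standing in for the real flush wiring.
func exampleBlockingFlush(flushChan chan<- flushTrigger, series metrics.SerieSink, sketches metrics.SketchesSink) {
	done := make(chan struct{})
	flushChan <- flushTrigger{
		trigger: trigger{
			time:              time.Now(),
			blockChan:         done,
			waitForSerializer: true,
		},
		seriesSink:   series,
		sketchesSink: sketches,
	}
	// The flusher sends a message on blockChan once the flush (and, here,
	// serialization) has completed.
	<-done
}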

func createIterableMetrics(
	flushAndSerializeInParallel FlushAndSerializeInParallel,
	serializer serializer.MetricSerializer,
	logPayloads bool,
	isServerless bool,
) (*metrics.IterableSeries, *metrics.IterableSketches) {
	var series *metrics.IterableSeries
	var sketches *metrics.IterableSketches

	if serializer.AreSeriesEnabled() {
		series = metrics.NewIterableSeries(func(se *metrics.Serie) {
			if logPayloads {
				log.Debugf("Flushing serie: %s", se)
			}
			tagsetTlm.updateHugeSerieTelemetry(se)
		}, flushAndSerializeInParallel.BufferSize, flushAndSerializeInParallel.ChannelSize)
	}

	if serializer.AreSketchesEnabled() {
		sketches = metrics.NewIterableSketches(func(sketch *metrics.SketchSeries) {
			if logPayloads {
				log.Debugf("Flushing Sketches: %v", sketch)
			}
			if isServerless {
				log.DebugfServerless("Sending sketches payload : %s", sketch.String())
			}
			tagsetTlm.updateHugeSketchesTelemetry(sketch)
		}, flushAndSerializeInParallel.BufferSize, flushAndSerializeInParallel.ChannelSize)
	}

	return series, sketches
}
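
// exampleCreateIterables is an illustrative sketch, not part of the upstream
// file: it shows how the flush path might obtain the iterable series and
// sketches for one flush. The buffer and channel sizes are arbitrary
// assumptions, not recommended values.
func exampleCreateIterables(s serializer.MetricSerializer) (*metrics.IterableSeries, *metrics.IterableSketches) {
	opts := FlushAndSerializeInParallel{
		BufferSize:  4000,
		ChannelSize: 200,
	}
	// logPayloads=false keeps the per-serie debug logging off; isServerless=false
	// skips the serverless-specific sketch logging.
	return createIterableMetrics(opts, s, false, false)
}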

// sendIterableSeries continuously sends series to the serializer, until another routine calls SenderStopped on the
// series sink.
// Mainly meant to be executed in its own routine; the caller typically closes a `done` channel once it has returned
// from SendIterableSeries (because the SenderStopped method has been called on the sink).
func sendIterableSeries(serializer serializer.MetricSerializer, start time.Time, serieSource metrics.SerieSource) {
	log.Debug("Demultiplexer: sendIterableSeries: start sending iterable series to the serializer")

	err := serializer.SendIterableSeries(serieSource)
	// if err == nil, SenderStopped was called and it is safe to read the number of series.
	count := serieSource.Count()
	addFlushCount("Series", int64(count))
	updateSerieTelemetry(start, count, err)

	log.Debug("Demultiplexer: sendIterableSeries: stop routine")
}
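
// exampleSendSeriesAsync is an illustrative sketch, not part of the upstream
// file: it shows the "own routine plus done channel" usage pattern referred to
// in the comment above. The serializer and serie source are assumed to come
// from the surrounding flush machinery.
func exampleSendSeriesAsync(s serializer.MetricSerializer, source metrics.SerieSource) chan struct{} {
	done := make(chan struct{})
	go func() {
		// Closing done lets the flush path wait for the series to be fully sent.
		defer close(done)
		sendIterableSeries(s, time.Now(), source)
	}()
	return done
}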

// GetDogStatsDWorkerAndPipelineCount returns how many routines should be spawned
// for the DogStatsD workers and how many DogStatsD pipelines should be running.
func GetDogStatsDWorkerAndPipelineCount() (int, int) {
	work, pipe := getDogStatsDWorkerAndPipelineCount(agentruntime.NumVCPU())
	log.Infof("Dogstatsd configured to run with %d workers and %d pipelines", work, pipe)
	return work, pipe
}

func getDogStatsDWorkerAndPipelineCount(vCPUs int) (int, int) {
	var dsdWorkerCount int
	var pipelineCount int
	autoAdjust := pkgconfigsetup.Datadog().GetBool("dogstatsd_pipeline_autoadjust")
	autoAdjustStrategy := pkgconfigsetup.Datadog().GetString("dogstatsd_pipeline_autoadjust_strategy")

	if autoAdjustStrategy != AutoAdjustStrategyMaxThroughput && autoAdjustStrategy != AutoAdjustStrategyPerOrigin {
		log.Warnf("Invalid value for 'dogstatsd_pipeline_autoadjust_strategy', using default value: %s", AutoAdjustStrategyMaxThroughput)
		autoAdjustStrategy = AutoAdjustStrategyMaxThroughput
	}

	// no auto-adjust of the pipeline count:
	// we use the pipeline count configuration
	// to determine how many workers should be running
	// ------------------------------------
	if !autoAdjust {
		pipelineCount = pkgconfigsetup.Datadog().GetInt("dogstatsd_pipeline_count")
		if pipelineCount <= 0 { // guard against configuration mistakes
			pipelineCount = 1
		}

		// reserve:
		// - a core for the listener goroutine
		// - one per aggregation pipeline (time sampler)
		// - the rest for workers
		// but we want at minimum 2 workers.
		dsdWorkerCount = vCPUs - 1 - pipelineCount
		if dsdWorkerCount < 2 {
			dsdWorkerCount = 2
		}
	} else if autoAdjustStrategy == AutoAdjustStrategyMaxThroughput {
		// we will auto-adjust the pipeline and workers count to maximize throughput
		//
		// Benchmarks have revealed that the output of 3 very busy workers can be
		// processed by 2 DogStatsD pipelines with a good execution / scheduling / waiting ratio.
		// To keep this simple for now, we will try running 1 less pipeline than workers
		// (e.g. for 4 workers, 3 pipelines).
		// Use goroutine analysis with pprof to look at execution time if you want
		// to adapt this heuristic.
		//
		// Basically the formula is:
		// - half the vCPUs for the worker routines
		// - half the vCPUs minus 1 for the pipeline routines
		// - the remaining routine for the listener routine
		dsdWorkerCount = vCPUs / 2
		if dsdWorkerCount < 2 { // minimum 2 workers
			dsdWorkerCount = 2
		}

		pipelineCount = dsdWorkerCount - 1
		if pipelineCount <= 0 { // minimum 1 pipeline
			pipelineCount = 1
		}

		if pkgconfigsetup.Datadog().GetInt("dogstatsd_pipeline_count") > 1 {
			log.Warn("DogStatsD pipeline count value ignored since 'dogstatsd_pipeline_autoadjust' is enabled.")
		}
	} else if autoAdjustStrategy == AutoAdjustStrategyPerOrigin {
		// we will auto-adjust the pipeline and workers count to isolate the pipelines
		//
		// The goal here is to have many pipelines to isolate the processing of the
		// different samplers and avoid contention between them.
		//
		// This also has the benefit of increasing compression efficiency by having
		// similarly tagged metrics flushed together.
		dsdWorkerCount = vCPUs / 2
		if dsdWorkerCount < 2 {
			dsdWorkerCount = 2
		}

		pipelineCount = pkgconfigsetup.Datadog().GetInt("dogstatsd_pipeline_count")
		if pipelineCount <= 0 { // guard against configuration mistakes
			pipelineCount = vCPUs * 2
		}
	}

	log.Info("Dogstatsd workers and pipelines count: ", dsdWorkerCount, " workers, ", pipelineCount, " pipelines")
	return dsdWorkerCount, pipelineCount
}
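
// exampleWorkerPipelineSplit is an illustrative sketch, not part of the upstream
// file: it walks the arithmetic above for a hypothetical 8 vCPU host.
// Depending on the configuration, the heuristic yields:
//   - auto-adjust disabled, dogstatsd_pipeline_count=1: workers = 8-1-1 = 6, pipelines = 1
//   - auto-adjust enabled, max_throughput strategy:     workers = 8/2 = 4, pipelines = 4-1 = 3
//   - auto-adjust enabled, per_origin strategy with no explicit count: workers = 4, pipelines = 8*2 = 16
func exampleWorkerPipelineSplit() {
	workers, pipelines := getDogStatsDWorkerAndPipelineCount(8)
	log.Infof("example split for 8 vCPUs: %d workers, %d pipelines", workers, pipelines)
}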