Skip to content

Commit eac5933

Browse files
committed
fix slo warnings
1 parent abb87ab commit eac5933

File tree

4 files changed

+78
-48
lines changed

4 files changed

+78
-48
lines changed

dd-trace-core/src/jmh/java/datadog/trace/common/metrics/ConflatingMetricsAggregatorBenchmark.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -49,7 +49,8 @@ public ConflatingMetricsAggregatorBenchmark() {
4949
HealthMetrics.NO_OP,
5050
new NullSink(),
5151
2048,
52-
2048);
52+
2048,
53+
true);
5354
}
5455

5556
static List<CoreSpan<?>> generateTrace(int len) {

dd-trace-core/src/main/java/datadog/trace/common/metrics/ConflatingMetricsAggregator.java

Lines changed: 51 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -101,6 +101,7 @@ public final class ConflatingMetricsAggregator implements MetricsAggregator, Eve
101101
private final SharedCommunicationObjects sharedCommunicationObjects;
102102
private volatile DDAgentFeaturesDiscovery features;
103103
private final HealthMetrics healthMetrics;
104+
private final boolean configMetricsEnabled;
104105

105106
private volatile AgentTaskScheduler.Scheduled<?> cancellation;
106107

@@ -121,7 +122,8 @@ public ConflatingMetricsAggregator(
121122
false,
122123
DEFAULT_HEADERS),
123124
config.getTracerMetricsMaxAggregates(),
124-
config.getTracerMetricsMaxPending());
125+
config.getTracerMetricsMaxPending(),
126+
config.isTracerMetricsEnabled());
125127
}
126128

127129
ConflatingMetricsAggregator(
@@ -131,7 +133,8 @@ public ConflatingMetricsAggregator(
131133
HealthMetrics healthMetric,
132134
Sink sink,
133135
int maxAggregates,
134-
int queueSize) {
136+
int queueSize,
137+
boolean configMetricsEnabled) {
135138
this(
136139
wellKnownTags,
137140
ignoredResources,
@@ -141,7 +144,8 @@ public ConflatingMetricsAggregator(
141144
maxAggregates,
142145
queueSize,
143146
10,
144-
SECONDS);
147+
SECONDS,
148+
configMetricsEnabled);
145149
}
146150

147151
ConflatingMetricsAggregator(
@@ -153,7 +157,8 @@ public ConflatingMetricsAggregator(
153157
int maxAggregates,
154158
int queueSize,
155159
long reportingInterval,
156-
TimeUnit timeUnit) {
160+
TimeUnit timeUnit,
161+
boolean configMetricsEnabled) {
157162
this(
158163
ignoredResources,
159164
sharedCommunicationObjects,
@@ -163,7 +168,8 @@ public ConflatingMetricsAggregator(
163168
maxAggregates,
164169
queueSize,
165170
reportingInterval,
166-
timeUnit);
171+
timeUnit,
172+
configMetricsEnabled);
167173
}
168174

169175
ConflatingMetricsAggregator(
@@ -175,7 +181,9 @@ public ConflatingMetricsAggregator(
175181
int maxAggregates,
176182
int queueSize,
177183
long reportingInterval,
178-
TimeUnit timeUnit) {
184+
TimeUnit timeUnit,
185+
boolean configMetricsEnabled) {
186+
this.configMetricsEnabled = configMetricsEnabled;
179187
this.ignoredResources = ignoredResources;
180188
this.inbox = new MpscCompoundQueue<>(queueSize);
181189
this.batchPool = new SpmcArrayQueue<>(maxAggregates);
@@ -199,16 +207,23 @@ public ConflatingMetricsAggregator(
199207
this.reportingIntervalTimeUnit = timeUnit;
200208
}
201209

202-
private DDAgentFeaturesDiscovery featuresDiscovery() {
203-
DDAgentFeaturesDiscovery ret = features;
204-
if (ret != null) {
205-
return ret;
210+
private void initialiseFeaturesDiscovery() {
211+
features = sharedCommunicationObjects.featuresDiscovery(Config.get());
212+
if (!features.supportsMetrics()) {
213+
disable();
214+
}
215+
}
216+
217+
private boolean supportsMetrics() {
218+
final DDAgentFeaturesDiscovery features = this.features;
219+
if (features != null) {
220+
return features.supportsMetrics();
206221
}
207-
// no need to synchronise here since it's already done in sharedCommunicationObject.
208-
// At worst, we'll assign multiple time the variable but it will be the same object
209-
ret = sharedCommunicationObjects.featuresDiscovery(Config.get());
210-
features = ret;
211-
return ret;
222+
// when the feature discovery is not yet ready, if metrics are enabled we don't want to lose
223+
// span metrics.
224+
// In any case the feature discovery will happen and if not supported the inbox queue will be
225+
// cleared
226+
return configMetricsEnabled;
212227
}
213228

214229
@Override
@@ -223,6 +238,8 @@ public void start() {
223238
reportingInterval,
224239
reportingInterval,
225240
reportingIntervalTimeUnit);
241+
// lazily fetch feature discovery
242+
AgentTaskScheduler.get().execute(this::initialiseFeaturesDiscovery);
226243
log.debug("started metrics aggregator");
227244
}
228245

@@ -242,7 +259,7 @@ public boolean report() {
242259

243260
@Override
244261
public Future<Boolean> forceReport() {
245-
if (!featuresDiscovery().supportsMetrics()) {
262+
if (!supportsMetrics()) {
246263
return CompletableFuture.completedFuture(false);
247264
}
248265
// Wait for the thread to start
@@ -278,8 +295,8 @@ public Future<Boolean> forceReport() {
278295
public boolean publish(List<? extends CoreSpan<?>> trace) {
279296
boolean forceKeep = false;
280297
int counted = 0;
281-
final DDAgentFeaturesDiscovery features = featuresDiscovery();
282-
if (features.supportsMetrics()) {
298+
if (supportsMetrics()) {
299+
final DDAgentFeaturesDiscovery features = this.features;
283300
for (CoreSpan<?> span : trace) {
284301
boolean isTopLevel = span.isTopLevel();
285302
if (shouldComputeMetric(span)) {
@@ -293,7 +310,7 @@ public boolean publish(List<? extends CoreSpan<?>> trace) {
293310
}
294311
}
295312
healthMetrics.onClientStatTraceComputed(
296-
counted, trace.size(), features.supportsDropping() && !forceKeep);
313+
counted, trace.size(), features != null && features.supportsDropping() && !forceKeep);
297314
}
298315
return forceKeep;
299316
}
@@ -424,7 +441,7 @@ public void onEvent(EventType eventType, String message) {
424441
switch (eventType) {
425442
case DOWNGRADED:
426443
log.debug("Agent downgrade was detected");
427-
AgentTaskScheduler.get().execute(this::disable);
444+
AgentTaskScheduler.get().execute(this::onDowngrade);
428445
break;
429446
case BAD_PAYLOAD:
430447
log.debug("bad metrics payload sent to trace agent: {}", message);
@@ -439,19 +456,25 @@ public void onEvent(EventType eventType, String message) {
439456
}
440457
}
441458

442-
private void disable() {
443-
final DDAgentFeaturesDiscovery features = featuresDiscovery();
444-
features.discover();
445-
if (!features.supportsMetrics()) {
459+
private void onDowngrade() {
460+
DDAgentFeaturesDiscovery features = this.features;
461+
if (null != features) {
462+
features.discover();
463+
}
464+
if (!supportsMetrics()) {
446465
log.debug("Disabling metric reporting because an agent downgrade was detected");
447466
healthMetrics.onClientStatDowngraded();
448-
this.pending.clear();
449-
this.batchPool.clear();
450-
this.inbox.clear();
451-
this.aggregator.clearAggregates();
467+
disable();
452468
}
453469
}
454470

471+
private void disable() {
472+
this.pending.clear();
473+
this.batchPool.clear();
474+
this.inbox.clear();
475+
this.aggregator.clearAggregates();
476+
}
477+
455478
private static final class ReportTask
456479
implements AgentTaskScheduler.Task<ConflatingMetricsAggregator> {
457480

dd-trace-core/src/test/groovy/datadog/trace/common/metrics/ConflatingMetricAggregatorTest.groovy

Lines changed: 24 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -45,7 +45,8 @@ class ConflatingMetricAggregatorTest extends DDSpecification {
4545
10,
4646
queueSize,
4747
1,
48-
MILLISECONDS
48+
MILLISECONDS,
49+
true
4950
)
5051
aggregator.start()
5152

@@ -75,7 +76,8 @@ class ConflatingMetricAggregatorTest extends DDSpecification {
7576
10,
7677
queueSize,
7778
1,
78-
MILLISECONDS
79+
MILLISECONDS,
80+
true
7981
)
8082
aggregator.start()
8183

@@ -105,7 +107,8 @@ class ConflatingMetricAggregatorTest extends DDSpecification {
105107
features.supportsMetrics() >> true
106108
features.peerTags() >> []
107109
ConflatingMetricsAggregator aggregator = new ConflatingMetricsAggregator(empty,
108-
sharedCommunicationObjects(features), HealthMetrics.NO_OP, sink, writer, 10, queueSize, reportingInterval, SECONDS)
110+
sharedCommunicationObjects(features), HealthMetrics.NO_OP, sink, writer, 10, queueSize, reportingInterval, SECONDS,
111+
true)
109112
aggregator.start()
110113

111114
when:
@@ -147,7 +150,8 @@ class ConflatingMetricAggregatorTest extends DDSpecification {
147150
features.supportsMetrics() >> true
148151
features.peerTags() >> []
149152
ConflatingMetricsAggregator aggregator = new ConflatingMetricsAggregator(empty,
150-
sharedCommunicationObjects(features), HealthMetrics.NO_OP, sink, writer, 10, queueSize, reportingInterval, SECONDS)
153+
sharedCommunicationObjects(features), HealthMetrics.NO_OP, sink, writer, 10, queueSize, reportingInterval, SECONDS,
154+
true)
151155
aggregator.start()
152156

153157
when:
@@ -198,7 +202,8 @@ class ConflatingMetricAggregatorTest extends DDSpecification {
198202
features.supportsMetrics() >> true
199203
features.peerTags() >>> [["country"], ["country", "georegion"],]
200204
ConflatingMetricsAggregator aggregator = new ConflatingMetricsAggregator(empty,
201-
sharedCommunicationObjects(features), HealthMetrics.NO_OP, sink, writer, 10, queueSize, reportingInterval, SECONDS)
205+
sharedCommunicationObjects(features), HealthMetrics.NO_OP, sink, writer, 10, queueSize, reportingInterval, SECONDS,
206+
true)
202207
aggregator.start()
203208

204209
when:
@@ -257,7 +262,8 @@ class ConflatingMetricAggregatorTest extends DDSpecification {
257262
features.supportsMetrics() >> true
258263
features.peerTags() >> ["peer.hostname", "_dd.base_service"]
259264
ConflatingMetricsAggregator aggregator = new ConflatingMetricsAggregator(empty,
260-
sharedCommunicationObjects(features), HealthMetrics.NO_OP, sink, writer, 10, queueSize, reportingInterval, SECONDS)
265+
sharedCommunicationObjects(features), HealthMetrics.NO_OP, sink, writer, 10, queueSize, reportingInterval, SECONDS,
266+
true)
261267
aggregator.start()
262268

263269
when:
@@ -306,7 +312,7 @@ class ConflatingMetricAggregatorTest extends DDSpecification {
306312
features.supportsMetrics() >> true
307313
features.peerTags() >> []
308314
ConflatingMetricsAggregator aggregator = new ConflatingMetricsAggregator(empty, sharedCommunicationObjects(features), HealthMetrics.NO_OP,
309-
sink, writer, 10, queueSize, reportingInterval, SECONDS)
315+
sink, writer, 10, queueSize, reportingInterval, SECONDS, true)
310316
aggregator.start()
311317

312318
when:
@@ -354,7 +360,7 @@ class ConflatingMetricAggregatorTest extends DDSpecification {
354360
features.supportsMetrics() >> true
355361
features.peerTags() >> []
356362
ConflatingMetricsAggregator aggregator = new ConflatingMetricsAggregator(empty,
357-
sharedCommunicationObjects(features), HealthMetrics.NO_OP, sink, writer, 10, queueSize, reportingInterval, SECONDS)
363+
sharedCommunicationObjects(features), HealthMetrics.NO_OP, sink, writer, 10, queueSize, reportingInterval, SECONDS, true)
358364
long duration = 100
359365
List<CoreSpan> trace = [
360366
new SimpleSpan("service", "operation", "resource", "type", true, false, false, 0, duration, HTTP_OK).setTag(SPAN_KIND, "baz"),
@@ -419,7 +425,7 @@ class ConflatingMetricAggregatorTest extends DDSpecification {
419425
features.supportsMetrics() >> true
420426
features.peerTags() >> []
421427
ConflatingMetricsAggregator aggregator = new ConflatingMetricsAggregator(empty,
422-
sharedCommunicationObjects(features), HealthMetrics.NO_OP, sink, writer, maxAggregates, queueSize, reportingInterval, SECONDS)
428+
sharedCommunicationObjects(features), HealthMetrics.NO_OP, sink, writer, maxAggregates, queueSize, reportingInterval, SECONDS, true)
423429
long duration = 100
424430
aggregator.start()
425431

@@ -478,7 +484,7 @@ class ConflatingMetricAggregatorTest extends DDSpecification {
478484
features.supportsMetrics() >> true
479485
features.peerTags() >> []
480486
ConflatingMetricsAggregator aggregator = new ConflatingMetricsAggregator(empty,
481-
sharedCommunicationObjects(features), HealthMetrics.NO_OP, sink, writer, maxAggregates, queueSize, reportingInterval, SECONDS)
487+
sharedCommunicationObjects(features), HealthMetrics.NO_OP, sink, writer, maxAggregates, queueSize, reportingInterval, SECONDS, true)
482488
long duration = 100
483489
aggregator.start()
484490

@@ -568,7 +574,7 @@ class ConflatingMetricAggregatorTest extends DDSpecification {
568574
features.supportsMetrics() >> true
569575
features.peerTags() >> []
570576
ConflatingMetricsAggregator aggregator = new ConflatingMetricsAggregator(empty,
571-
sharedCommunicationObjects(features), HealthMetrics.NO_OP, sink, writer, maxAggregates, queueSize, reportingInterval, SECONDS)
577+
sharedCommunicationObjects(features), HealthMetrics.NO_OP, sink, writer, maxAggregates, queueSize, reportingInterval, SECONDS, true)
572578
long duration = 100
573579
aggregator.start()
574580

@@ -624,7 +630,7 @@ class ConflatingMetricAggregatorTest extends DDSpecification {
624630
features.supportsMetrics() >> true
625631
features.peerTags() >> []
626632
ConflatingMetricsAggregator aggregator = new ConflatingMetricsAggregator(empty,
627-
sharedCommunicationObjects(features), HealthMetrics.NO_OP, sink, writer, maxAggregates, queueSize, 1, SECONDS)
633+
sharedCommunicationObjects(features), HealthMetrics.NO_OP, sink, writer, maxAggregates, queueSize, 1, SECONDS, true)
628634
long duration = 100
629635
aggregator.start()
630636

@@ -671,7 +677,7 @@ class ConflatingMetricAggregatorTest extends DDSpecification {
671677
features.supportsMetrics() >> true
672678
features.peerTags() >> []
673679
ConflatingMetricsAggregator aggregator = new ConflatingMetricsAggregator(empty,
674-
sharedCommunicationObjects(features), HealthMetrics.NO_OP, sink, writer, maxAggregates, queueSize, 1, SECONDS)
680+
sharedCommunicationObjects(features), HealthMetrics.NO_OP, sink, writer, maxAggregates, queueSize, 1, SECONDS, true)
675681
long duration = 100
676682
aggregator.start()
677683

@@ -710,7 +716,7 @@ class ConflatingMetricAggregatorTest extends DDSpecification {
710716
features.supportsMetrics() >> true
711717
features.peerTags() >> []
712718
ConflatingMetricsAggregator aggregator = new ConflatingMetricsAggregator(empty,
713-
sharedCommunicationObjects(features), HealthMetrics.NO_OP, sink, writer, maxAggregates, queueSize, 1, SECONDS)
719+
sharedCommunicationObjects(features), HealthMetrics.NO_OP, sink, writer, maxAggregates, queueSize, 1, SECONDS, true)
714720
long duration = 100
715721
aggregator.start()
716722

@@ -741,7 +747,7 @@ class ConflatingMetricAggregatorTest extends DDSpecification {
741747
Sink sink = Stub(Sink)
742748
DDAgentFeaturesDiscovery features = Mock(DDAgentFeaturesDiscovery)
743749
ConflatingMetricsAggregator aggregator = new ConflatingMetricsAggregator(empty,
744-
sharedCommunicationObjects(features), HealthMetrics.NO_OP, sink, writer, maxAggregates, queueSize, 1, SECONDS)
750+
sharedCommunicationObjects(features), HealthMetrics.NO_OP, sink, writer, maxAggregates, queueSize, 1, SECONDS, false)
745751
aggregator.start()
746752

747753
when:
@@ -763,7 +769,7 @@ class ConflatingMetricAggregatorTest extends DDSpecification {
763769
features.supportsMetrics() >> false
764770
features.peerTags() >> []
765771
ConflatingMetricsAggregator aggregator = new ConflatingMetricsAggregator(empty,
766-
sharedCommunicationObjects(features), HealthMetrics.NO_OP, sink, writer, 10, queueSize, 200, MILLISECONDS)
772+
sharedCommunicationObjects(features), HealthMetrics.NO_OP, sink, writer, 10, queueSize, 200, MILLISECONDS, true)
767773
final spans = [
768774
new SimpleSpan("service", "operation", "resource", "type", false, true, false, 0, 10, HTTP_OK)
769775
]
@@ -795,7 +801,7 @@ class ConflatingMetricAggregatorTest extends DDSpecification {
795801
DDAgentFeaturesDiscovery features = Mock(DDAgentFeaturesDiscovery)
796802
features.supportsMetrics() >> true
797803
ConflatingMetricsAggregator aggregator = new ConflatingMetricsAggregator(empty,
798-
sharedCommunicationObjects(features), HealthMetrics.NO_OP, sink, writer, maxAggregates, queueSize, 1, SECONDS)
804+
sharedCommunicationObjects(features), HealthMetrics.NO_OP, sink, writer, maxAggregates, queueSize, 1, SECONDS, true)
799805

800806
when:
801807
def async = CompletableFuture.supplyAsync(new Supplier<Boolean>() {
@@ -828,7 +834,7 @@ class ConflatingMetricAggregatorTest extends DDSpecification {
828834
DDAgentFeaturesDiscovery features = Mock(DDAgentFeaturesDiscovery)
829835
features.supportsMetrics() >> true
830836
ConflatingMetricsAggregator aggregator = new ConflatingMetricsAggregator(empty,
831-
sharedCommunicationObjects(features), HealthMetrics.NO_OP, sink, writer, 10, queueSize, reportingInterval, SECONDS)
837+
sharedCommunicationObjects(features), HealthMetrics.NO_OP, sink, writer, 10, queueSize, reportingInterval, SECONDS, true)
832838
aggregator.start()
833839
834840
when:

dd-trace-core/src/test/groovy/datadog/trace/common/metrics/FootprintForkedTest.groovy

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -42,7 +42,7 @@ class FootprintForkedTest extends DDSpecification {
4242
1000,
4343
1000,
4444
100,
45-
SECONDS)
45+
SECONDS, true)
4646
// Removing the 'features' as it's a mock, and mocks are heavyweight, e.g. around 22MiB
4747
def baseline = footprint(aggregator, features)
4848
aggregator.start()

0 commit comments

Comments
 (0)