diff --git a/dd-trace-core/src/jmh/java/datadog/trace/common/metrics/ConflatingMetricsAggregatorBenchmark.java b/dd-trace-core/src/jmh/java/datadog/trace/common/metrics/ConflatingMetricsAggregatorBenchmark.java
index 4f5911ebc69..2209e465cfa 100644
--- a/dd-trace-core/src/jmh/java/datadog/trace/common/metrics/ConflatingMetricsAggregatorBenchmark.java
+++ b/dd-trace-core/src/jmh/java/datadog/trace/common/metrics/ConflatingMetricsAggregatorBenchmark.java
@@ -49,7 +49,8 @@ public ConflatingMetricsAggregatorBenchmark() {
         HealthMetrics.NO_OP,
         new NullSink(),
         2048,
-        2048);
+        2048,
+        true);
   }
 
   static List<CoreSpan<?>> generateTrace(int len) {
diff --git a/dd-trace-core/src/main/java/datadog/trace/common/metrics/Aggregator.java b/dd-trace-core/src/main/java/datadog/trace/common/metrics/Aggregator.java
index b2b8b1820f1..b1bca3d2b5e 100644
--- a/dd-trace-core/src/main/java/datadog/trace/common/metrics/Aggregator.java
+++ b/dd-trace-core/src/main/java/datadog/trace/common/metrics/Aggregator.java
@@ -17,7 +17,7 @@ final class Aggregator implements Runnable {
 
-  private static final long DEFAULT_SLEEP_MILLIS = 10;
+  private static final long DEFAULT_SLEEP_MILLIS = 1;
 
   private static final Logger log = LoggerFactory.getLogger(Aggregator.class);
diff --git a/dd-trace-core/src/main/java/datadog/trace/common/metrics/ConflatingMetricsAggregator.java b/dd-trace-core/src/main/java/datadog/trace/common/metrics/ConflatingMetricsAggregator.java
index 04f60a78fa1..f8a50ebb6ab 100644
--- a/dd-trace-core/src/main/java/datadog/trace/common/metrics/ConflatingMetricsAggregator.java
+++ b/dd-trace-core/src/main/java/datadog/trace/common/metrics/ConflatingMetricsAggregator.java
@@ -101,6 +101,7 @@ public final class ConflatingMetricsAggregator implements MetricsAggregator, EventListener {
   private final SharedCommunicationObjects sharedCommunicationObjects;
   private volatile DDAgentFeaturesDiscovery features;
   private final HealthMetrics healthMetrics;
+  private final boolean configMetricsEnabled;
 
   private volatile AgentTaskScheduler.Scheduled cancellation;
@@ -121,7 +122,8 @@ public ConflatingMetricsAggregator(
             false,
             DEFAULT_HEADERS),
         config.getTracerMetricsMaxAggregates(),
-        config.getTracerMetricsMaxPending());
+        config.getTracerMetricsMaxPending(),
+        config.isTracerMetricsEnabled());
   }
 
   ConflatingMetricsAggregator(
@@ -131,7 +133,8 @@ public ConflatingMetricsAggregator(
       HealthMetrics healthMetric,
       Sink sink,
       int maxAggregates,
-      int queueSize) {
+      int queueSize,
+      boolean configMetricsEnabled) {
     this(
         wellKnownTags,
         ignoredResources,
@@ -141,7 +144,8 @@ public ConflatingMetricsAggregator(
         maxAggregates,
         queueSize,
         10,
-        SECONDS);
+        SECONDS,
+        configMetricsEnabled);
   }
 
   ConflatingMetricsAggregator(
@@ -153,7 +157,8 @@ public ConflatingMetricsAggregator(
       int maxAggregates,
       int queueSize,
       long reportingInterval,
-      TimeUnit timeUnit) {
+      TimeUnit timeUnit,
+      boolean configMetricsEnabled) {
     this(
         ignoredResources,
         sharedCommunicationObjects,
@@ -163,7 +168,8 @@ public ConflatingMetricsAggregator(
         maxAggregates,
         queueSize,
         reportingInterval,
-        timeUnit);
+        timeUnit,
+        configMetricsEnabled);
   }
 
   ConflatingMetricsAggregator(
@@ -175,7 +181,9 @@ public ConflatingMetricsAggregator(
       int maxAggregates,
       int queueSize,
       long reportingInterval,
-      TimeUnit timeUnit) {
+      TimeUnit timeUnit,
+      boolean configMetricsEnabled) {
+    this.configMetricsEnabled = configMetricsEnabled;
     this.ignoredResources = ignoredResources;
     this.inbox = new MpscCompoundQueue<>(queueSize);
     this.batchPool = new SpmcArrayQueue<>(maxAggregates);
@@ -199,16 +207,23 @@ public ConflatingMetricsAggregator(
     this.reportingIntervalTimeUnit = timeUnit;
   }
 
-  private DDAgentFeaturesDiscovery featuresDiscovery() {
-    DDAgentFeaturesDiscovery ret = features;
-    if (ret != null) {
-      return ret;
+  private void initialiseFeaturesDiscovery() {
+    features = sharedCommunicationObjects.featuresDiscovery(Config.get());
+    if (!features.supportsMetrics()) {
+      clean();
+    }
+  }
+
+  private boolean supportsMetrics() {
+    final DDAgentFeaturesDiscovery features = this.features;
+    if (features != null) {
+      return features.supportsMetrics();
     }
-    // no need to synchronise here since it's already done in sharedCommunicationObject.
-    // At worst, we'll assign multiple time the variable but it will be the same object
-    ret = sharedCommunicationObjects.featuresDiscovery(Config.get());
-    features = ret;
-    return ret;
+    // When feature discovery is not yet ready and metrics are enabled, we don't want to lose
+    // span metrics.
+    // In any case, feature discovery will run eventually and, if metrics are not supported, the
+    // inbox queue will be cleared.
+    return configMetricsEnabled;
   }
 
   @Override
@@ -223,11 +238,19 @@ public void start() {
             reportingInterval,
             reportingInterval,
             reportingIntervalTimeUnit);
+    // lazily fetch feature discovery
+    AgentTaskScheduler.get().execute(this::initialiseFeaturesDiscovery);
     log.debug("started metrics aggregator");
   }
 
   @Override
   public boolean report() {
+    // avoid triggering a downgrade if the agent is not ready / discovered
+    if (features == null) {
+      clean();
+      return false;
+    }
+
     boolean published;
     int attempts = 0;
     do {
@@ -242,7 +265,7 @@ public boolean report() {
 
   @Override
   public Future<Boolean> forceReport() {
-    if (!featuresDiscovery().supportsMetrics()) {
+    if (!supportsMetrics()) {
       return CompletableFuture.completedFuture(false);
     }
     // Wait for the thread to start
@@ -278,8 +301,8 @@ public Future<Boolean> forceReport() {
   public boolean publish(List<? extends CoreSpan<?>> trace) {
     boolean forceKeep = false;
     int counted = 0;
-    final DDAgentFeaturesDiscovery features = featuresDiscovery();
-    if (features.supportsMetrics()) {
+    if (supportsMetrics()) {
+      final DDAgentFeaturesDiscovery features = this.features;
       for (CoreSpan<?> span : trace) {
         boolean isTopLevel = span.isTopLevel();
         if (shouldComputeMetric(span)) {
@@ -293,7 +316,7 @@ public boolean publish(List<? extends CoreSpan<?>> trace) {
         }
       }
       healthMetrics.onClientStatTraceComputed(
-          counted, trace.size(), features.supportsDropping() && !forceKeep);
+          counted, trace.size(), features != null && features.supportsDropping() && !forceKeep);
     }
     return forceKeep;
   }
@@ -413,6 +436,7 @@ public void stop() {
   public void close() {
     stop();
     try {
+      thread.interrupt();
      thread.join(THREAD_JOIN_TIMOUT_MS);
     } catch (InterruptedException ignored) {
     }
@@ -424,7 +448,7 @@ public void onEvent(EventType eventType, String message) {
     switch (eventType) {
       case DOWNGRADED:
         log.debug("Agent downgrade was detected");
-        AgentTaskScheduler.get().execute(this::disable);
+        AgentTaskScheduler.get().execute(this::onDowngrade);
         break;
       case BAD_PAYLOAD:
         log.debug("bad metrics payload sent to trace agent: {}", message);
@@ -439,19 +463,25 @@ public void onEvent(EventType eventType, String message) {
     }
   }
 
-  private void disable() {
-    final DDAgentFeaturesDiscovery features = featuresDiscovery();
-    features.discover();
-    if (!features.supportsMetrics()) {
+  private void onDowngrade() {
+    DDAgentFeaturesDiscovery features = this.features;
+    if (null != features) {
+      features.discover();
+    }
+    if (!supportsMetrics()) {
       log.debug("Disabling metric reporting because an agent downgrade was detected");
       healthMetrics.onClientStatDowngraded();
-      this.pending.clear();
-      this.batchPool.clear();
-      this.inbox.clear();
-      this.aggregator.clearAggregates();
+      clean();
     }
   }
 
+  private void clean() {
+    this.pending.clear();
+    this.batchPool.clear();
+    this.inbox.clear();
+    this.aggregator.clearAggregates();
+  }
+
   private static final class ReportTask implements AgentTaskScheduler.Task<ConflatingMetricsAggregator> {
diff --git a/dd-trace-core/src/test/groovy/datadog/trace/common/metrics/ConflatingMetricAggregatorTest.groovy b/dd-trace-core/src/test/groovy/datadog/trace/common/metrics/ConflatingMetricAggregatorTest.groovy
index 747cb33e7e3..768526e4fe3 100644
--- a/dd-trace-core/src/test/groovy/datadog/trace/common/metrics/ConflatingMetricAggregatorTest.groovy
+++ b/dd-trace-core/src/test/groovy/datadog/trace/common/metrics/ConflatingMetricAggregatorTest.groovy
@@ -45,7 +45,8 @@ class ConflatingMetricAggregatorTest extends DDSpecification {
       10,
       queueSize,
       1,
-      MILLISECONDS
+      MILLISECONDS,
+      true
       )
     aggregator.start()
@@ -75,7 +76,8 @@ class ConflatingMetricAggregatorTest extends DDSpecification {
       10,
       queueSize,
       1,
-      MILLISECONDS
+      MILLISECONDS,
+      true
       )
     aggregator.start()
@@ -105,7 +107,8 @@ class ConflatingMetricAggregatorTest extends DDSpecification {
     features.supportsMetrics() >> true
     features.peerTags() >> []
     ConflatingMetricsAggregator aggregator = new ConflatingMetricsAggregator(empty,
-      sharedCommunicationObjects(features), HealthMetrics.NO_OP, sink, writer, 10, queueSize, reportingInterval, SECONDS)
+      sharedCommunicationObjects(features), HealthMetrics.NO_OP, sink, writer, 10, queueSize, reportingInterval, SECONDS,
+      true)
     aggregator.start()
 
     when:
@@ -147,7 +150,8 @@ class ConflatingMetricAggregatorTest extends DDSpecification {
     features.supportsMetrics() >> true
     features.peerTags() >> []
     ConflatingMetricsAggregator aggregator = new ConflatingMetricsAggregator(empty,
-      sharedCommunicationObjects(features), HealthMetrics.NO_OP, sink, writer, 10, queueSize, reportingInterval, SECONDS)
+      sharedCommunicationObjects(features), HealthMetrics.NO_OP, sink, writer, 10, queueSize, reportingInterval, SECONDS,
+      true)
     aggregator.start()
 
     when:
@@ -198,7 +202,8 @@ class ConflatingMetricAggregatorTest extends DDSpecification {
     features.supportsMetrics() >> true
     features.peerTags() >>> [["country"], ["country", "georegion"],]
     ConflatingMetricsAggregator aggregator = new ConflatingMetricsAggregator(empty,
-      sharedCommunicationObjects(features), HealthMetrics.NO_OP, sink, writer, 10, queueSize, reportingInterval, SECONDS)
+      sharedCommunicationObjects(features), HealthMetrics.NO_OP, sink, writer, 10, queueSize, reportingInterval, SECONDS,
+      true)
     aggregator.start()
 
     when:
@@ -257,7 +262,8 @@ class ConflatingMetricAggregatorTest extends DDSpecification {
     features.supportsMetrics() >> true
     features.peerTags() >> ["peer.hostname", "_dd.base_service"]
     ConflatingMetricsAggregator aggregator = new ConflatingMetricsAggregator(empty,
-      sharedCommunicationObjects(features), HealthMetrics.NO_OP, sink, writer, 10, queueSize, reportingInterval, SECONDS)
+      sharedCommunicationObjects(features), HealthMetrics.NO_OP, sink, writer, 10, queueSize, reportingInterval, SECONDS,
+      true)
     aggregator.start()
 
     when:
@@ -306,7 +312,7 @@ class ConflatingMetricAggregatorTest extends DDSpecification {
     features.supportsMetrics() >> true
     features.peerTags() >> []
     ConflatingMetricsAggregator aggregator = new ConflatingMetricsAggregator(empty, sharedCommunicationObjects(features), HealthMetrics.NO_OP,
-      sink, writer, 10, queueSize, reportingInterval, SECONDS)
+      sink, writer, 10, queueSize, reportingInterval, SECONDS, true)
     aggregator.start()
 
     when:
@@ -354,7 +360,7 @@ class ConflatingMetricAggregatorTest extends DDSpecification {
     features.supportsMetrics() >> true
     features.peerTags() >> []
     ConflatingMetricsAggregator aggregator = new ConflatingMetricsAggregator(empty,
-      sharedCommunicationObjects(features), HealthMetrics.NO_OP, sink, writer, 10, queueSize, reportingInterval, SECONDS)
+      sharedCommunicationObjects(features), HealthMetrics.NO_OP, sink, writer, 10, queueSize, reportingInterval, SECONDS, true)
     long duration = 100
     List trace = [
       new SimpleSpan("service", "operation", "resource", "type", true, false, false, 0, duration, HTTP_OK).setTag(SPAN_KIND, "baz"),
@@ -419,7 +425,7 @@ class ConflatingMetricAggregatorTest extends DDSpecification {
     features.supportsMetrics() >> true
     features.peerTags() >> []
     ConflatingMetricsAggregator aggregator = new ConflatingMetricsAggregator(empty,
-      sharedCommunicationObjects(features), HealthMetrics.NO_OP, sink, writer, maxAggregates, queueSize, reportingInterval, SECONDS)
+      sharedCommunicationObjects(features), HealthMetrics.NO_OP, sink, writer, maxAggregates, queueSize, reportingInterval, SECONDS, true)
     long duration = 100
     aggregator.start()
@@ -478,7 +484,7 @@ class ConflatingMetricAggregatorTest extends DDSpecification {
     features.supportsMetrics() >> true
     features.peerTags() >> []
     ConflatingMetricsAggregator aggregator = new ConflatingMetricsAggregator(empty,
-      sharedCommunicationObjects(features), HealthMetrics.NO_OP, sink, writer, maxAggregates, queueSize, reportingInterval, SECONDS)
+      sharedCommunicationObjects(features), HealthMetrics.NO_OP, sink, writer, maxAggregates, queueSize, reportingInterval, SECONDS, true)
     long duration = 100
     aggregator.start()
@@ -568,7 +574,7 @@ class ConflatingMetricAggregatorTest extends DDSpecification {
     features.supportsMetrics() >> true
     features.peerTags() >> []
     ConflatingMetricsAggregator aggregator = new ConflatingMetricsAggregator(empty,
-      sharedCommunicationObjects(features), HealthMetrics.NO_OP, sink, writer, maxAggregates, queueSize, reportingInterval, SECONDS)
+      sharedCommunicationObjects(features), HealthMetrics.NO_OP, sink, writer, maxAggregates, queueSize, reportingInterval, SECONDS, true)
     long duration = 100
     aggregator.start()
@@ -624,7 +630,7 @@ class ConflatingMetricAggregatorTest extends DDSpecification {
     features.supportsMetrics() >> true
     features.peerTags() >> []
     ConflatingMetricsAggregator aggregator = new ConflatingMetricsAggregator(empty,
-      sharedCommunicationObjects(features), HealthMetrics.NO_OP, sink, writer, maxAggregates, queueSize, 1, SECONDS)
+      sharedCommunicationObjects(features), HealthMetrics.NO_OP, sink, writer, maxAggregates, queueSize, 1, SECONDS, true)
     long duration = 100
     aggregator.start()
@@ -671,7 +677,7 @@ class ConflatingMetricAggregatorTest extends DDSpecification {
     features.supportsMetrics() >> true
     features.peerTags() >> []
    ConflatingMetricsAggregator aggregator = new ConflatingMetricsAggregator(empty,
-      sharedCommunicationObjects(features), HealthMetrics.NO_OP, sink, writer, maxAggregates, queueSize, 1, SECONDS)
+      sharedCommunicationObjects(features), HealthMetrics.NO_OP, sink, writer, maxAggregates, queueSize, 1, SECONDS, true)
     long duration = 100
     aggregator.start()
@@ -710,7 +716,7 @@ class ConflatingMetricAggregatorTest extends DDSpecification {
     features.supportsMetrics() >> true
     features.peerTags() >> []
     ConflatingMetricsAggregator aggregator = new ConflatingMetricsAggregator(empty,
-      sharedCommunicationObjects(features), HealthMetrics.NO_OP, sink, writer, maxAggregates, queueSize, 1, SECONDS)
+      sharedCommunicationObjects(features), HealthMetrics.NO_OP, sink, writer, maxAggregates, queueSize, 1, SECONDS, true)
     long duration = 100
     aggregator.start()
@@ -741,7 +747,7 @@ class ConflatingMetricAggregatorTest extends DDSpecification {
     Sink sink = Stub(Sink)
     DDAgentFeaturesDiscovery features = Mock(DDAgentFeaturesDiscovery)
     ConflatingMetricsAggregator aggregator = new ConflatingMetricsAggregator(empty,
-      sharedCommunicationObjects(features), HealthMetrics.NO_OP, sink, writer, maxAggregates, queueSize, 1, SECONDS)
+      sharedCommunicationObjects(features), HealthMetrics.NO_OP, sink, writer, maxAggregates, queueSize, 1, SECONDS, false)
     aggregator.start()
 
     when:
@@ -763,7 +769,7 @@ class ConflatingMetricAggregatorTest extends DDSpecification {
     features.supportsMetrics() >> false
     features.peerTags() >> []
     ConflatingMetricsAggregator aggregator = new ConflatingMetricsAggregator(empty,
-      sharedCommunicationObjects(features), HealthMetrics.NO_OP, sink, writer, 10, queueSize, 200, MILLISECONDS)
+      sharedCommunicationObjects(features), HealthMetrics.NO_OP, sink, writer, 10, queueSize, 200, MILLISECONDS, true)
     final spans = [
       new SimpleSpan("service", "operation", "resource", "type", false, true, false, 0, 10, HTTP_OK)
     ]
@@ -795,7 +801,7 @@ class ConflatingMetricAggregatorTest extends DDSpecification {
     DDAgentFeaturesDiscovery features = Mock(DDAgentFeaturesDiscovery)
     features.supportsMetrics() >> true
     ConflatingMetricsAggregator aggregator = new ConflatingMetricsAggregator(empty,
-      sharedCommunicationObjects(features), HealthMetrics.NO_OP, sink, writer, maxAggregates, queueSize, 1, SECONDS)
+      sharedCommunicationObjects(features), HealthMetrics.NO_OP, sink, writer, maxAggregates, queueSize, 1, SECONDS, true)
 
     when:
     def async = CompletableFuture.supplyAsync(new Supplier() {
@@ -828,7 +834,7 @@ class ConflatingMetricAggregatorTest extends DDSpecification {
     DDAgentFeaturesDiscovery features = Mock(DDAgentFeaturesDiscovery)
     features.supportsMetrics() >> true
     ConflatingMetricsAggregator aggregator = new ConflatingMetricsAggregator(empty,
-      sharedCommunicationObjects(features), HealthMetrics.NO_OP, sink, writer, 10, queueSize, reportingInterval, SECONDS)
+      sharedCommunicationObjects(features), HealthMetrics.NO_OP, sink, writer, 10, queueSize, reportingInterval, SECONDS, true)
     aggregator.start()
 
     when:
diff --git a/dd-trace-core/src/test/groovy/datadog/trace/common/metrics/FootprintForkedTest.groovy b/dd-trace-core/src/test/groovy/datadog/trace/common/metrics/FootprintForkedTest.groovy
index da5fe6a26c4..8456c7eba55 100644
--- a/dd-trace-core/src/test/groovy/datadog/trace/common/metrics/FootprintForkedTest.groovy
+++ b/dd-trace-core/src/test/groovy/datadog/trace/common/metrics/FootprintForkedTest.groovy
@@ -42,7 +42,7 @@ class FootprintForkedTest extends DDSpecification {
       1000,
       1000,
       100,
-      SECONDS)
+      SECONDS, true)
     // Removing the 'features' as it's a mock, and mocks are heavyweight, e.g. around 22MiB
     def baseline = footprint(aggregator, features)
     aggregator.start()
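
The hunks above make the aggregator publish optimistically while agent feature discovery is still pending, gated by the new configMetricsEnabled flag, and clear the queued work if discovery later reports that metrics are unsupported. The snippet below is a minimal, self-contained sketch of that pattern, not part of the patch: OptimisticPublisher, queryAgent and the String "trace" payload are hypothetical stand-ins, not the tracer's real classes.

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;

final class OptimisticPublisher {
  private final BlockingQueue<String> inbox = new ArrayBlockingQueue<>(1024);
  private final boolean configMetricsEnabled;
  private volatile Boolean agentSupportsMetrics; // null until discovery completes

  OptimisticPublisher(boolean configMetricsEnabled) {
    this.configMetricsEnabled = configMetricsEnabled;
  }

  void start() {
    // discovery runs off the hot path, mirroring the lazy feature-discovery fetch above
    CompletableFuture.runAsync(() -> onDiscovery(queryAgent()));
  }

  boolean publish(String trace) {
    // publish only while metrics are believed to be supported
    return supportsMetrics() && inbox.offer(trace);
  }

  private boolean supportsMetrics() {
    Boolean discovered = agentSupportsMetrics;
    // before discovery completes, fall back to the configured setting so spans are not lost
    return discovered != null ? discovered : configMetricsEnabled;
  }

  private void onDiscovery(boolean supported) {
    agentSupportsMetrics = supported;
    if (!supported) {
      inbox.clear(); // analogue of clean(): drop anything queued optimistically
    }
  }

  private boolean queryAgent() {
    return true; // placeholder for the real agent capability lookup
  }
}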