@@ -49,7 +49,8 @@ public ConflatingMetricsAggregatorBenchmark() {
HealthMetrics.NO_OP,
new NullSink(),
2048,
2048);
2048,
true);
}

static List<CoreSpan<?>> generateTrace(int len) {
@@ -17,7 +17,7 @@

final class Aggregator implements Runnable {

private static final long DEFAULT_SLEEP_MILLIS = 10;
private static final long DEFAULT_SLEEP_MILLIS = 1;
@AlexeyKuznetsov-DD (Contributor) commented on Sep 22, 2025:


Just curious how the value of 1 was selected?

The PR author (Contributor) replied:


I'm testing stuff; this change should be ignored. It turned out that the main contributor to the increased startup time is class loading (we pull in a lot of classes, mostly related to JCTools). I will trash this PR.
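For context on that diagnosis, class-loading cost can be observed directly from the JVM. This is a minimal sketch (not part of the PR) using the standard `ClassLoadingMXBean`; the class name and the idea of sampling before/after tracer initialisation are illustrative assumptions only. Starting the JVM with `-verbose:class` gives the same information per class.

```java
import java.lang.management.ClassLoadingMXBean;
import java.lang.management.ManagementFactory;

// Illustrative only: prints how many classes the JVM has loaded so far.
// Sampling this before and after metrics/tracer initialisation gives a rough
// view of the class-loading overhead the comment above refers to.
public class ClassLoadingProbe {
  public static void main(String[] args) {
    ClassLoadingMXBean classLoading = ManagementFactory.getClassLoadingMXBean();
    System.out.println("currently loaded classes: " + classLoading.getLoadedClassCount());
    System.out.println("total loaded since JVM start: " + classLoading.getTotalLoadedClassCount());
  }
}
```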


private static final Logger log = LoggerFactory.getLogger(Aggregator.class);

@@ -101,6 +101,7 @@ public final class ConflatingMetricsAggregator implements MetricsAggregator, Eve
private final SharedCommunicationObjects sharedCommunicationObjects;
private volatile DDAgentFeaturesDiscovery features;
private final HealthMetrics healthMetrics;
private final boolean configMetricsEnabled;

private volatile AgentTaskScheduler.Scheduled<?> cancellation;

@@ -121,7 +122,8 @@ public ConflatingMetricsAggregator(
false,
DEFAULT_HEADERS),
config.getTracerMetricsMaxAggregates(),
config.getTracerMetricsMaxPending());
config.getTracerMetricsMaxPending(),
config.isTracerMetricsEnabled());
}

ConflatingMetricsAggregator(
@@ -131,7 +133,8 @@ public ConflatingMetricsAggregator(
HealthMetrics healthMetric,
Sink sink,
int maxAggregates,
int queueSize) {
int queueSize,
boolean configMetricsEnabled) {
this(
wellKnownTags,
ignoredResources,
@@ -141,7 +144,8 @@ public ConflatingMetricsAggregator(
maxAggregates,
queueSize,
10,
SECONDS);
SECONDS,
configMetricsEnabled);
}

ConflatingMetricsAggregator(
@@ -153,7 +157,8 @@ public ConflatingMetricsAggregator(
int maxAggregates,
int queueSize,
long reportingInterval,
TimeUnit timeUnit) {
TimeUnit timeUnit,
boolean configMetricsEnabled) {
this(
ignoredResources,
sharedCommunicationObjects,
@@ -163,7 +168,8 @@ public ConflatingMetricsAggregator(
maxAggregates,
queueSize,
reportingInterval,
timeUnit);
timeUnit,
configMetricsEnabled);
}

ConflatingMetricsAggregator(
@@ -175,7 +181,9 @@ public ConflatingMetricsAggregator(
int maxAggregates,
int queueSize,
long reportingInterval,
TimeUnit timeUnit) {
TimeUnit timeUnit,
boolean configMetricsEnabled) {
this.configMetricsEnabled = configMetricsEnabled;
this.ignoredResources = ignoredResources;
this.inbox = new MpscCompoundQueue<>(queueSize);
this.batchPool = new SpmcArrayQueue<>(maxAggregates);
@@ -199,16 +207,23 @@ public ConflatingMetricsAggregator(
this.reportingIntervalTimeUnit = timeUnit;
}

private DDAgentFeaturesDiscovery featuresDiscovery() {
DDAgentFeaturesDiscovery ret = features;
if (ret != null) {
return ret;
private void initialiseFeaturesDiscovery() {
features = sharedCommunicationObjects.featuresDiscovery(Config.get());
if (!features.supportsMetrics()) {
clean();
}
}

private boolean supportsMetrics() {
final DDAgentFeaturesDiscovery features = this.features;
if (features != null) {
return features.supportsMetrics();
}
// no need to synchronise here since it's already done in sharedCommunicationObject.
// At worst, we'll assign multiple time the variable but it will be the same object
ret = sharedCommunicationObjects.featuresDiscovery(Config.get());
features = ret;
return ret;
// When feature discovery is not yet ready, if metrics are enabled we don't want to lose
// span metrics.
// In any case feature discovery will happen, and if metrics are not supported the inbox
// queue will be cleared.
return configMetricsEnabled;
}

@Override
@@ -223,11 +238,19 @@ public void start() {
reportingInterval,
reportingInterval,
reportingIntervalTimeUnit);
// lazily fetch feature discovery
AgentTaskScheduler.get().execute(this::initialiseFeaturesDiscovery);
log.debug("started metrics aggregator");
}

@Override
public boolean report() {
// avoid triggering a downgrade if the agent is not ready / discovered
if (features == null) {
clean();
return false;
}

boolean published;
int attempts = 0;
do {
@@ -242,7 +265,7 @@ public boolean report() {

@Override
public Future<Boolean> forceReport() {
if (!featuresDiscovery().supportsMetrics()) {
if (!supportsMetrics()) {
return CompletableFuture.completedFuture(false);
}
// Wait for the thread to start
@@ -278,8 +301,8 @@ public Future<Boolean> forceReport() {
public boolean publish(List<? extends CoreSpan<?>> trace) {
boolean forceKeep = false;
int counted = 0;
final DDAgentFeaturesDiscovery features = featuresDiscovery();
if (features.supportsMetrics()) {
if (supportsMetrics()) {
final DDAgentFeaturesDiscovery features = this.features;
for (CoreSpan<?> span : trace) {
boolean isTopLevel = span.isTopLevel();
if (shouldComputeMetric(span)) {
Expand All @@ -293,7 +316,7 @@ public boolean publish(List<? extends CoreSpan<?>> trace) {
}
}
healthMetrics.onClientStatTraceComputed(
counted, trace.size(), features.supportsDropping() && !forceKeep);
counted, trace.size(), features != null && features.supportsDropping() && !forceKeep);
}
return forceKeep;
}
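In publish() the change above reads the volatile features field once into a local variable and null-checks it before calling supportsDropping(), since discovery may not have completed yet. A minimal sketch of that snapshot-and-null-check pattern, with hypothetical stand-in types rather than the tracer's own classes:

```java
// Hypothetical, simplified illustration of the pattern used in publish():
// read a volatile field once so the null check and the later call observe
// the same object, even if another thread replaces the field meanwhile.
final class DroppingGuard {
  private volatile FeaturesView features; // may still be null before discovery completes

  boolean dropAllowed(boolean forceKeep) {
    final FeaturesView snapshot = this.features; // single volatile read
    return snapshot != null && snapshot.supportsDropping() && !forceKeep;
  }

  interface FeaturesView {
    boolean supportsDropping();
  }
}
```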
@@ -413,6 +436,7 @@ public void stop() {
public void close() {
stop();
try {
thread.interrupt();
thread.join(THREAD_JOIN_TIMOUT_MS);
} catch (InterruptedException ignored) {
}
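The added thread.interrupt() before the bounded join is the usual way to unblock a worker that may be sleeping or waiting on a queue, so close() does not have to wait out the full timeout. A minimal sketch of the pattern, with illustrative names rather than the tracer's actual classes:

```java
// Hypothetical illustration: interrupt a blocked worker, then wait a bounded time.
final class Worker implements Runnable, AutoCloseable {
  private final Thread thread = new Thread(this, "worker");
  private volatile boolean running = true;

  void start() { thread.start(); }

  @Override
  public void run() {
    while (running) {
      try {
        Thread.sleep(10); // stands in for a blocking poll/sleep in the real loop
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
        return; // exit promptly when interrupted during shutdown
      }
    }
  }

  @Override
  public void close() throws InterruptedException {
    running = false;
    thread.interrupt(); // wake the worker if it is blocked
    thread.join(1_000); // bounded wait, mirrors join(THREAD_JOIN_TIMOUT_MS)
  }
}
```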
@@ -424,7 +448,7 @@ public void onEvent(EventType eventType, String message) {
switch (eventType) {
case DOWNGRADED:
log.debug("Agent downgrade was detected");
AgentTaskScheduler.get().execute(this::disable);
AgentTaskScheduler.get().execute(this::onDowngrade);
break;
case BAD_PAYLOAD:
log.debug("bad metrics payload sent to trace agent: {}", message);
@@ -439,19 +463,25 @@ public void onEvent(EventType eventType, String message) {
}
}

private void disable() {
final DDAgentFeaturesDiscovery features = featuresDiscovery();
features.discover();
if (!features.supportsMetrics()) {
private void onDowngrade() {
DDAgentFeaturesDiscovery features = this.features;
if (null != features) {
features.discover();
}
if (!supportsMetrics()) {
log.debug("Disabling metric reporting because an agent downgrade was detected");
healthMetrics.onClientStatDowngraded();
this.pending.clear();
this.batchPool.clear();
this.inbox.clear();
this.aggregator.clearAggregates();
clean();
}
}

private void clean() {
this.pending.clear();
this.batchPool.clear();
this.inbox.clear();
this.aggregator.clearAggregates();
}

private static final class ReportTask
implements AgentTaskScheduler.Task<ConflatingMetricsAggregator> {

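Taken together, the changes in this file move feature discovery off the startup path: start() schedules initialiseFeaturesDiscovery() on the task scheduler, supportsMetrics() falls back to the configured flag until discovery completes, report() bails out and clears the queues while discovery is still pending, and a detected downgrade re-runs discovery before deciding whether to clean up. A condensed, hedged sketch of that flow, using simplified stand-in types rather than the actual implementation:

```java
import java.util.concurrent.Executor;

// Simplified illustration of the lazy feature-discovery flow introduced above.
// Types and method names are stand-ins; only the control flow mirrors the diff.
final class LazyMetricsGate {
  private final boolean configMetricsEnabled;
  private volatile Features features; // null until discovery has run

  LazyMetricsGate(boolean configMetricsEnabled) {
    this.configMetricsEnabled = configMetricsEnabled;
  }

  void start(Executor scheduler) {
    scheduler.execute(this::initialiseFeaturesDiscovery); // off the startup path
  }

  private void initialiseFeaturesDiscovery() {
    features = discover();
    if (!features.supportsMetrics()) {
      clean(); // drop anything buffered while spans were optimistically accepted
    }
  }

  boolean supportsMetrics() {
    Features snapshot = features;
    if (snapshot != null) {
      return snapshot.supportsMetrics();
    }
    // Discovery not ready yet: trust the config flag so span metrics are not lost.
    return configMetricsEnabled;
  }

  boolean report() {
    if (features == null) {
      clean(); // avoid triggering a downgrade before the agent is discovered
      return false;
    }
    return true; // the real code publishes the aggregated metrics here
  }

  private Features discover() {
    // stand-in for sharedCommunicationObjects.featuresDiscovery(Config.get())
    return () -> true;
  }

  private void clean() {
    // stand-in for clearing pending, batch pool, inbox and aggregates
  }

  interface Features {
    boolean supportsMetrics();
  }
}
```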
@@ -45,7 +45,8 @@ class ConflatingMetricAggregatorTest extends DDSpecification {
10,
queueSize,
1,
MILLISECONDS
MILLISECONDS,
true
)
aggregator.start()

@@ -75,7 +76,8 @@ class ConflatingMetricAggregatorTest extends DDSpecification {
10,
queueSize,
1,
MILLISECONDS
MILLISECONDS,
true
)
aggregator.start()

@@ -105,7 +107,8 @@ class ConflatingMetricAggregatorTest extends DDSpecification {
features.supportsMetrics() >> true
features.peerTags() >> []
ConflatingMetricsAggregator aggregator = new ConflatingMetricsAggregator(empty,
sharedCommunicationObjects(features), HealthMetrics.NO_OP, sink, writer, 10, queueSize, reportingInterval, SECONDS)
sharedCommunicationObjects(features), HealthMetrics.NO_OP, sink, writer, 10, queueSize, reportingInterval, SECONDS,
true)
aggregator.start()

when:
@@ -147,7 +150,8 @@ class ConflatingMetricAggregatorTest extends DDSpecification {
features.supportsMetrics() >> true
features.peerTags() >> []
ConflatingMetricsAggregator aggregator = new ConflatingMetricsAggregator(empty,
sharedCommunicationObjects(features), HealthMetrics.NO_OP, sink, writer, 10, queueSize, reportingInterval, SECONDS)
sharedCommunicationObjects(features), HealthMetrics.NO_OP, sink, writer, 10, queueSize, reportingInterval, SECONDS,
true)
aggregator.start()

when:
@@ -198,7 +202,8 @@ class ConflatingMetricAggregatorTest extends DDSpecification {
features.supportsMetrics() >> true
features.peerTags() >>> [["country"], ["country", "georegion"],]
ConflatingMetricsAggregator aggregator = new ConflatingMetricsAggregator(empty,
sharedCommunicationObjects(features), HealthMetrics.NO_OP, sink, writer, 10, queueSize, reportingInterval, SECONDS)
sharedCommunicationObjects(features), HealthMetrics.NO_OP, sink, writer, 10, queueSize, reportingInterval, SECONDS,
true)
aggregator.start()

when:
@@ -257,7 +262,8 @@ class ConflatingMetricAggregatorTest extends DDSpecification {
features.supportsMetrics() >> true
features.peerTags() >> ["peer.hostname", "_dd.base_service"]
ConflatingMetricsAggregator aggregator = new ConflatingMetricsAggregator(empty,
sharedCommunicationObjects(features), HealthMetrics.NO_OP, sink, writer, 10, queueSize, reportingInterval, SECONDS)
sharedCommunicationObjects(features), HealthMetrics.NO_OP, sink, writer, 10, queueSize, reportingInterval, SECONDS,
true)
aggregator.start()

when:
@@ -306,7 +312,7 @@ class ConflatingMetricAggregatorTest extends DDSpecification {
features.supportsMetrics() >> true
features.peerTags() >> []
ConflatingMetricsAggregator aggregator = new ConflatingMetricsAggregator(empty, sharedCommunicationObjects(features), HealthMetrics.NO_OP,
sink, writer, 10, queueSize, reportingInterval, SECONDS)
sink, writer, 10, queueSize, reportingInterval, SECONDS, true)
aggregator.start()

when:
@@ -354,7 +360,7 @@ class ConflatingMetricAggregatorTest extends DDSpecification {
features.supportsMetrics() >> true
features.peerTags() >> []
ConflatingMetricsAggregator aggregator = new ConflatingMetricsAggregator(empty,
sharedCommunicationObjects(features), HealthMetrics.NO_OP, sink, writer, 10, queueSize, reportingInterval, SECONDS)
sharedCommunicationObjects(features), HealthMetrics.NO_OP, sink, writer, 10, queueSize, reportingInterval, SECONDS, true)
long duration = 100
List<CoreSpan> trace = [
new SimpleSpan("service", "operation", "resource", "type", true, false, false, 0, duration, HTTP_OK).setTag(SPAN_KIND, "baz"),
@@ -419,7 +425,7 @@ class ConflatingMetricAggregatorTest extends DDSpecification {
features.supportsMetrics() >> true
features.peerTags() >> []
ConflatingMetricsAggregator aggregator = new ConflatingMetricsAggregator(empty,
sharedCommunicationObjects(features), HealthMetrics.NO_OP, sink, writer, maxAggregates, queueSize, reportingInterval, SECONDS)
sharedCommunicationObjects(features), HealthMetrics.NO_OP, sink, writer, maxAggregates, queueSize, reportingInterval, SECONDS, true)
long duration = 100
aggregator.start()

@@ -478,7 +484,7 @@ class ConflatingMetricAggregatorTest extends DDSpecification {
features.supportsMetrics() >> true
features.peerTags() >> []
ConflatingMetricsAggregator aggregator = new ConflatingMetricsAggregator(empty,
sharedCommunicationObjects(features), HealthMetrics.NO_OP, sink, writer, maxAggregates, queueSize, reportingInterval, SECONDS)
sharedCommunicationObjects(features), HealthMetrics.NO_OP, sink, writer, maxAggregates, queueSize, reportingInterval, SECONDS, true)
long duration = 100
aggregator.start()

@@ -568,7 +574,7 @@ class ConflatingMetricAggregatorTest extends DDSpecification {
features.supportsMetrics() >> true
features.peerTags() >> []
ConflatingMetricsAggregator aggregator = new ConflatingMetricsAggregator(empty,
sharedCommunicationObjects(features), HealthMetrics.NO_OP, sink, writer, maxAggregates, queueSize, reportingInterval, SECONDS)
sharedCommunicationObjects(features), HealthMetrics.NO_OP, sink, writer, maxAggregates, queueSize, reportingInterval, SECONDS, true)
long duration = 100
aggregator.start()

@@ -624,7 +630,7 @@ class ConflatingMetricAggregatorTest extends DDSpecification {
features.supportsMetrics() >> true
features.peerTags() >> []
ConflatingMetricsAggregator aggregator = new ConflatingMetricsAggregator(empty,
sharedCommunicationObjects(features), HealthMetrics.NO_OP, sink, writer, maxAggregates, queueSize, 1, SECONDS)
sharedCommunicationObjects(features), HealthMetrics.NO_OP, sink, writer, maxAggregates, queueSize, 1, SECONDS, true)
long duration = 100
aggregator.start()

@@ -671,7 +677,7 @@ class ConflatingMetricAggregatorTest extends DDSpecification {
features.supportsMetrics() >> true
features.peerTags() >> []
ConflatingMetricsAggregator aggregator = new ConflatingMetricsAggregator(empty,
sharedCommunicationObjects(features), HealthMetrics.NO_OP, sink, writer, maxAggregates, queueSize, 1, SECONDS)
sharedCommunicationObjects(features), HealthMetrics.NO_OP, sink, writer, maxAggregates, queueSize, 1, SECONDS, true)
long duration = 100
aggregator.start()

@@ -710,7 +716,7 @@ class ConflatingMetricAggregatorTest extends DDSpecification {
features.supportsMetrics() >> true
features.peerTags() >> []
ConflatingMetricsAggregator aggregator = new ConflatingMetricsAggregator(empty,
sharedCommunicationObjects(features), HealthMetrics.NO_OP, sink, writer, maxAggregates, queueSize, 1, SECONDS)
sharedCommunicationObjects(features), HealthMetrics.NO_OP, sink, writer, maxAggregates, queueSize, 1, SECONDS, true)
long duration = 100
aggregator.start()

@@ -741,7 +747,7 @@ class ConflatingMetricAggregatorTest extends DDSpecification {
Sink sink = Stub(Sink)
DDAgentFeaturesDiscovery features = Mock(DDAgentFeaturesDiscovery)
ConflatingMetricsAggregator aggregator = new ConflatingMetricsAggregator(empty,
sharedCommunicationObjects(features), HealthMetrics.NO_OP, sink, writer, maxAggregates, queueSize, 1, SECONDS)
sharedCommunicationObjects(features), HealthMetrics.NO_OP, sink, writer, maxAggregates, queueSize, 1, SECONDS, false)
aggregator.start()

when:
@@ -763,7 +769,7 @@ class ConflatingMetricAggregatorTest extends DDSpecification {
features.supportsMetrics() >> false
features.peerTags() >> []
ConflatingMetricsAggregator aggregator = new ConflatingMetricsAggregator(empty,
sharedCommunicationObjects(features), HealthMetrics.NO_OP, sink, writer, 10, queueSize, 200, MILLISECONDS)
sharedCommunicationObjects(features), HealthMetrics.NO_OP, sink, writer, 10, queueSize, 200, MILLISECONDS, true)
final spans = [
new SimpleSpan("service", "operation", "resource", "type", false, true, false, 0, 10, HTTP_OK)
]
@@ -795,7 +801,7 @@ class ConflatingMetricAggregatorTest extends DDSpecification {
DDAgentFeaturesDiscovery features = Mock(DDAgentFeaturesDiscovery)
features.supportsMetrics() >> true
ConflatingMetricsAggregator aggregator = new ConflatingMetricsAggregator(empty,
sharedCommunicationObjects(features), HealthMetrics.NO_OP, sink, writer, maxAggregates, queueSize, 1, SECONDS)
sharedCommunicationObjects(features), HealthMetrics.NO_OP, sink, writer, maxAggregates, queueSize, 1, SECONDS, true)

when:
def async = CompletableFuture.supplyAsync(new Supplier<Boolean>() {
@@ -828,7 +834,7 @@ class ConflatingMetricAggregatorTest extends DDSpecification {
DDAgentFeaturesDiscovery features = Mock(DDAgentFeaturesDiscovery)
features.supportsMetrics() >> true
ConflatingMetricsAggregator aggregator = new ConflatingMetricsAggregator(empty,
sharedCommunicationObjects(features), HealthMetrics.NO_OP, sink, writer, 10, queueSize, reportingInterval, SECONDS)
sharedCommunicationObjects(features), HealthMetrics.NO_OP, sink, writer, 10, queueSize, reportingInterval, SECONDS, true)
aggregator.start()

when:
@@ -42,7 +42,7 @@ class FootprintForkedTest extends DDSpecification {
1000,
1000,
100,
SECONDS)
SECONDS, true)
// Removing the 'features' as it's a mock, and mocks are heavyweight, e.g. around 22MiB
def baseline = footprint(aggregator, features)
aggregator.start()