Skip to content

Commit 7799cec

Browse files
committed
Added shard operations collector and optimized node stats collector
1 parent fc919c7 commit 7799cec

File tree

7 files changed

+369
-49
lines changed

7 files changed

+369
-49
lines changed

src/main/java/org/opensearch/performanceanalyzer/PerformanceAnalyzerPlugin.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -407,6 +407,8 @@ public Collection<Object> createComponents(
407407
// initialize it. This is the earliest point at which we know ClusterService is created.
408408
// So, call the initialize method here.
409409
clusterSettingsManager.initialize();
410+
// Initialize ShardMetricsCollector histograms
411+
ShardMetricsCollector.INSTANCE.initialize();
410412
return Collections.singletonList(performanceAnalyzerController);
411413
}
412414

Lines changed: 119 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,119 @@
1+
/*
2+
* Copyright OpenSearch Contributors
3+
* SPDX-License-Identifier: Apache-2.0
4+
*/
5+
6+
package org.opensearch.performanceanalyzer;
7+
8+
import org.opensearch.performanceanalyzer.commons.metrics.RTFMetrics;
9+
import org.opensearch.telemetry.metrics.Histogram;
10+
import org.opensearch.telemetry.metrics.MetricsRegistry;
11+
import org.opensearch.telemetry.metrics.tags.Tags;
12+
13+
/**
14+
* A singleton collector for recording per-shard CPU and heap metrics in OpenSearch. This class
15+
* maintains two histograms:
16+
*
17+
* <ul>
18+
* <li>CPU utilization histogram - tracks CPU usage per shard
19+
* <li>Heap usage histogram - tracks heap memory allocation per shard
20+
* </ul>
21+
*
22+
* The metrics are recorded with tags for better categorization and analysis.
23+
*/
24+
public final class ShardMetricsCollector {
25+
/** Singleton instance of the ShardMetricsCollector */
26+
public static final ShardMetricsCollector INSTANCE = new ShardMetricsCollector();
27+
28+
public static final String SHARD_CPU_UTILIZATION = "shard_cpu_utilization";
29+
public static final String SHARD_HEAP_ALLOCATED = "shard_heap_allocated";
30+
31+
/** Histogram for tracking CPU utilization -- GETTER -- Gets the CPU utilization histogram. */
32+
private Histogram cpuUtilizationHistogram;
33+
34+
/** Histogram for tracking heap usage -- GETTER -- Gets the heap usage histogram. */
35+
private Histogram heapUsedHistogram;
36+
37+
/**
38+
* Private constructor that initializes the CPU and heap histograms. This is private to ensure
39+
* singleton pattern.
40+
*/
41+
private ShardMetricsCollector() {
42+
this.cpuUtilizationHistogram = null;
43+
this.heapUsedHistogram = null;
44+
}
45+
46+
/** Initialise metric histograms */
47+
public void initialize() {
48+
if (this.cpuUtilizationHistogram == null) {
49+
this.cpuUtilizationHistogram = createCpuUtilizationHistogram();
50+
}
51+
if (this.heapUsedHistogram == null) {
52+
this.heapUsedHistogram = createHeapUsedHistogram();
53+
}
54+
}
55+
56+
/**
57+
* Creates a histogram for tracking CPU utilization.
58+
*
59+
* @return A histogram instance for CPU metrics, or null if metrics registry is unavailable
60+
*/
61+
private Histogram createCpuUtilizationHistogram() {
62+
MetricsRegistry metricsRegistry = OpenSearchResources.INSTANCE.getMetricsRegistry();
63+
if (metricsRegistry != null) {
64+
return metricsRegistry.createHistogram(
65+
SHARD_CPU_UTILIZATION,
66+
"CPU Utilization per shard for an operation",
67+
RTFMetrics.MetricUnits.RATE.toString());
68+
}
69+
return null;
70+
}
71+
72+
/**
73+
* Creates a histogram for tracking heap usage.
74+
*
75+
* @return A histogram instance for heap metrics, or null if metrics registry is unavailable
76+
*/
77+
private Histogram createHeapUsedHistogram() {
78+
MetricsRegistry metricsRegistry = OpenSearchResources.INSTANCE.getMetricsRegistry();
79+
if (metricsRegistry != null) {
80+
return metricsRegistry.createHistogram(
81+
SHARD_HEAP_ALLOCATED,
82+
"Heap Utilization per shard for an operation",
83+
RTFMetrics.MetricUnits.BYTE.toString());
84+
}
85+
return null;
86+
}
87+
88+
/**
89+
* Records a CPU utilization measurement with associated tags.
90+
*
91+
* @param cpuUtilization The CPU utilization value to record (as a percentage)
92+
* @param tags The tags to associate with this measurement (e.g., shard ID, operation type)
93+
*/
94+
public void recordCpuUtilization(double cpuUtilization, Tags tags) {
95+
if (cpuUtilizationHistogram != null) {
96+
cpuUtilizationHistogram.record(cpuUtilization, tags);
97+
}
98+
}
99+
100+
/**
101+
* Records a heap usage measurement with associated tags.
102+
*
103+
* @param heapBytes The heap usage value to record (in bytes)
104+
* @param tags The tags to associate with this measurement (e.g., shard ID, operation type)
105+
*/
106+
public void recordHeapUsed(double heapBytes, Tags tags) {
107+
if (heapUsedHistogram != null) {
108+
heapUsedHistogram.record(heapBytes, tags);
109+
}
110+
}
111+
112+
public Histogram getCpuUtilizationHistogram() {
113+
return cpuUtilizationHistogram;
114+
}
115+
116+
public Histogram getHeapUsedHistogram() {
117+
return heapUsedHistogram;
118+
}
119+
}

src/main/java/org/opensearch/performanceanalyzer/listener/RTFPerformanceAnalyzerSearchListener.java

Lines changed: 41 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616
import org.opensearch.core.action.NotifyOnceListener;
1717
import org.opensearch.index.shard.SearchOperationListener;
1818
import org.opensearch.performanceanalyzer.OpenSearchResources;
19+
import org.opensearch.performanceanalyzer.ShardMetricsCollector;
1920
import org.opensearch.performanceanalyzer.commons.collectors.StatsCollector;
2021
import org.opensearch.performanceanalyzer.commons.metrics.RTFMetrics;
2122
import org.opensearch.performanceanalyzer.commons.metrics.RTFMetrics.ShardOperationsValue;
@@ -287,18 +288,29 @@ protected void innerOnResponse(Task task) {
287288
* particular phaseTime and the total time till this calculation happen from the
288289
* overall start time.
289290
*/
290-
long totalTime = System.nanoTime() - startTime;
291+
long totalTime = System.nanoTime() - task.getStartTimeNanos();
291292
double shareFactor = computeShareFactor(phaseTookTime, totalTime);
292-
cpuUtilizationHistogram.record(
293+
LOG.debug(
294+
"Total task time {} ns. Total Operation Listener time {} ns. "
295+
+ "Phase took time {} ns. Share factor {} ",
296+
totalTime,
297+
System.nanoTime() - startTime,
298+
phaseTookTime,
299+
shareFactor);
300+
double cpuUtilization =
293301
Utils.calculateCPUUtilization(
294302
numProcessors,
295303
totalTime,
296304
task.getTotalResourceStats().getCpuTimeInNanos(),
297-
shareFactor),
298-
createTags(searchContext, phase, isFailed));
299-
heapUsedHistogram.record(
300-
Math.max(0, task.getTotalResourceStats().getMemoryInBytes() * shareFactor),
301-
createTags(searchContext, phase, isFailed));
305+
shareFactor);
306+
cpuUtilizationHistogram.record(
307+
cpuUtilization, createTags(searchContext, phase, isFailed));
308+
ShardMetricsCollector.INSTANCE.recordCpuUtilization(
309+
cpuUtilization, createTags(searchContext));
310+
double heapUsed =
311+
Math.max(0, task.getTotalResourceStats().getMemoryInBytes() * shareFactor);
312+
heapUsedHistogram.record(heapUsed, createTags(searchContext, phase, isFailed));
313+
ShardMetricsCollector.INSTANCE.recordHeapUsed(heapUsed, createTags(searchContext));
302314
}
303315

304316
@Override
@@ -309,17 +321,27 @@ protected void innerOnFailure(Exception e) {
309321
}
310322

311323
private Tags createTags(SearchContext searchContext, String phase, boolean isFailed) {
312-
return Tags.create()
313-
.addTag(
314-
RTFMetrics.CommonDimension.INDEX_NAME.toString(),
315-
searchContext.request().shardId().getIndex().getName())
316-
.addTag(
317-
RTFMetrics.CommonDimension.INDEX_UUID.toString(),
318-
searchContext.request().shardId().getIndex().getUUID())
319-
.addTag(
320-
RTFMetrics.CommonDimension.SHARD_ID.toString(),
321-
searchContext.request().shardId().getId())
322-
.addTag(RTFMetrics.CommonDimension.OPERATION.toString(), phase)
323-
.addTag(RTFMetrics.CommonDimension.FAILED.toString(), isFailed);
324+
Tags tags =
325+
Tags.create()
326+
.addTag(
327+
RTFMetrics.CommonDimension.INDEX_NAME.toString(),
328+
searchContext.request().shardId().getIndex().getName())
329+
.addTag(
330+
RTFMetrics.CommonDimension.INDEX_UUID.toString(),
331+
searchContext.request().shardId().getIndex().getUUID())
332+
.addTag(
333+
RTFMetrics.CommonDimension.SHARD_ID.toString(),
334+
searchContext.request().shardId().getId());
335+
336+
// Only add phase tag if phase is not null
337+
if (phase != null && !phase.isEmpty()) {
338+
tags.addTag(RTFMetrics.CommonDimension.OPERATION.toString(), phase)
339+
.addTag(RTFMetrics.CommonDimension.FAILED.toString(), isFailed);
340+
}
341+
return tags;
342+
}
343+
344+
private Tags createTags(SearchContext searchContext) {
345+
return createTags(searchContext, null, false);
324346
}
325347
}

src/main/java/org/opensearch/performanceanalyzer/transport/RTFPerformanceAnalyzerTransportChannel.java

Lines changed: 28 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515
import org.opensearch.Version;
1616
import org.opensearch.core.index.shard.ShardId;
1717
import org.opensearch.core.transport.TransportResponse;
18+
import org.opensearch.performanceanalyzer.ShardMetricsCollector;
1819
import org.opensearch.performanceanalyzer.commons.metrics.RTFMetrics;
1920
import org.opensearch.performanceanalyzer.util.Utils;
2021
import org.opensearch.telemetry.metrics.Histogram;
@@ -142,27 +143,41 @@ void recordIndexingLatencyMetric(
142143
void recordCPUUtilizationMetric(
143144
ShardId shardId, double cpuUtilization, String operation, boolean isFailed) {
144145
cpuUtilizationHistogram.record(cpuUtilization, createTags(shardId, operation, isFailed));
146+
ShardMetricsCollector.INSTANCE.recordCpuUtilization(cpuUtilization, createTags(shardId));
145147
}
146148

147149
@VisibleForTesting
148150
void recordHeapUsedMetric(
149151
ShardId shardId, double heapUsedBytes, String operation, boolean isFailed) {
150152
heapUsedHistogram.record(heapUsedBytes, createTags(shardId, operation, isFailed));
153+
ShardMetricsCollector.INSTANCE.recordHeapUsed(heapUsedBytes, createTags(shardId));
151154
}
152155

153156
private Tags createTags(ShardId shardId, String operation, boolean isFailed) {
154-
return Tags.create()
155-
.addTag(
156-
RTFMetrics.CommonDimension.INDEX_NAME.toString(),
157-
shardId.getIndex().getName())
158-
.addTag(
159-
RTFMetrics.CommonDimension.INDEX_UUID.toString(),
160-
shardId.getIndex().getUUID())
161-
.addTag(RTFMetrics.CommonDimension.SHARD_ID.toString(), shardId.getId())
162-
.addTag(RTFMetrics.CommonDimension.OPERATION.toString(), operation)
163-
.addTag(RTFMetrics.CommonDimension.FAILED.toString(), isFailed)
164-
.addTag(
165-
RTFMetrics.CommonDimension.SHARD_ROLE.toString(),
166-
primary ? SHARD_ROLE_PRIMARY : SHARD_ROLE_REPLICA);
157+
Tags tags =
158+
Tags.create()
159+
.addTag(
160+
RTFMetrics.CommonDimension.INDEX_NAME.toString(),
161+
shardId.getIndex().getName())
162+
.addTag(
163+
RTFMetrics.CommonDimension.INDEX_UUID.toString(),
164+
shardId.getIndex().getUUID())
165+
.addTag(RTFMetrics.CommonDimension.SHARD_ID.toString(), shardId.getId());
166+
167+
// Only add operation tag if operation is not null
168+
if (operation != null && !operation.isEmpty()) {
169+
tags.addTag(RTFMetrics.CommonDimension.OPERATION.toString(), operation)
170+
.addTag(RTFMetrics.CommonDimension.FAILED.toString(), isFailed)
171+
.addTag(
172+
RTFMetrics.CommonDimension.SHARD_ROLE.toString(),
173+
primary ? SHARD_ROLE_PRIMARY : SHARD_ROLE_REPLICA);
174+
;
175+
}
176+
177+
return tags;
178+
}
179+
180+
private Tags createTags(ShardId shardId) {
181+
return createTags(shardId, null, false);
167182
}
168183
}
Lines changed: 84 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,84 @@
1+
/*
2+
* Copyright OpenSearch Contributors
3+
* SPDX-License-Identifier: Apache-2.0
4+
*/
5+
6+
package org.opensearch.performanceanalyzer;
7+
8+
import static org.mockito.ArgumentMatchers.anyString;
9+
import static org.mockito.Mockito.clearInvocations;
10+
import static org.mockito.Mockito.mock;
11+
import static org.mockito.Mockito.times;
12+
import static org.mockito.Mockito.verify;
13+
import static org.mockito.Mockito.verifyZeroInteractions;
14+
import static org.mockito.Mockito.when;
15+
16+
import org.junit.Before;
17+
import org.junit.Test;
18+
import org.opensearch.telemetry.metrics.Histogram;
19+
import org.opensearch.telemetry.metrics.MetricsRegistry;
20+
import org.opensearch.telemetry.metrics.tags.Tags;
21+
22+
public class ShardMetricsCollectorTests {
23+
private ShardMetricsCollector shardMetricsCollector;
24+
private static MetricsRegistry metricsRegistry;
25+
private static Histogram cpuUtilizationHistogram;
26+
private static Histogram heapUsedHistogram;
27+
28+
@Before
29+
public void init() {
30+
if (cpuUtilizationHistogram != null && heapUsedHistogram != null) {
31+
// Clear any previous mock interactions
32+
clearInvocations(cpuUtilizationHistogram, heapUsedHistogram);
33+
}
34+
35+
metricsRegistry = mock(MetricsRegistry.class);
36+
cpuUtilizationHistogram = mock(Histogram.class);
37+
heapUsedHistogram = mock(Histogram.class);
38+
39+
OpenSearchResources.INSTANCE.setMetricsRegistry(metricsRegistry);
40+
41+
when(metricsRegistry.createHistogram(anyString(), anyString(), anyString()))
42+
.thenAnswer(
43+
invocationOnMock -> {
44+
String histogramName = (String) invocationOnMock.getArguments()[0];
45+
if (histogramName.equals(ShardMetricsCollector.SHARD_CPU_UTILIZATION)) {
46+
return cpuUtilizationHistogram;
47+
} else if (histogramName.equals(
48+
ShardMetricsCollector.SHARD_HEAP_ALLOCATED)) {
49+
return heapUsedHistogram;
50+
}
51+
return null;
52+
});
53+
}
54+
55+
@Test
56+
public void testRecordMetrics() {
57+
shardMetricsCollector = ShardMetricsCollector.INSTANCE;
58+
shardMetricsCollector.initialize();
59+
Tags testTags = Tags.create().addTag("shard_id", "1").addTag("operation", "search");
60+
61+
shardMetricsCollector.recordCpuUtilization(75.0, testTags);
62+
verify(cpuUtilizationHistogram, times(1)).record(75.0, testTags);
63+
64+
shardMetricsCollector.recordHeapUsed(1024.0, testTags);
65+
verify(heapUsedHistogram, times(1)).record(1024.0, testTags);
66+
}
67+
68+
@Test
69+
public void testNullHistogram() {
70+
// Reset collector and set null registry
71+
shardMetricsCollector = ShardMetricsCollector.INSTANCE;
72+
OpenSearchResources.INSTANCE.setMetricsRegistry(null);
73+
shardMetricsCollector.initialize();
74+
75+
Tags testTags = Tags.create().addTag("shard_id", "1").addTag("operation", "search");
76+
77+
// Verify no exceptions when recording with null histograms
78+
shardMetricsCollector.recordCpuUtilization(75.0, testTags);
79+
shardMetricsCollector.recordHeapUsed(1024.0, testTags);
80+
81+
// Verify no interactions
82+
verifyZeroInteractions(cpuUtilizationHistogram, heapUsedHistogram);
83+
}
84+
}

0 commit comments

Comments
 (0)