diff --git a/src/main/java/org/opensearch/performanceanalyzer/PerformanceAnalyzerPlugin.java b/src/main/java/org/opensearch/performanceanalyzer/PerformanceAnalyzerPlugin.java index 705c6d1d..6e9f85a1 100644 --- a/src/main/java/org/opensearch/performanceanalyzer/PerformanceAnalyzerPlugin.java +++ b/src/main/java/org/opensearch/performanceanalyzer/PerformanceAnalyzerPlugin.java @@ -58,6 +58,7 @@ import org.opensearch.performanceanalyzer.collectors.telemetry.RTFDisksCollector; import org.opensearch.performanceanalyzer.collectors.telemetry.RTFHeapMetricsCollector; import org.opensearch.performanceanalyzer.collectors.telemetry.RTFNodeStatsAllShardsMetricsCollector; +import org.opensearch.performanceanalyzer.collectors.telemetry.RTFShardOperationCollector; import org.opensearch.performanceanalyzer.collectors.telemetry.RTFThreadPoolMetricsCollector; import org.opensearch.performanceanalyzer.commons.OSMetricsGeneratorFactory; import org.opensearch.performanceanalyzer.commons.collectors.DisksCollector; @@ -239,6 +240,9 @@ private void scheduleTelemetryCollectors() { scheduledMetricCollectorsExecutor.addScheduledMetricCollector( new RTFCacheConfigMetricsCollector( performanceAnalyzerController, configOverridesWrapper)); + scheduledMetricCollectorsExecutor.addScheduledMetricCollector( + new RTFShardOperationCollector( + performanceAnalyzerController, configOverridesWrapper)); } private void scheduleRcaCollectors() { @@ -403,6 +407,8 @@ public Collection createComponents( // initialize it. This is the earliest point at which we know ClusterService is created. // So, call the initialize method here. clusterSettingsManager.initialize(); + // Initialize ShardMetricsCollector histograms + ShardMetricsCollector.INSTANCE.initialize(); return Collections.singletonList(performanceAnalyzerController); } diff --git a/src/main/java/org/opensearch/performanceanalyzer/ShardMetricsCollector.java b/src/main/java/org/opensearch/performanceanalyzer/ShardMetricsCollector.java new file mode 100644 index 00000000..d18f9406 --- /dev/null +++ b/src/main/java/org/opensearch/performanceanalyzer/ShardMetricsCollector.java @@ -0,0 +1,119 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.performanceanalyzer; + +import org.opensearch.performanceanalyzer.commons.metrics.RTFMetrics; +import org.opensearch.telemetry.metrics.Histogram; +import org.opensearch.telemetry.metrics.MetricsRegistry; +import org.opensearch.telemetry.metrics.tags.Tags; + +/** + * A singleton collector for recording per-shard CPU and heap metrics in OpenSearch. This class + * maintains two histograms: + * + * + * + * The metrics are recorded with tags for better categorization and analysis. + */ +public final class ShardMetricsCollector { + /** Singleton instance of the ShardMetricsCollector */ + public static final ShardMetricsCollector INSTANCE = new ShardMetricsCollector(); + + public static final String SHARD_CPU_UTILIZATION = "shard_cpu_utilization"; + public static final String SHARD_HEAP_ALLOCATED = "shard_heap_allocated"; + + /** Histogram for tracking CPU utilization -- GETTER -- Gets the CPU utilization histogram. */ + private Histogram cpuUtilizationHistogram; + + /** Histogram for tracking heap usage -- GETTER -- Gets the heap usage histogram. */ + private Histogram heapUsedHistogram; + + /** + * Private constructor that initializes the CPU and heap histograms. This is private to ensure + * singleton pattern. 
+ */ + private ShardMetricsCollector() { + this.cpuUtilizationHistogram = null; + this.heapUsedHistogram = null; + } + + /** Initialise metric histograms */ + public void initialize() { + if (this.cpuUtilizationHistogram == null) { + this.cpuUtilizationHistogram = createCpuUtilizationHistogram(); + } + if (this.heapUsedHistogram == null) { + this.heapUsedHistogram = createHeapUsedHistogram(); + } + } + + /** + * Creates a histogram for tracking CPU utilization. + * + * @return A histogram instance for CPU metrics, or null if metrics registry is unavailable + */ + private Histogram createCpuUtilizationHistogram() { + MetricsRegistry metricsRegistry = OpenSearchResources.INSTANCE.getMetricsRegistry(); + if (metricsRegistry != null) { + return metricsRegistry.createHistogram( + SHARD_CPU_UTILIZATION, + "CPU Utilization per shard for an operation", + RTFMetrics.MetricUnits.RATE.toString()); + } + return null; + } + + /** + * Creates a histogram for tracking heap usage. + * + * @return A histogram instance for heap metrics, or null if metrics registry is unavailable + */ + private Histogram createHeapUsedHistogram() { + MetricsRegistry metricsRegistry = OpenSearchResources.INSTANCE.getMetricsRegistry(); + if (metricsRegistry != null) { + return metricsRegistry.createHistogram( + SHARD_HEAP_ALLOCATED, + "Heap Utilization per shard for an operation", + RTFMetrics.MetricUnits.BYTE.toString()); + } + return null; + } + + /** + * Records a CPU utilization measurement with associated tags. + * + * @param cpuUtilization The CPU utilization value to record (as a percentage) + * @param tags The tags to associate with this measurement (e.g., shard ID, operation type) + */ + public void recordCpuUtilization(double cpuUtilization, Tags tags) { + if (cpuUtilizationHistogram != null) { + cpuUtilizationHistogram.record(cpuUtilization, tags); + } + } + + /** + * Records a heap usage measurement with associated tags. 
+ * + * @param heapBytes The heap usage value to record (in bytes) + * @param tags The tags to associate with this measurement (e.g., shard ID, operation type) + */ + public void recordHeapUsed(double heapBytes, Tags tags) { + if (heapUsedHistogram != null) { + heapUsedHistogram.record(heapBytes, tags); + } + } + + public Histogram getCpuUtilizationHistogram() { + return cpuUtilizationHistogram; + } + + public Histogram getHeapUsedHistogram() { + return heapUsedHistogram; + } +} diff --git a/src/main/java/org/opensearch/performanceanalyzer/collectors/NodeStatsAllShardsMetricsCollector.java b/src/main/java/org/opensearch/performanceanalyzer/collectors/NodeStatsAllShardsMetricsCollector.java index 72c10220..b2d1235b 100644 --- a/src/main/java/org/opensearch/performanceanalyzer/collectors/NodeStatsAllShardsMetricsCollector.java +++ b/src/main/java/org/opensearch/performanceanalyzer/collectors/NodeStatsAllShardsMetricsCollector.java @@ -13,6 +13,7 @@ import com.fasterxml.jackson.annotation.JsonProperty; import com.google.common.collect.ImmutableMap; import java.lang.reflect.Field; +import java.util.Collections; import java.util.HashMap; import java.util.Map; import org.apache.logging.log4j.LogManager; @@ -57,9 +58,7 @@ public class NodeStatsAllShardsMetricsCollector extends PerformanceAnalyzerMetri private static final int KEYS_PATH_LENGTH = 2; private static final Logger LOG = LogManager.getLogger(NodeStatsAllShardsMetricsCollector.class); - private HashMap currentShards; - private HashMap currentPerShardStats; - private HashMap prevPerShardStats; + private Map prevPerShardStats; private final PerformanceAnalyzerController controller; public NodeStatsAllShardsMetricsCollector(final PerformanceAnalyzerController controller) { @@ -68,21 +67,10 @@ public NodeStatsAllShardsMetricsCollector(final PerformanceAnalyzerController co "NodeStatsMetrics", NODE_STATS_ALL_SHARDS_METRICS_COLLECTOR_EXECUTION_TIME, NODESTATS_COLLECTION_ERROR); - currentShards = new HashMap<>(); prevPerShardStats = new HashMap<>(); - currentPerShardStats = new HashMap<>(); this.controller = controller; } - private void populateCurrentShards() { - if (!currentShards.isEmpty()) { - prevPerShardStats.putAll(currentPerShardStats); - currentPerShardStats.clear(); - } - currentShards.clear(); - currentShards = Utils.getShards(); - } - private static final Map maps = new HashMap() { { @@ -152,13 +140,13 @@ public void collectMetrics(long startTime) { if (indicesService == null) { return; } - populateCurrentShards(); - populatePerShardStats(indicesService); - for (HashMap.Entry currentShard : currentPerShardStats.entrySet()) { - ShardId shardId = (ShardId) currentShard.getKey(); - ShardStats currentShardStats = (ShardStats) currentShard.getValue(); - if (prevPerShardStats.size() == 0) { + Map currentPerShardStats = populatePerShardStats(indicesService); + + for (HashMap.Entry currentShard : currentPerShardStats.entrySet()) { + ShardId shardId = currentShard.getKey(); + ShardStats currentShardStats = currentShard.getValue(); + if (prevPerShardStats.isEmpty() || !prevPerShardStats.containsKey(shardId)) { // Populating value for the first run. 
populateMetricValue( currentShardStats, startTime, shardId.getIndexName(), shardId.id()); @@ -179,6 +167,7 @@ public void collectMetrics(long startTime) { populateDiffMetricValue( prevValue, currValue, startTime, shardId.getIndexName(), shardId.id()); } + prevPerShardStats = currentPerShardStats; } // - Separated to have a unit test; and catch any code changes around this field @@ -188,10 +177,12 @@ Field getNodeIndicesStatsByShardField() throws Exception { return field; } - public void populatePerShardStats(IndicesService indicesService) { + public Map populatePerShardStats(IndicesService indicesService) { // Populate the shard stats per shard. - for (HashMap.Entry currentShard : currentShards.entrySet()) { - IndexShard currentIndexShard = (IndexShard) currentShard.getValue(); + HashMap currentShards = Utils.getShards(); + Map currentPerShardStats = new HashMap<>(Collections.emptyMap()); + for (HashMap.Entry currentShard : currentShards.entrySet()) { + IndexShard currentIndexShard = currentShard.getValue(); IndexShardStats currentIndexShardStats = Utils.indexShardStats( indicesService, @@ -200,20 +191,24 @@ public void populatePerShardStats(IndicesService indicesService) { CommonStatsFlags.Flag.QueryCache, CommonStatsFlags.Flag.FieldData, CommonStatsFlags.Flag.RequestCache)); - for (ShardStats shardStats : currentIndexShardStats.getShards()) { - currentPerShardStats.put(currentIndexShardStats.getShardId(), shardStats); + if (currentIndexShardStats != null) { + for (ShardStats shardStats : currentIndexShardStats.getShards()) { + currentPerShardStats.put(currentIndexShardStats.getShardId(), shardStats); + } } } + return currentPerShardStats; } public void populateMetricValue( ShardStats shardStats, long startTime, String IndexName, int ShardId) { - StringBuilder value = new StringBuilder(); - value.append(PerformanceAnalyzerMetrics.getJsonCurrentMilliSeconds()); - // Populate the result with cache specific metrics only. - value.append(PerformanceAnalyzerMetrics.sMetricNewLineDelimitor) - .append(new NodeStatsMetricsAllShardsPerCollectionStatus(shardStats).serialize()); - saveMetricValues(value.toString(), startTime, IndexName, String.valueOf(ShardId)); + String value = + PerformanceAnalyzerMetrics.getJsonCurrentMilliSeconds() + + + // Populate the result with cache specific metrics only. 
+ PerformanceAnalyzerMetrics.sMetricNewLineDelimitor + + new NodeStatsMetricsAllShardsPerCollectionStatus(shardStats).serialize(); + saveMetricValues(value, startTime, IndexName, String.valueOf(ShardId)); } public void populateDiffMetricValue( diff --git a/src/main/java/org/opensearch/performanceanalyzer/collectors/telemetry/RTFNodeStatsAllShardsMetricsCollector.java b/src/main/java/org/opensearch/performanceanalyzer/collectors/telemetry/RTFNodeStatsAllShardsMetricsCollector.java index 9bd21757..2f6c01df 100644 --- a/src/main/java/org/opensearch/performanceanalyzer/collectors/telemetry/RTFNodeStatsAllShardsMetricsCollector.java +++ b/src/main/java/org/opensearch/performanceanalyzer/collectors/telemetry/RTFNodeStatsAllShardsMetricsCollector.java @@ -11,6 +11,7 @@ import com.fasterxml.jackson.annotation.JsonIgnore; import com.fasterxml.jackson.annotation.JsonProperty; import com.google.common.collect.ImmutableMap; +import java.util.Collections; import java.util.HashMap; import java.util.Map; import org.apache.logging.log4j.LogManager; @@ -42,8 +43,6 @@ public class RTFNodeStatsAllShardsMetricsCollector extends PerformanceAnalyzerMe .samplingInterval; private static final Logger LOG = LogManager.getLogger(RTFNodeStatsAllShardsMetricsCollector.class); - private Map currentShards; - private Map currentPerShardStats; private Map prevPerShardStats; private MetricsRegistry metricsRegistry; private Counter cacheQueryHitMetrics; @@ -67,23 +66,12 @@ public RTFNodeStatsAllShardsMetricsCollector( "RTFNodeStatsMetricsCollector", RTF_NODE_STATS_ALL_SHARDS_METRICS_COLLECTOR_EXECUTION_TIME, RTF_NODESTATS_COLLECTION_ERROR); - currentShards = new HashMap<>(); prevPerShardStats = new HashMap<>(); - currentPerShardStats = new HashMap<>(); this.metricsInitialised = false; this.performanceAnalyzerController = performanceAnalyzerController; this.configOverridesWrapper = configOverridesWrapper; } - private void populateCurrentShards() { - if (!currentShards.isEmpty()) { - prevPerShardStats.putAll(currentPerShardStats); - currentPerShardStats.clear(); - } - currentShards.clear(); - currentShards = Utils.getShards(); - } - private static final ImmutableMap valueCalculators = ImmutableMap.of( RTFMetrics.ShardStatsValue.INDEXING_THROTTLE_TIME.toString(), @@ -117,7 +105,7 @@ private void populateCurrentShards() { public void collectMetrics(long startTime) { if (performanceAnalyzerController.isCollectorDisabled( configOverridesWrapper, getCollectorName())) { - LOG.info("RTFDisksCollector is disabled. Skipping collection."); + LOG.info("RTFNodeStatsMetricsCollector is disabled. Skipping collection."); return; } IndicesService indicesService = OpenSearchResources.INSTANCE.getIndicesService(); @@ -133,38 +121,30 @@ configOverridesWrapper, getCollectorName())) { LOG.debug("Executing collect metrics for RTFNodeStatsAllShardsMetricsCollector"); initialiseMetricsIfNeeded(); - populateCurrentShards(); - populatePerShardStats(indicesService); - - for (Map.Entry currentShard : currentPerShardStats.entrySet()) { - ShardId shardId = (ShardId) currentShard.getKey(); - ShardStats currentShardStats = (ShardStats) currentShard.getValue(); - if (prevPerShardStats.size() == 0) { - // Populating value for the first run. 
+ Map currentPerShardStats = populatePerShardStats(indicesService); + + for (Map.Entry currentShard : currentPerShardStats.entrySet()) { + ShardId shardId = currentShard.getKey(); + ShardStats currentShardStats = currentShard.getValue(); + if (prevPerShardStats.isEmpty() || prevPerShardStats.get(shardId) == null) { + // Populating value for the first run of shard. recordMetrics( new NodeStatsMetricsAllShardsPerCollectionStatus(currentShardStats), shardId); continue; } ShardStats prevShardStats = prevPerShardStats.get(shardId); - if (prevShardStats == null) { - // Populate value for shards which are new and were not present in the previous - // run. - recordMetrics( - new NodeStatsMetricsAllShardsPerCollectionStatus(currentShardStats), - shardId); - continue; - } NodeStatsMetricsAllShardsPerCollectionStatus prevValue = new NodeStatsMetricsAllShardsPerCollectionStatus(prevShardStats); NodeStatsMetricsAllShardsPerCollectionStatus currValue = new NodeStatsMetricsAllShardsPerCollectionStatus(currentShardStats); populateDiffMetricValue(prevValue, currValue, shardId); } + prevPerShardStats = currentPerShardStats; } private void initialiseMetricsIfNeeded() { - if (metricsInitialised == false) { + if (!metricsInitialised) { cacheQueryHitMetrics = metricsRegistry.createCounter( RTFMetrics.ShardStatsValue.Constants.QUEY_CACHE_HIT_COUNT_VALUE, @@ -222,10 +202,12 @@ private void initialiseMetricsIfNeeded() { } } - public void populatePerShardStats(IndicesService indicesService) { + public Map populatePerShardStats(IndicesService indicesService) { // Populate the shard stats per shard. - for (Map.Entry currentShard : currentShards.entrySet()) { - IndexShard currentIndexShard = (IndexShard) currentShard.getValue(); + Map currentShards = Utils.getShards(); + Map currentPerShardStats = new HashMap<>(Collections.emptyMap()); + for (Map.Entry currentShard : currentShards.entrySet()) { + IndexShard currentIndexShard = currentShard.getValue(); IndexShardStats currentIndexShardStats = Utils.indexShardStats( indicesService, @@ -234,10 +216,13 @@ public void populatePerShardStats(IndicesService indicesService) { CommonStatsFlags.Flag.QueryCache, CommonStatsFlags.Flag.FieldData, CommonStatsFlags.Flag.RequestCache)); - for (ShardStats shardStats : currentIndexShardStats.getShards()) { - currentPerShardStats.put(currentIndexShardStats.getShardId(), shardStats); + if (currentIndexShardStats != null) { + for (ShardStats shardStats : currentIndexShardStats.getShards()) { + currentPerShardStats.put(currentIndexShardStats.getShardId(), shardStats); + } } } + return currentPerShardStats; } private void recordMetrics( diff --git a/src/main/java/org/opensearch/performanceanalyzer/collectors/telemetry/RTFShardOperationCollector.java b/src/main/java/org/opensearch/performanceanalyzer/collectors/telemetry/RTFShardOperationCollector.java new file mode 100644 index 00000000..016b8efe --- /dev/null +++ b/src/main/java/org/opensearch/performanceanalyzer/collectors/telemetry/RTFShardOperationCollector.java @@ -0,0 +1,185 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.performanceanalyzer.collectors.telemetry; + +import java.util.HashMap; +import java.util.Map; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.opensearch.core.index.shard.ShardId; +import org.opensearch.index.shard.IndexShard; +import org.opensearch.performanceanalyzer.OpenSearchResources; +import 
org.opensearch.performanceanalyzer.commons.collectors.PerformanceAnalyzerMetricsCollector; +import org.opensearch.performanceanalyzer.commons.collectors.TelemetryCollector; +import org.opensearch.performanceanalyzer.commons.config.overrides.ConfigOverridesWrapper; +import org.opensearch.performanceanalyzer.commons.metrics.MetricsConfiguration; +import org.opensearch.performanceanalyzer.commons.metrics.RTFMetrics; +import org.opensearch.performanceanalyzer.commons.metrics.RTFMetrics.MetricUnits; +import org.opensearch.performanceanalyzer.commons.stats.metrics.StatExceptionCode; +import org.opensearch.performanceanalyzer.commons.stats.metrics.StatMetrics; +import org.opensearch.performanceanalyzer.config.PerformanceAnalyzerController; +import org.opensearch.performanceanalyzer.util.Utils; +import org.opensearch.telemetry.metrics.Counter; +import org.opensearch.telemetry.metrics.MetricsRegistry; +import org.opensearch.telemetry.metrics.tags.Tags; + +/** + * This collector measures indexing and search rate per shard. The metric measurement is difference + * between current and last window's operation. For example - if the last window had operation count + * as 10, and now it changed to 12, then collector will publish 2 ops/interval. + */ +public class RTFShardOperationCollector extends PerformanceAnalyzerMetricsCollector + implements TelemetryCollector { + + private static final Logger LOG = LogManager.getLogger(RTFShardOperationCollector.class); + public static final int SAMPLING_TIME_INTERVAL = + MetricsConfiguration.CONFIG_MAP.get(RTFShardOperationCollector.class).samplingInterval; + + private Counter indexingRateCounter; + private Counter searchRateCounter; + + private Map previousIndexOps; + private final long lastCollectionTimeInMillis; + + private MetricsRegistry metricsRegistry; + private boolean metricsInitialized; + private final PerformanceAnalyzerController controller; + private final ConfigOverridesWrapper configOverridesWrapper; + + public RTFShardOperationCollector( + PerformanceAnalyzerController controller, + ConfigOverridesWrapper configOverridesWrapper) { + super( + SAMPLING_TIME_INTERVAL, + "RTFShardOperationCollector", + StatMetrics.RTF_SHARD_OPERATION_COLLECTOR_EXECUTION_TIME, + StatExceptionCode.RTF_SHARD_OPERATION_COLLECTOR_ERROR); + + this.controller = controller; + this.configOverridesWrapper = configOverridesWrapper; + this.metricsInitialized = false; + this.previousIndexOps = new HashMap<>(); + this.lastCollectionTimeInMillis = System.currentTimeMillis(); + } + + @Override + public void collectMetrics(long startTime) { + if (controller.isCollectorDisabled(configOverridesWrapper, getCollectorName())) { + LOG.info("RTFShardOperationCollector is disabled. 
Skipping collection."); + return; + } + + metricsRegistry = OpenSearchResources.INSTANCE.getMetricsRegistry(); + if (metricsRegistry == null) { + LOG.error("Could not get the instance of MetricsRegistry class"); + return; + } + + initializeMetricsIfNeeded(); + LOG.debug("Executing collect metrics for RTFShardOperationCollector"); + + // Get all shards + Map currentShards = Utils.getShards(); + Map currentIndexOpsMap = new HashMap<>(); + + for (Map.Entry entry : currentShards.entrySet()) { + ShardId shardId = entry.getKey(); + IndexShard shard = entry.getValue(); + + try { + long currentIndexingOps = shard.indexingStats().getTotal().getIndexCount(); + long currentSearchOps = shard.searchStats().getTotal().getQueryCount(); + + if (previousIndexOps.containsKey(shardId)) { + long prevIndexingOps = previousIndexOps.get(shardId).indexOps(); + long prevSearchOps = previousIndexOps.get(shardId).searchOps(); + processOperations( + prevIndexingOps, + prevSearchOps, + currentIndexingOps, + currentSearchOps, + shardId); + } else { + processOperations(0, 0, currentIndexingOps, currentSearchOps, shardId); + } + currentIndexOpsMap.put( + shardId, new ShardOperation(currentIndexingOps, currentSearchOps)); + } catch (Exception e) { + LOG.error( + "Error collecting indexing/search rate metrics for shard {}: {}", + shardId, + e.getMessage()); + } + } + + // Update previous values for next collection + this.previousIndexOps = currentIndexOpsMap; + } + + private void processOperations( + long prevIndexingOps, + long prevSearchOps, + long currentIndexingOps, + long currentSearchOps, + ShardId shardId) { + long indexingOpsDiff = Math.max(0, currentIndexingOps - prevIndexingOps); + long searchOpsDiff = Math.max(0, currentSearchOps - prevSearchOps); + + if (indexingOpsDiff > 0) { + Tags tags = createTags(shardId); + indexingRateCounter.add(indexingOpsDiff, tags); + } + + if (searchOpsDiff > 0) { + Tags tags = createTags(shardId); + searchRateCounter.add(searchOpsDiff, tags); + } + } + + // attributes= {index_name="test", shard_id="0"} + private Tags createTags(ShardId shardId) { + Tags shardOperationsMetricsTag = + Tags.create() + .addTag( + RTFMetrics.CommonDimension.INDEX_NAME.toString(), + shardId.getIndexName()) + .addTag( + RTFMetrics.CommonDimension.SHARD_ID.toString(), + String.valueOf(shardId.getId())); + + if (shardId.getIndex() != null) { + shardOperationsMetricsTag.addTag( + RTFMetrics.CommonDimension.INDEX_UUID.toString(), shardId.getIndex().getUUID()); + } + return shardOperationsMetricsTag; + } + + private void initializeMetricsIfNeeded() { + if (!metricsInitialized) { + indexingRateCounter = + metricsRegistry.createCounter( + RTFMetrics.ShardOperationsValue.Constants.SHARD_INDEXING_RATE, + "Indexing operations per shard", + MetricUnits.RATE.toString()); + + searchRateCounter = + metricsRegistry.createCounter( + RTFMetrics.ShardOperationsValue.Constants.SHARD_SEARCH_RATE, + "Search operations per shard", + MetricUnits.RATE.toString()); + + metricsInitialized = true; + } + } + + /** + * Stores the index and search operations for a shard. + * + * @param indexOps count of index operations. 
+ * @param searchOps count of search operations + */ + private record ShardOperation(long indexOps, long searchOps) {} +} diff --git a/src/main/java/org/opensearch/performanceanalyzer/listener/RTFPerformanceAnalyzerSearchListener.java b/src/main/java/org/opensearch/performanceanalyzer/listener/RTFPerformanceAnalyzerSearchListener.java index 6b7921cc..6d7b0532 100644 --- a/src/main/java/org/opensearch/performanceanalyzer/listener/RTFPerformanceAnalyzerSearchListener.java +++ b/src/main/java/org/opensearch/performanceanalyzer/listener/RTFPerformanceAnalyzerSearchListener.java @@ -6,6 +6,7 @@ package org.opensearch.performanceanalyzer.listener; import static org.opensearch.performanceanalyzer.commons.stats.metrics.StatExceptionCode.OPENSEARCH_REQUEST_INTERCEPTOR_ERROR; +import static org.opensearch.performanceanalyzer.util.Utils.computeShareFactor; import com.google.common.annotations.VisibleForTesting; import java.util.HashMap; @@ -15,8 +16,10 @@ import org.opensearch.core.action.NotifyOnceListener; import org.opensearch.index.shard.SearchOperationListener; import org.opensearch.performanceanalyzer.OpenSearchResources; +import org.opensearch.performanceanalyzer.ShardMetricsCollector; import org.opensearch.performanceanalyzer.commons.collectors.StatsCollector; import org.opensearch.performanceanalyzer.commons.metrics.RTFMetrics; +import org.opensearch.performanceanalyzer.commons.metrics.RTFMetrics.ShardOperationsValue; import org.opensearch.performanceanalyzer.commons.util.Util; import org.opensearch.performanceanalyzer.config.PerformanceAnalyzerController; import org.opensearch.performanceanalyzer.util.Utils; @@ -47,6 +50,7 @@ public class RTFPerformanceAnalyzerSearchListener private final PerformanceAnalyzerController controller; private final Histogram cpuUtilizationHistogram; private final Histogram heapUsedHistogram; + private final Histogram searchLatencyHistogram; private final int numProcessors; public RTFPerformanceAnalyzerSearchListener(final PerformanceAnalyzerController controller) { @@ -55,7 +59,9 @@ public RTFPerformanceAnalyzerSearchListener(final PerformanceAnalyzerController createCPUUtilizationHistogram(OpenSearchResources.INSTANCE.getMetricsRegistry()); this.heapUsedHistogram = createHeapUsedHistogram(OpenSearchResources.INSTANCE.getMetricsRegistry()); - this.threadLocal = ThreadLocal.withInitial(() -> new HashMap()); + this.searchLatencyHistogram = + createSearchLatencyHistogram(OpenSearchResources.INSTANCE.getMetricsRegistry()); + this.threadLocal = ThreadLocal.withInitial(HashMap::new); this.numProcessors = Runtime.getRuntime().availableProcessors(); } @@ -83,6 +89,20 @@ private Histogram createHeapUsedHistogram(MetricsRegistry metricsRegistry) { } } + // This histogram will help to get the total latency for search request using getMax over an + // interval. 
+ private Histogram createSearchLatencyHistogram(MetricsRegistry metricsRegistry) { + if (metricsRegistry != null) { + return metricsRegistry.createHistogram( + ShardOperationsValue.SHARD_SEARCH_LATENCY.toString(), + "Search latency per shard per phase", + RTFMetrics.MetricUnits.MILLISECOND.toString()); + } else { + LOG.debug("MetricsRegistry is null"); + return null; + } + } + @Override public String toString() { return RTFPerformanceAnalyzerSearchListener.class.getSimpleName(); @@ -170,9 +190,13 @@ public void preQueryPhase(SearchContext searchContext) { @Override public void queryPhase(SearchContext searchContext, long tookInNanos) { long queryStartTime = threadLocal.get().getOrDefault(QUERY_START_TIME, System.nanoTime()); - long queryTime = (System.nanoTime() - queryStartTime); + double queryTimeInMills = tookInNanos / 1_000_000.0; + + searchLatencyHistogram.record( + queryTimeInMills, createTags(searchContext, SHARD_QUERY_PHASE, false)); + addResourceTrackingCompletionListener( - searchContext, queryStartTime, queryTime, SHARD_QUERY_PHASE, false); + searchContext, queryStartTime, tookInNanos, SHARD_QUERY_PHASE, false); } @Override @@ -191,9 +215,12 @@ public void preFetchPhase(SearchContext searchContext) { @Override public void fetchPhase(SearchContext searchContext, long tookInNanos) { long fetchStartTime = threadLocal.get().getOrDefault(FETCH_START_TIME, System.nanoTime()); - long fetchTime = (System.nanoTime() - fetchStartTime); + double fetchTimeInMills = tookInNanos / 1_000_000.0; + searchLatencyHistogram.record( + fetchTimeInMills, createTags(searchContext, SHARD_FETCH_PHASE, false)); + addResourceTrackingCompletionListenerForFetchPhase( - searchContext, fetchStartTime, fetchTime, SHARD_FETCH_PHASE, false); + searchContext, fetchStartTime, tookInNanos, SHARD_FETCH_PHASE, false); } @Override @@ -261,22 +288,41 @@ protected void innerOnResponse(Task task) { * particular phaseTime and the total time till this calculation happen from the * overall start time. */ - long totalTime = System.nanoTime() - startTime; + long totalTime = System.nanoTime() - task.getStartTimeNanos(); double shareFactor = computeShareFactor(phaseTookTime, totalTime); - cpuUtilizationHistogram.record( + LOG.debug( + "Total task time {} ns. Total Operation Listener time {} ns. " + + "Phase took time {} ns. 
Share factor {} ", + totalTime, + System.nanoTime() - startTime, + phaseTookTime, + shareFactor); + double cpuUtilization = Utils.calculateCPUUtilization( numProcessors, totalTime, task.getTotalResourceStats().getCpuTimeInNanos(), - shareFactor), - createTags()); - heapUsedHistogram.record( - Math.max(0, task.getTotalResourceStats().getMemoryInBytes() * shareFactor), - createTags()); + shareFactor); + cpuUtilizationHistogram.record( + cpuUtilization, createTags(searchContext, phase, isFailed)); + ShardMetricsCollector.INSTANCE.recordCpuUtilization( + cpuUtilization, createTags(searchContext)); + double heapUsed = + Math.max(0, task.getTotalResourceStats().getMemoryInBytes() * shareFactor); + heapUsedHistogram.record(heapUsed, createTags(searchContext, phase, isFailed)); + ShardMetricsCollector.INSTANCE.recordHeapUsed(heapUsed, createTags(searchContext)); } - private Tags createTags() { - return Tags.create() + @Override + protected void innerOnFailure(Exception e) { + LOG.error("Error is executing the the listener", e); + } + }; + } + + private Tags createTags(SearchContext searchContext, String phase, boolean isFailed) { + Tags tags = + Tags.create() .addTag( RTFMetrics.CommonDimension.INDEX_NAME.toString(), searchContext.request().shardId().getIndex().getName()) @@ -285,20 +331,17 @@ private Tags createTags() { searchContext.request().shardId().getIndex().getUUID()) .addTag( RTFMetrics.CommonDimension.SHARD_ID.toString(), - searchContext.request().shardId().getId()) - .addTag(RTFMetrics.CommonDimension.OPERATION.toString(), phase) - .addTag(RTFMetrics.CommonDimension.FAILED.toString(), isFailed); - } + searchContext.request().shardId().getId()); - @Override - protected void innerOnFailure(Exception e) { - LOG.error("Error is executing the the listener", e); - } - }; + // Only add phase tag if phase is not null + if (phase != null && !phase.isEmpty()) { + tags.addTag(RTFMetrics.CommonDimension.OPERATION.toString(), phase) + .addTag(RTFMetrics.CommonDimension.FAILED.toString(), isFailed); + } + return tags; } - @VisibleForTesting - static double computeShareFactor(long phaseTookTime, long totalTime) { - return Math.min(1, ((double) phaseTookTime) / Math.max(1.0, totalTime)); + private Tags createTags(SearchContext searchContext) { + return createTags(searchContext, null, false); } } diff --git a/src/main/java/org/opensearch/performanceanalyzer/transport/RTFPerformanceAnalyzerTransportChannel.java b/src/main/java/org/opensearch/performanceanalyzer/transport/RTFPerformanceAnalyzerTransportChannel.java index 2ee6a3a9..92d49409 100644 --- a/src/main/java/org/opensearch/performanceanalyzer/transport/RTFPerformanceAnalyzerTransportChannel.java +++ b/src/main/java/org/opensearch/performanceanalyzer/transport/RTFPerformanceAnalyzerTransportChannel.java @@ -15,6 +15,7 @@ import org.opensearch.Version; import org.opensearch.core.index.shard.ShardId; import org.opensearch.core.transport.TransportResponse; +import org.opensearch.performanceanalyzer.ShardMetricsCollector; import org.opensearch.performanceanalyzer.commons.metrics.RTFMetrics; import org.opensearch.performanceanalyzer.util.Utils; import org.opensearch.telemetry.metrics.Histogram; @@ -39,6 +40,8 @@ public final class RTFPerformanceAnalyzerTransportChannel implements TransportCh private long operationStartTime; private Histogram cpuUtilizationHistogram; + private Histogram indexingLatencyHistogram; + private Histogram heapUsedHistogram; private TransportChannel original; private String indexName; @@ -47,15 +50,20 @@ public final class 
RTFPerformanceAnalyzerTransportChannel implements TransportCh private long threadID; private int numProcessors; + private long initialHeapUsedBytes; void set( TransportChannel original, Histogram cpuUtilizationHistogram, + Histogram indexingLatencyHistogram, + Histogram heapUsedHistogram, String indexName, ShardId shardId, boolean bPrimary) { this.original = original; this.cpuUtilizationHistogram = cpuUtilizationHistogram; + this.indexingLatencyHistogram = indexingLatencyHistogram; + this.heapUsedHistogram = heapUsedHistogram; this.indexName = indexName; this.shardId = shardId; this.primary = bPrimary; @@ -63,6 +71,7 @@ void set( this.operationStartTime = System.nanoTime(); threadID = Thread.currentThread().getId(); this.cpuStartTime = threadMXBean.getThreadCpuTime(threadID); + this.initialHeapUsedBytes = threadMXBean.getThreadAllocatedBytes(threadID); this.numProcessors = Runtime.getRuntime().availableProcessors(); LOG.debug("Thread Name {}", Thread.currentThread().getName()); } @@ -102,6 +111,18 @@ public void sendResponse(Exception exception) throws IOException { private void emitMetrics(boolean isFailed) { double cpuUtilization = calculateCPUUtilization(operationStartTime, cpuStartTime); recordCPUUtilizationMetric(shardId, cpuUtilization, OPERATION_SHARD_BULK, isFailed); + + double heapUsedBytes = calculateHeapUsed(); + recordHeapUsedMetric(shardId, heapUsedBytes, OPERATION_SHARD_BULK, isFailed); + + long latencyInNanos = System.nanoTime() - operationStartTime; + double latencyInMillis = latencyInNanos / 1_000_000.0; + recordIndexingLatencyMetric(shardId, latencyInMillis, OPERATION_SHARD_BULK, isFailed); + } + + private double calculateHeapUsed() { + double shareFactor = Utils.computeShareFactor(System.nanoTime(), operationStartTime); + return shareFactor * threadMXBean.getThreadAllocatedBytes(threadID) - initialHeapUsedBytes; } private double calculateCPUUtilization(long phaseStartTime, long phaseCPUStartTime) { @@ -112,11 +133,28 @@ private double calculateCPUUtilization(long phaseStartTime, long phaseCPUStartTi numProcessors, (System.nanoTime() - phaseStartTime), totalCpuTime, 1.0); } + @VisibleForTesting + void recordIndexingLatencyMetric( + ShardId shardId, double indexingLatency, String operation, boolean isFailed) { + indexingLatencyHistogram.record(indexingLatency, createTags(shardId, operation, isFailed)); + } + @VisibleForTesting void recordCPUUtilizationMetric( ShardId shardId, double cpuUtilization, String operation, boolean isFailed) { - cpuUtilizationHistogram.record( - cpuUtilization, + cpuUtilizationHistogram.record(cpuUtilization, createTags(shardId, operation, isFailed)); + ShardMetricsCollector.INSTANCE.recordCpuUtilization(cpuUtilization, createTags(shardId)); + } + + @VisibleForTesting + void recordHeapUsedMetric( + ShardId shardId, double heapUsedBytes, String operation, boolean isFailed) { + heapUsedHistogram.record(heapUsedBytes, createTags(shardId, operation, isFailed)); + ShardMetricsCollector.INSTANCE.recordHeapUsed(heapUsedBytes, createTags(shardId)); + } + + private Tags createTags(ShardId shardId, String operation, boolean isFailed) { + Tags tags = Tags.create() .addTag( RTFMetrics.CommonDimension.INDEX_NAME.toString(), @@ -124,11 +162,22 @@ void recordCPUUtilizationMetric( .addTag( RTFMetrics.CommonDimension.INDEX_UUID.toString(), shardId.getIndex().getUUID()) - .addTag(RTFMetrics.CommonDimension.SHARD_ID.toString(), shardId.getId()) - .addTag(RTFMetrics.CommonDimension.OPERATION.toString(), operation) - 
.addTag(RTFMetrics.CommonDimension.FAILED.toString(), isFailed) - .addTag( - RTFMetrics.CommonDimension.SHARD_ROLE.toString(), - primary ? SHARD_ROLE_PRIMARY : SHARD_ROLE_REPLICA)); + .addTag(RTFMetrics.CommonDimension.SHARD_ID.toString(), shardId.getId()); + + // Only add operation tag if operation is not null + if (operation != null && !operation.isEmpty()) { + tags.addTag(RTFMetrics.CommonDimension.OPERATION.toString(), operation) + .addTag(RTFMetrics.CommonDimension.FAILED.toString(), isFailed) + .addTag( + RTFMetrics.CommonDimension.SHARD_ROLE.toString(), + primary ? SHARD_ROLE_PRIMARY : SHARD_ROLE_REPLICA); + ; + } + + return tags; + } + + private Tags createTags(ShardId shardId) { + return createTags(shardId, null, false); } } diff --git a/src/main/java/org/opensearch/performanceanalyzer/transport/RTFPerformanceAnalyzerTransportRequestHandler.java b/src/main/java/org/opensearch/performanceanalyzer/transport/RTFPerformanceAnalyzerTransportRequestHandler.java index 82a0abe6..fe10f6e7 100644 --- a/src/main/java/org/opensearch/performanceanalyzer/transport/RTFPerformanceAnalyzerTransportRequestHandler.java +++ b/src/main/java/org/opensearch/performanceanalyzer/transport/RTFPerformanceAnalyzerTransportRequestHandler.java @@ -15,6 +15,7 @@ import org.opensearch.performanceanalyzer.OpenSearchResources; import org.opensearch.performanceanalyzer.commons.collectors.StatsCollector; import org.opensearch.performanceanalyzer.commons.metrics.RTFMetrics; +import org.opensearch.performanceanalyzer.commons.metrics.RTFMetrics.MetricUnits; import org.opensearch.performanceanalyzer.commons.util.Util; import org.opensearch.performanceanalyzer.config.PerformanceAnalyzerController; import org.opensearch.tasks.Task; @@ -38,12 +39,16 @@ public final class RTFPerformanceAnalyzerTransportRequestHandler actualHandler; private boolean logOnce = false; private final Histogram cpuUtilizationHistogram; + private final Histogram indexingLatencyHistogram; + private final Histogram heapUsedHistogram; RTFPerformanceAnalyzerTransportRequestHandler( TransportRequestHandler actualHandler, PerformanceAnalyzerController controller) { this.actualHandler = actualHandler; this.controller = controller; this.cpuUtilizationHistogram = createCPUUtilizationHistogram(); + this.indexingLatencyHistogram = createIndexingLatencyHistogram(); + this.heapUsedHistogram = createHeapUsedHistogram(); } private Histogram createCPUUtilizationHistogram() { @@ -58,6 +63,30 @@ private Histogram createCPUUtilizationHistogram() { } } + private Histogram createHeapUsedHistogram() { + MetricsRegistry metricsRegistry = OpenSearchResources.INSTANCE.getMetricsRegistry(); + if (metricsRegistry != null) { + return metricsRegistry.createHistogram( + RTFMetrics.OSMetrics.HEAP_ALLOCATED.toString(), + "Heap Utilization per shard for an operation", + RTFMetrics.MetricUnits.BYTE.toString()); + } else { + return null; + } + } + + private Histogram createIndexingLatencyHistogram() { + MetricsRegistry metricsRegistry = OpenSearchResources.INSTANCE.getMetricsRegistry(); + if (metricsRegistry != null) { + return metricsRegistry.createHistogram( + RTFMetrics.ShardOperationsValue.SHARD_INDEXING_LATENCY.toString(), + "Indexing Latency per shard for an operation", + MetricUnits.MILLISECOND.toString()); + } else { + return null; + } + } + @Override public void messageReceived(T request, TransportChannel channel, Task task) throws Exception { actualHandler.messageReceived(request, getChannel(request, channel, task), task); @@ -100,17 +129,22 @@ private TransportChannel 
getShardBulkChannel(T request, TransportChannel channel TransportRequest transportRequest = ((ConcreteShardRequest) request).getRequest(); - if (!(transportRequest instanceof BulkShardRequest)) { + if (!(transportRequest instanceof BulkShardRequest bsr)) { return channel; } - BulkShardRequest bsr = (BulkShardRequest) transportRequest; RTFPerformanceAnalyzerTransportChannel rtfPerformanceAnalyzerTransportChannel = new RTFPerformanceAnalyzerTransportChannel(); try { rtfPerformanceAnalyzerTransportChannel.set( - channel, cpuUtilizationHistogram, bsr.index(), bsr.shardId(), bPrimary); + channel, + cpuUtilizationHistogram, + indexingLatencyHistogram, + heapUsedHistogram, + bsr.index(), + bsr.shardId(), + bPrimary); } catch (Exception ex) { if (!logOnce) { LOG.error(ex); diff --git a/src/main/java/org/opensearch/performanceanalyzer/util/Utils.java b/src/main/java/org/opensearch/performanceanalyzer/util/Utils.java index 26620455..4d1158c1 100644 --- a/src/main/java/org/opensearch/performanceanalyzer/util/Utils.java +++ b/src/main/java/org/opensearch/performanceanalyzer/util/Utils.java @@ -5,6 +5,7 @@ package org.opensearch.performanceanalyzer.util; +import com.google.common.annotations.VisibleForTesting; import java.util.EnumSet; import java.util.HashMap; import java.util.Iterator; @@ -56,6 +57,8 @@ public static void configureMetrics() { MetricsConfiguration.CONFIG_MAP.put( RTFCacheConfigMetricsCollector.class, new MetricsConfiguration.MetricConfig(60000, 0)); + MetricsConfiguration.CONFIG_MAP.put( + RTFShardOperationCollector.class, new MetricsConfiguration.MetricConfig(5000, 0)); } // These methods are utility functions for the Node Stat Metrics Collectors. These methods are @@ -148,4 +151,9 @@ public static double calculateCPUUtilization( LOG.debug("Performance Analyzer CPUUtilization calculation with cpuUtil {}", cpuUtil); return cpuUtil; } + + @VisibleForTesting + public static double computeShareFactor(long phaseTookTime, long totalTime) { + return Math.min(1, ((double) phaseTookTime) / Math.max(1.0, totalTime)); + } } diff --git a/src/test/java/org/opensearch/performanceanalyzer/ShardMetricsCollectorTests.java b/src/test/java/org/opensearch/performanceanalyzer/ShardMetricsCollectorTests.java new file mode 100644 index 00000000..872aba52 --- /dev/null +++ b/src/test/java/org/opensearch/performanceanalyzer/ShardMetricsCollectorTests.java @@ -0,0 +1,84 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.performanceanalyzer; + +import static org.mockito.ArgumentMatchers.anyString; +import static org.mockito.Mockito.clearInvocations; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.verifyZeroInteractions; +import static org.mockito.Mockito.when; + +import org.junit.Before; +import org.junit.Test; +import org.opensearch.telemetry.metrics.Histogram; +import org.opensearch.telemetry.metrics.MetricsRegistry; +import org.opensearch.telemetry.metrics.tags.Tags; + +public class ShardMetricsCollectorTests { + private ShardMetricsCollector shardMetricsCollector; + private static MetricsRegistry metricsRegistry; + private static Histogram cpuUtilizationHistogram; + private static Histogram heapUsedHistogram; + + @Before + public void init() { + if (cpuUtilizationHistogram != null && heapUsedHistogram != null) { + // Clear any previous mock interactions + clearInvocations(cpuUtilizationHistogram, heapUsedHistogram); + } + + 
metricsRegistry = mock(MetricsRegistry.class); + cpuUtilizationHistogram = mock(Histogram.class); + heapUsedHistogram = mock(Histogram.class); + + OpenSearchResources.INSTANCE.setMetricsRegistry(metricsRegistry); + + when(metricsRegistry.createHistogram(anyString(), anyString(), anyString())) + .thenAnswer( + invocationOnMock -> { + String histogramName = (String) invocationOnMock.getArguments()[0]; + if (histogramName.equals(ShardMetricsCollector.SHARD_CPU_UTILIZATION)) { + return cpuUtilizationHistogram; + } else if (histogramName.equals( + ShardMetricsCollector.SHARD_HEAP_ALLOCATED)) { + return heapUsedHistogram; + } + return null; + }); + } + + @Test + public void testRecordMetrics() { + shardMetricsCollector = ShardMetricsCollector.INSTANCE; + shardMetricsCollector.initialize(); + Tags testTags = Tags.create().addTag("shard_id", "1").addTag("operation", "search"); + + shardMetricsCollector.recordCpuUtilization(75.0, testTags); + verify(cpuUtilizationHistogram, times(1)).record(75.0, testTags); + + shardMetricsCollector.recordHeapUsed(1024.0, testTags); + verify(heapUsedHistogram, times(1)).record(1024.0, testTags); + } + + @Test + public void testNullHistogram() { + // Reset collector and set null registry + shardMetricsCollector = ShardMetricsCollector.INSTANCE; + OpenSearchResources.INSTANCE.setMetricsRegistry(null); + shardMetricsCollector.initialize(); + + Tags testTags = Tags.create().addTag("shard_id", "1").addTag("operation", "search"); + + // Verify no exceptions when recording with null histograms + shardMetricsCollector.recordCpuUtilization(75.0, testTags); + shardMetricsCollector.recordHeapUsed(1024.0, testTags); + + // Verify no interactions + verifyZeroInteractions(cpuUtilizationHistogram, heapUsedHistogram); + } +} diff --git a/src/test/java/org/opensearch/performanceanalyzer/collectors/telemetry/RTFShardOperationCollectorTests.java b/src/test/java/org/opensearch/performanceanalyzer/collectors/telemetry/RTFShardOperationCollectorTests.java new file mode 100644 index 00000000..f21a67ed --- /dev/null +++ b/src/test/java/org/opensearch/performanceanalyzer/collectors/telemetry/RTFShardOperationCollectorTests.java @@ -0,0 +1,122 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.performanceanalyzer.collectors.telemetry; + +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyDouble; +import static org.mockito.ArgumentMatchers.anyString; +import static org.mockito.Mockito.atLeastOnce; +import static org.mockito.Mockito.never; +import static org.mockito.Mockito.spy; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +import java.io.IOException; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; +import org.mockito.Mock; +import org.mockito.MockitoAnnotations; +import org.opensearch.common.xcontent.XContentType; +import org.opensearch.index.query.QueryBuilders; +import org.opensearch.indices.IndicesService; +import org.opensearch.performanceanalyzer.OpenSearchResources; +import org.opensearch.performanceanalyzer.commons.config.overrides.ConfigOverridesWrapper; +import org.opensearch.performanceanalyzer.commons.metrics.MetricsConfiguration; +import org.opensearch.performanceanalyzer.commons.metrics.RTFMetrics; +import org.opensearch.performanceanalyzer.config.PerformanceAnalyzerController; +import org.opensearch.telemetry.metrics.Counter; +import org.opensearch.telemetry.metrics.MetricsRegistry; +import 
org.opensearch.test.OpenSearchSingleNodeTestCase; + +public class RTFShardOperationCollectorTests extends OpenSearchSingleNodeTestCase { + + private long startTimeInMills = 1153721339; + private static final String TEST_INDEX = "test"; + private RTFShardOperationCollector rtfShardOperationRateCollector; + + @Mock private MetricsRegistry metricsRegistry; + @Mock private Counter indexingRateCounter; + @Mock private Counter searchRateCounter; + @Mock private ConfigOverridesWrapper configOverridesWrapper; + @Mock private PerformanceAnalyzerController performanceAnalyzerController; + + @Before + public void init() { + MockitoAnnotations.initMocks(this); + + MetricsConfiguration.CONFIG_MAP.put( + RTFShardOperationCollector.class, MetricsConfiguration.cdefault); + IndicesService indicesService = getInstanceFromNode(IndicesService.class); + OpenSearchResources.INSTANCE.setIndicesService(indicesService); + OpenSearchResources.INSTANCE.setMetricsRegistry(metricsRegistry); + + when(metricsRegistry.createCounter(anyString(), anyString(), anyString())) + .thenReturn(indexingRateCounter) + .thenReturn(searchRateCounter); + + when(metricsRegistry.createCounter(anyString(), anyString(), anyString())) + .thenAnswer( + invocationOnMock -> { + String counterName = (String) invocationOnMock.getArguments()[0]; + if (counterName.contains( + RTFMetrics.ShardOperationsValue.Constants + .SHARD_INDEXING_RATE)) { + return indexingRateCounter; + } + return searchRateCounter; + }); + + when(performanceAnalyzerController.isCollectorDisabled(any(), anyString())) + .thenReturn(false); + + rtfShardOperationRateCollector = + spy( + new RTFShardOperationCollector( + performanceAnalyzerController, configOverridesWrapper)); + } + + @Test + public void testCollectMetrics() throws IOException { + createIndex(TEST_INDEX); + rtfShardOperationRateCollector.collectMetrics(startTimeInMills); + + // first time collection does not publish metrics + verify(indexingRateCounter, never()).add(anyDouble(), any()); + verify(searchRateCounter, never()).add(anyDouble(), any()); + + startTimeInMills += 5000; + rtfShardOperationRateCollector.collectMetrics(startTimeInMills); + + // 0 operation count does not publish metrics + verify(indexingRateCounter, never()).add(anyDouble(), any()); + verify(searchRateCounter, never()).add(anyDouble(), any()); + + // creating indexing and search operation + client().prepareIndex(TEST_INDEX) + .setId("1") + .setSource("{\"field\":\"value1\"}", XContentType.JSON) + .get(); + client().prepareIndex(TEST_INDEX) + .setId("2") + .setSource("{\"field\":\"value2\"}", XContentType.JSON) + .get(); + + client().admin().indices().prepareRefresh(TEST_INDEX).get(); + client().prepareSearch(TEST_INDEX).setQuery(QueryBuilders.matchAllQuery()).get(); + + startTimeInMills += 5000; + rtfShardOperationRateCollector.collectMetrics(startTimeInMills); + + verify(indexingRateCounter, atLeastOnce()).add(anyDouble(), any()); + verify(searchRateCounter, atLeastOnce()).add(anyDouble(), any()); + } + + @After + public void tearDown() throws Exception { + super.tearDown(); + } +} diff --git a/src/test/java/org/opensearch/performanceanalyzer/listener/RTFPerformanceAnalyzerSearchListenerTests.java b/src/test/java/org/opensearch/performanceanalyzer/listener/RTFPerformanceAnalyzerSearchListenerTests.java index 16aba4bc..7d5c726c 100644 --- a/src/test/java/org/opensearch/performanceanalyzer/listener/RTFPerformanceAnalyzerSearchListenerTests.java +++ 
b/src/test/java/org/opensearch/performanceanalyzer/listener/RTFPerformanceAnalyzerSearchListenerTests.java @@ -21,8 +21,11 @@ import org.opensearch.core.index.shard.ShardId; import org.opensearch.core.tasks.resourcetracker.TaskResourceUsage; import org.opensearch.performanceanalyzer.OpenSearchResources; +import org.opensearch.performanceanalyzer.ShardMetricsCollector; +import org.opensearch.performanceanalyzer.commons.metrics.RTFMetrics; import org.opensearch.performanceanalyzer.commons.util.Util; import org.opensearch.performanceanalyzer.config.PerformanceAnalyzerController; +import org.opensearch.performanceanalyzer.util.Utils; import org.opensearch.search.internal.SearchContext; import org.opensearch.search.internal.ShardSearchRequest; import org.opensearch.tasks.Task; @@ -42,6 +45,9 @@ public class RTFPerformanceAnalyzerSearchListenerTests { @Mock private MetricsRegistry metricsRegistry; @Mock private Histogram cpuUtilizationHistogram; @Mock private Histogram heapUsedHistogram; + @Mock private Histogram searchLatencyHistogram; + @Mock private Histogram shardMetricsCpuHistogram; + @Mock private Histogram shardMetricsHeapHistogram; @Mock private Index index; @Mock private TaskResourceUsage taskResourceUsage; @@ -58,16 +64,31 @@ public void init() { initMocks(this); OpenSearchResources.INSTANCE.setMetricsRegistry(metricsRegistry); Mockito.when(controller.isPerformanceAnalyzerEnabled()).thenReturn(true); + + // First set up metrics registry with most lenient matching Mockito.when( metricsRegistry.createHistogram( - Mockito.eq("cpu_utilization"), - Mockito.anyString(), - Mockito.eq("rate"))) - .thenReturn(cpuUtilizationHistogram); - Mockito.when( - metricsRegistry.createHistogram( - Mockito.eq("heap_allocated"), Mockito.anyString(), Mockito.eq("B"))) - .thenReturn(heapUsedHistogram); + Mockito.anyString(), Mockito.anyString(), Mockito.anyString())) + .thenAnswer( + invocation -> { + String name = invocation.getArgument(0); + if (name.equals(ShardMetricsCollector.SHARD_CPU_UTILIZATION)) { + return shardMetricsCpuHistogram; + } else if (name.equals(ShardMetricsCollector.SHARD_HEAP_ALLOCATED)) { + return shardMetricsHeapHistogram; + } else if (name.equals( + RTFMetrics.OSMetrics.CPU_UTILIZATION.toString())) { + return cpuUtilizationHistogram; + } else if (name.equals( + RTFMetrics.OSMetrics.HEAP_ALLOCATED.toString())) { + return heapUsedHistogram; + } else if (name.equals( + RTFMetrics.ShardOperationsValue.SHARD_SEARCH_LATENCY + .toString())) { + return searchLatencyHistogram; + } + return null; + }); searchListener = new RTFPerformanceAnalyzerSearchListener(controller); assertEquals( RTFPerformanceAnalyzerSearchListener.class.getSimpleName(), @@ -96,9 +117,13 @@ public void testQueryPhase() { initializeValidSearchContext(true); Mockito.when(controller.getCollectorsRunModeValue()) .thenReturn(Util.CollectorMode.TELEMETRY.getValue()); + Mockito.when(shardId.getIndex()).thenReturn(index); + Mockito.when(index.getName()).thenReturn("myTestIndex"); + Mockito.when(index.getUUID()).thenReturn("abc-def"); searchListener.preQueryPhase(searchContext); searchListener.queryPhase(searchContext, 0l); Mockito.verify(task).addResourceTrackingCompletionListener(Mockito.any()); + Mockito.verify(searchLatencyHistogram).record(Mockito.anyDouble(), Mockito.any(Tags.class)); } @Test @@ -106,6 +131,9 @@ public void testQueryPhaseFailed() { initializeValidSearchContext(true); Mockito.when(controller.getCollectorsRunModeValue()) .thenReturn(Util.CollectorMode.TELEMETRY.getValue()); + 
Mockito.when(shardId.getIndex()).thenReturn(index); + Mockito.when(index.getName()).thenReturn("myTestIndex"); + Mockito.when(index.getUUID()).thenReturn("abc-def"); searchListener.preQueryPhase(searchContext); searchListener.failedQueryPhase(searchContext); Mockito.verify(task).addResourceTrackingCompletionListener(Mockito.any()); @@ -116,9 +144,13 @@ public void testFetchPhase() { initializeValidSearchContext(true); Mockito.when(controller.getCollectorsRunModeValue()) .thenReturn(Util.CollectorMode.TELEMETRY.getValue()); + Mockito.when(shardId.getIndex()).thenReturn(index); + Mockito.when(index.getName()).thenReturn("myTestIndex"); + Mockito.when(index.getUUID()).thenReturn("abc-def"); searchListener.preFetchPhase(searchContext); searchListener.fetchPhase(searchContext, 0l); Mockito.verify(task).addResourceTrackingCompletionListener(Mockito.any()); + Mockito.verify(searchLatencyHistogram).record(Mockito.anyDouble(), Mockito.any(Tags.class)); } @Test @@ -126,6 +158,9 @@ public void testFetchPhaseFailed() { initializeValidSearchContext(true); Mockito.when(controller.getCollectorsRunModeValue()) .thenReturn(Util.CollectorMode.TELEMETRY.getValue()); + Mockito.when(shardId.getIndex()).thenReturn(index); + Mockito.when(index.getName()).thenReturn("myTestIndex"); + Mockito.when(index.getUUID()).thenReturn("abc-def"); searchListener.preFetchPhase(searchContext); searchListener.failedFetchPhase(searchContext); Mockito.verify(task).addResourceTrackingCompletionListener(Mockito.any()); @@ -133,18 +168,20 @@ public void testFetchPhaseFailed() { @Test public void testOperationShareFactor() { - assertEquals( - Double.valueOf(10.0 / 15), - RTFPerformanceAnalyzerSearchListener.computeShareFactor(10, 15), - 0); - assertEquals( - Double.valueOf(1), - RTFPerformanceAnalyzerSearchListener.computeShareFactor(15, 10), - 0); + assertEquals(Double.valueOf(10.0 / 15), Utils.computeShareFactor(10, 15), 0); + assertEquals(Double.valueOf(1), Utils.computeShareFactor(15, 10), 0); } @Test public void testTaskCompletionListener() { + Histogram shardCpu = ShardMetricsCollector.INSTANCE.getCpuUtilizationHistogram(); + Histogram shardHeap = ShardMetricsCollector.INSTANCE.getHeapUsedHistogram(); + + if (shardCpu != null && shardHeap != null) { + // Clear any previous mock interactions + Mockito.clearInvocations(shardCpu, shardHeap); + } + ShardMetricsCollector.INSTANCE.initialize(); initializeValidSearchContext(true); RTFPerformanceAnalyzerSearchListener rtfSearchListener = new RTFPerformanceAnalyzerSearchListener(controller); @@ -158,9 +195,12 @@ public void testTaskCompletionListener() { NotifyOnceListener taskCompletionListener = rtfSearchListener.createListener(searchContext, 0l, 0l, "test", false); taskCompletionListener.onResponse(task); + Mockito.verify(cpuUtilizationHistogram) .record(Mockito.anyDouble(), Mockito.any(Tags.class)); Mockito.verify(heapUsedHistogram).record(Mockito.anyDouble(), Mockito.any(Tags.class)); + Mockito.verify(shardCpu).record(Mockito.anyDouble(), Mockito.any(Tags.class)); + Mockito.verify(shardHeap).record(Mockito.anyDouble(), Mockito.any(Tags.class)); } private void initializeValidSearchContext(boolean isValid) { diff --git a/src/test/java/org/opensearch/performanceanalyzer/transport/RTFPerformanceAnalyzerTransportChannelTests.java b/src/test/java/org/opensearch/performanceanalyzer/transport/RTFPerformanceAnalyzerTransportChannelTests.java index d73a75dd..4c0c800d 100644 --- a/src/test/java/org/opensearch/performanceanalyzer/transport/RTFPerformanceAnalyzerTransportChannelTests.java +++ 
b/src/test/java/org/opensearch/performanceanalyzer/transport/RTFPerformanceAnalyzerTransportChannelTests.java @@ -31,8 +31,11 @@ import org.opensearch.core.index.Index; import org.opensearch.core.index.shard.ShardId; import org.opensearch.core.transport.TransportResponse; +import org.opensearch.performanceanalyzer.OpenSearchResources; +import org.opensearch.performanceanalyzer.ShardMetricsCollector; import org.opensearch.performanceanalyzer.util.Utils; import org.opensearch.telemetry.metrics.Histogram; +import org.opensearch.telemetry.metrics.MetricsRegistry; import org.opensearch.telemetry.metrics.tags.Tags; import org.opensearch.transport.TransportChannel; @@ -42,6 +45,11 @@ public class RTFPerformanceAnalyzerTransportChannelTests { @Mock private TransportChannel originalChannel; @Mock private TransportResponse response; @Mock private Histogram cpuUtilizationHistogram; + @Mock private Histogram indexingLatencyHistogram; + @Mock private Histogram heapUsedHistogram; + @Mock private Histogram shardMetricsCpuHistogram; + @Mock private Histogram shardMetricsHeapHistogram; + private ShardId shardId; @Mock private ShardId mockedShardId; @Mock private Index index; @@ -56,7 +64,35 @@ public void init() { String indexName = "testIndex"; shardId = new ShardId(new Index(indexName, "uuid"), 1); channel = new RTFPerformanceAnalyzerTransportChannel(); - channel.set(originalChannel, cpuUtilizationHistogram, indexName, shardId, false); + + // Setup metrics registry to return our mock histograms + MetricsRegistry metricsRegistry = Mockito.mock(MetricsRegistry.class); + Mockito.when( + metricsRegistry.createHistogram( + Mockito.eq(ShardMetricsCollector.SHARD_CPU_UTILIZATION), + Mockito.anyString(), + Mockito.anyString())) + .thenReturn(shardMetricsCpuHistogram); + Mockito.when( + metricsRegistry.createHistogram( + Mockito.eq(ShardMetricsCollector.SHARD_HEAP_ALLOCATED), + Mockito.anyString(), + Mockito.anyString())) + .thenReturn(shardMetricsHeapHistogram); + + // Set the metrics registry + OpenSearchResources.INSTANCE.setMetricsRegistry(metricsRegistry); + + // Initialize ShardMetricsCollector + ShardMetricsCollector.INSTANCE.initialize(); + channel.set( + originalChannel, + cpuUtilizationHistogram, + indexingLatencyHistogram, + heapUsedHistogram, + indexName, + shardId, + false); } @Test @@ -96,24 +132,89 @@ public void testResponseWithException() throws IOException { @Test public void testRecordCPUUtilizationMetric() { + Histogram shardCpu = ShardMetricsCollector.INSTANCE.getCpuUtilizationHistogram(); + if (shardCpu != null) { + // Clear any previous mock interactions + Mockito.clearInvocations(shardCpu); + } RTFPerformanceAnalyzerTransportChannel channel = new RTFPerformanceAnalyzerTransportChannel(); - channel.set(originalChannel, cpuUtilizationHistogram, "testIndex", mockedShardId, false); + channel.set( + originalChannel, + cpuUtilizationHistogram, + indexingLatencyHistogram, + heapUsedHistogram, + "testIndex", + mockedShardId, + false); Mockito.when(mockedShardId.getIndex()).thenReturn(index); Mockito.when(index.getName()).thenReturn("myTestIndex"); Mockito.when(index.getUUID()).thenReturn("abc-def"); channel.recordCPUUtilizationMetric(mockedShardId, 10l, "bulkShard", false); Mockito.verify(cpuUtilizationHistogram) .record(Mockito.anyDouble(), Mockito.any(Tags.class)); + Mockito.verify(shardCpu).record(anyDouble(), any(Tags.class)); + } + + @Test + public void testRecordIndexingLatencyMetric() { + RTFPerformanceAnalyzerTransportChannel channel = + new RTFPerformanceAnalyzerTransportChannel(); + 
channel.set( + originalChannel, + cpuUtilizationHistogram, + indexingLatencyHistogram, + heapUsedHistogram, + "testIndex", + mockedShardId, + false); + Mockito.when(mockedShardId.getIndex()).thenReturn(index); + Mockito.when(index.getName()).thenReturn("myTestIndex"); + Mockito.when(index.getUUID()).thenReturn("abc-def"); + channel.recordIndexingLatencyMetric(mockedShardId, 123.456, "bulkShard", false); + Mockito.verify(indexingLatencyHistogram) + .record(Mockito.anyDouble(), Mockito.any(Tags.class)); } @Test + public void testRecordHeapUsedMetric() { + Histogram shardHeap = ShardMetricsCollector.INSTANCE.getHeapUsedHistogram(); + if (shardHeap != null) { + // Clear any previous mock interactions + Mockito.clearInvocations(shardHeap); + } + RTFPerformanceAnalyzerTransportChannel channel = + new RTFPerformanceAnalyzerTransportChannel(); + channel.set( + originalChannel, + cpuUtilizationHistogram, + indexingLatencyHistogram, + heapUsedHistogram, + "testIndex", + mockedShardId, + false); + Mockito.when(mockedShardId.getIndex()).thenReturn(index); + Mockito.when(index.getName()).thenReturn("myTestIndex"); + Mockito.when(index.getUUID()).thenReturn("abc-def"); + channel.recordHeapUsedMetric(mockedShardId, 10l, "bulkShard", false); + Mockito.verify(heapUsedHistogram).record(Mockito.anyDouble(), Mockito.any(Tags.class)); + // Verify the shard metrics histogram + Mockito.verify(shardHeap).record(anyDouble(), any(Tags.class)); + } + public void testRTFPAChannelDelegatesToOriginal() throws InvocationTargetException, IllegalAccessException { TransportChannel handlerSpy = spy(originalChannel); RTFPerformanceAnalyzerTransportChannel rtfChannel = new RTFPerformanceAnalyzerTransportChannel(); - rtfChannel.set(handlerSpy, cpuUtilizationHistogram, index.getName(), shardId, false); + rtfChannel.set( + handlerSpy, + cpuUtilizationHistogram, + indexingLatencyHistogram, + heapUsedHistogram, + index.getName(), + shardId, + false); List overridableMethods = Arrays.stream(TransportChannel.class.getMethods()) @@ -125,7 +226,7 @@ public void testRTFPAChannelDelegatesToOriginal() .collect(Collectors.toList()); for (Method method : overridableMethods) { - // completeStream and sendresponsebatch Methods are experimental and not + // completeStream and sendresponsebatch Methods are experimental and not // implemented in PAChannel if (Set.of("sendresponsebatch", "completestream") .contains(method.getName().toLowerCase())) {
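
The snippet below is an illustrative usage sketch only, not part of the diff above. It shows how a caller is expected to use the new ShardMetricsCollector singleton introduced in this change, assuming the plugin has already set the MetricsRegistry on OpenSearchResources and called ShardMetricsCollector.INSTANCE.initialize() in PerformanceAnalyzerPlugin.createComponents(). The class name, tag values, and sample measurements are hypothetical.

import org.opensearch.performanceanalyzer.ShardMetricsCollector;
import org.opensearch.performanceanalyzer.commons.metrics.RTFMetrics;
import org.opensearch.telemetry.metrics.tags.Tags;

public final class ShardMetricsCollectorUsageSketch {
    public static void main(String[] args) {
        // Safe to call repeatedly: histograms are created only once, and only when a
        // MetricsRegistry is available; otherwise recording silently becomes a no-op.
        ShardMetricsCollector.INSTANCE.initialize();

        // Tags mirror the dimensions used by the search listener and transport channel:
        // index name and shard id (index UUID is added by those callers when available).
        Tags tags =
                Tags.create()
                        .addTag(RTFMetrics.CommonDimension.INDEX_NAME.toString(), "my-index")
                        .addTag(RTFMetrics.CommonDimension.SHARD_ID.toString(), "0");

        // Hypothetical sample values; the real callers in this PR pass the share-factor
        // adjusted CPU utilization and the allocated-bytes estimate for the operation.
        ShardMetricsCollector.INSTANCE.recordCpuUtilization(0.42, tags);
        ShardMetricsCollector.INSTANCE.recordHeapUsed(1_048_576.0, tags);
    }
}

If initialize() runs before the registry is set, the histograms remain null and both record methods are no-ops, which is why the tests above exercise the null-registry path explicitly.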