Skip to content
Open
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@
import org.opensearch.performanceanalyzer.collectors.telemetry.RTFDisksCollector;
import org.opensearch.performanceanalyzer.collectors.telemetry.RTFHeapMetricsCollector;
import org.opensearch.performanceanalyzer.collectors.telemetry.RTFNodeStatsAllShardsMetricsCollector;
import org.opensearch.performanceanalyzer.collectors.telemetry.RTFShardOperationCollector;
import org.opensearch.performanceanalyzer.collectors.telemetry.RTFThreadPoolMetricsCollector;
import org.opensearch.performanceanalyzer.commons.OSMetricsGeneratorFactory;
import org.opensearch.performanceanalyzer.commons.collectors.DisksCollector;
Expand Down Expand Up @@ -239,6 +240,9 @@ private void scheduleTelemetryCollectors() {
scheduledMetricCollectorsExecutor.addScheduledMetricCollector(
new RTFCacheConfigMetricsCollector(
performanceAnalyzerController, configOverridesWrapper));
scheduledMetricCollectorsExecutor.addScheduledMetricCollector(
new RTFShardOperationCollector(
performanceAnalyzerController, configOverridesWrapper));
}

private void scheduleRcaCollectors() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.collect.ImmutableMap;
import java.lang.reflect.Field;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import org.apache.logging.log4j.LogManager;
Expand Down Expand Up @@ -57,9 +58,7 @@ public class NodeStatsAllShardsMetricsCollector extends PerformanceAnalyzerMetri
private static final int KEYS_PATH_LENGTH = 2;
private static final Logger LOG =
LogManager.getLogger(NodeStatsAllShardsMetricsCollector.class);
private HashMap<ShardId, IndexShard> currentShards;
private HashMap<ShardId, ShardStats> currentPerShardStats;
private HashMap<ShardId, ShardStats> prevPerShardStats;
private Map<ShardId, ShardStats> prevPerShardStats;
private final PerformanceAnalyzerController controller;

public NodeStatsAllShardsMetricsCollector(final PerformanceAnalyzerController controller) {
Expand All @@ -68,21 +67,10 @@ public NodeStatsAllShardsMetricsCollector(final PerformanceAnalyzerController co
"NodeStatsMetrics",
NODE_STATS_ALL_SHARDS_METRICS_COLLECTOR_EXECUTION_TIME,
NODESTATS_COLLECTION_ERROR);
currentShards = new HashMap<>();
prevPerShardStats = new HashMap<>();
currentPerShardStats = new HashMap<>();
this.controller = controller;
}

private void populateCurrentShards() {
if (!currentShards.isEmpty()) {
prevPerShardStats.putAll(currentPerShardStats);
currentPerShardStats.clear();
}
currentShards.clear();
currentShards = Utils.getShards();
}

private static final Map<String, ValueCalculator> maps =
new HashMap<String, ValueCalculator>() {
{
Expand Down Expand Up @@ -152,13 +140,13 @@ public void collectMetrics(long startTime) {
if (indicesService == null) {
return;
}
populateCurrentShards();
populatePerShardStats(indicesService);

for (HashMap.Entry currentShard : currentPerShardStats.entrySet()) {
ShardId shardId = (ShardId) currentShard.getKey();
ShardStats currentShardStats = (ShardStats) currentShard.getValue();
if (prevPerShardStats.size() == 0) {
Map<ShardId, ShardStats> currentPerShardStats = populatePerShardStats(indicesService);

for (HashMap.Entry<ShardId, ShardStats> currentShard : currentPerShardStats.entrySet()) {
ShardId shardId = currentShard.getKey();
ShardStats currentShardStats = currentShard.getValue();
if (prevPerShardStats.isEmpty() || !prevPerShardStats.containsKey(shardId)) {
// Populating value for the first run.
populateMetricValue(
currentShardStats, startTime, shardId.getIndexName(), shardId.id());
Expand All @@ -179,6 +167,7 @@ public void collectMetrics(long startTime) {
populateDiffMetricValue(
prevValue, currValue, startTime, shardId.getIndexName(), shardId.id());
}
prevPerShardStats = currentPerShardStats;
}

// - Separated to have a unit test; and catch any code changes around this field
Expand All @@ -188,10 +177,12 @@ Field getNodeIndicesStatsByShardField() throws Exception {
return field;
}

public void populatePerShardStats(IndicesService indicesService) {
public Map<ShardId, ShardStats> populatePerShardStats(IndicesService indicesService) {
// Populate the shard stats per shard.
for (HashMap.Entry currentShard : currentShards.entrySet()) {
IndexShard currentIndexShard = (IndexShard) currentShard.getValue();
HashMap<ShardId, IndexShard> currentShards = Utils.getShards();
Map<ShardId, ShardStats> currentPerShardStats = new HashMap<>(Collections.emptyMap());
for (HashMap.Entry<ShardId, IndexShard> currentShard : currentShards.entrySet()) {
IndexShard currentIndexShard = currentShard.getValue();
IndexShardStats currentIndexShardStats =
Utils.indexShardStats(
indicesService,
Expand All @@ -200,20 +191,24 @@ public void populatePerShardStats(IndicesService indicesService) {
CommonStatsFlags.Flag.QueryCache,
CommonStatsFlags.Flag.FieldData,
CommonStatsFlags.Flag.RequestCache));
for (ShardStats shardStats : currentIndexShardStats.getShards()) {
currentPerShardStats.put(currentIndexShardStats.getShardId(), shardStats);
if (currentIndexShardStats != null) {
for (ShardStats shardStats : currentIndexShardStats.getShards()) {
currentPerShardStats.put(currentIndexShardStats.getShardId(), shardStats);
}
}
}
return currentPerShardStats;
}

public void populateMetricValue(
ShardStats shardStats, long startTime, String IndexName, int ShardId) {
StringBuilder value = new StringBuilder();
value.append(PerformanceAnalyzerMetrics.getJsonCurrentMilliSeconds());
// Populate the result with cache specific metrics only.
value.append(PerformanceAnalyzerMetrics.sMetricNewLineDelimitor)
.append(new NodeStatsMetricsAllShardsPerCollectionStatus(shardStats).serialize());
saveMetricValues(value.toString(), startTime, IndexName, String.valueOf(ShardId));
String value =
PerformanceAnalyzerMetrics.getJsonCurrentMilliSeconds()
+
// Populate the result with cache specific metrics only.
PerformanceAnalyzerMetrics.sMetricNewLineDelimitor
+ new NodeStatsMetricsAllShardsPerCollectionStatus(shardStats).serialize();
saveMetricValues(value, startTime, IndexName, String.valueOf(ShardId));
}

public void populateDiffMetricValue(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
import com.fasterxml.jackson.annotation.JsonIgnore;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.collect.ImmutableMap;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import org.apache.logging.log4j.LogManager;
Expand Down Expand Up @@ -42,8 +43,6 @@ public class RTFNodeStatsAllShardsMetricsCollector extends PerformanceAnalyzerMe
.samplingInterval;
private static final Logger LOG =
LogManager.getLogger(RTFNodeStatsAllShardsMetricsCollector.class);
private Map<ShardId, IndexShard> currentShards;
private Map<ShardId, ShardStats> currentPerShardStats;
private Map<ShardId, ShardStats> prevPerShardStats;
private MetricsRegistry metricsRegistry;
private Counter cacheQueryHitMetrics;
Expand All @@ -67,23 +66,12 @@ public RTFNodeStatsAllShardsMetricsCollector(
"RTFNodeStatsMetricsCollector",
RTF_NODE_STATS_ALL_SHARDS_METRICS_COLLECTOR_EXECUTION_TIME,
RTF_NODESTATS_COLLECTION_ERROR);
currentShards = new HashMap<>();
prevPerShardStats = new HashMap<>();
currentPerShardStats = new HashMap<>();
this.metricsInitialised = false;
this.performanceAnalyzerController = performanceAnalyzerController;
this.configOverridesWrapper = configOverridesWrapper;
}

private void populateCurrentShards() {
if (!currentShards.isEmpty()) {
prevPerShardStats.putAll(currentPerShardStats);
currentPerShardStats.clear();
}
currentShards.clear();
currentShards = Utils.getShards();
}

private static final ImmutableMap<String, ValueCalculator> valueCalculators =
ImmutableMap.of(
RTFMetrics.ShardStatsValue.INDEXING_THROTTLE_TIME.toString(),
Expand Down Expand Up @@ -117,7 +105,7 @@ private void populateCurrentShards() {
public void collectMetrics(long startTime) {
if (performanceAnalyzerController.isCollectorDisabled(
configOverridesWrapper, getCollectorName())) {
LOG.info("RTFDisksCollector is disabled. Skipping collection.");
LOG.info("RTFNodeStatsMetricsCollector is disabled. Skipping collection.");
return;
}
IndicesService indicesService = OpenSearchResources.INSTANCE.getIndicesService();
Expand All @@ -133,38 +121,30 @@ configOverridesWrapper, getCollectorName())) {

LOG.debug("Executing collect metrics for RTFNodeStatsAllShardsMetricsCollector");
initialiseMetricsIfNeeded();
populateCurrentShards();
populatePerShardStats(indicesService);

for (Map.Entry currentShard : currentPerShardStats.entrySet()) {
ShardId shardId = (ShardId) currentShard.getKey();
ShardStats currentShardStats = (ShardStats) currentShard.getValue();
if (prevPerShardStats.size() == 0) {
// Populating value for the first run.
Map<ShardId, ShardStats> currentPerShardStats = populatePerShardStats(indicesService);

for (Map.Entry<ShardId, ShardStats> currentShard : currentPerShardStats.entrySet()) {
ShardId shardId = currentShard.getKey();
ShardStats currentShardStats = currentShard.getValue();
if (prevPerShardStats.isEmpty() || prevPerShardStats.get(shardId) == null) {
// Populating value for the first run of shard.
recordMetrics(
new NodeStatsMetricsAllShardsPerCollectionStatus(currentShardStats),
shardId);
continue;
}
ShardStats prevShardStats = prevPerShardStats.get(shardId);
if (prevShardStats == null) {
// Populate value for shards which are new and were not present in the previous
// run.
recordMetrics(
new NodeStatsMetricsAllShardsPerCollectionStatus(currentShardStats),
shardId);
continue;
}
NodeStatsMetricsAllShardsPerCollectionStatus prevValue =
new NodeStatsMetricsAllShardsPerCollectionStatus(prevShardStats);
NodeStatsMetricsAllShardsPerCollectionStatus currValue =
new NodeStatsMetricsAllShardsPerCollectionStatus(currentShardStats);
populateDiffMetricValue(prevValue, currValue, shardId);
}
prevPerShardStats = currentPerShardStats;
}

private void initialiseMetricsIfNeeded() {
if (metricsInitialised == false) {
if (!metricsInitialised) {
cacheQueryHitMetrics =
metricsRegistry.createCounter(
RTFMetrics.ShardStatsValue.Constants.QUEY_CACHE_HIT_COUNT_VALUE,
Expand Down Expand Up @@ -222,10 +202,12 @@ private void initialiseMetricsIfNeeded() {
}
}

public void populatePerShardStats(IndicesService indicesService) {
public Map<ShardId, ShardStats> populatePerShardStats(IndicesService indicesService) {
// Populate the shard stats per shard.
for (Map.Entry currentShard : currentShards.entrySet()) {
IndexShard currentIndexShard = (IndexShard) currentShard.getValue();
Map<ShardId, IndexShard> currentShards = Utils.getShards();
Map<ShardId, ShardStats> currentPerShardStats = new HashMap<>(Collections.emptyMap());
for (Map.Entry<ShardId, IndexShard> currentShard : currentShards.entrySet()) {
IndexShard currentIndexShard = currentShard.getValue();
IndexShardStats currentIndexShardStats =
Utils.indexShardStats(
indicesService,
Expand All @@ -234,10 +216,13 @@ public void populatePerShardStats(IndicesService indicesService) {
CommonStatsFlags.Flag.QueryCache,
CommonStatsFlags.Flag.FieldData,
CommonStatsFlags.Flag.RequestCache));
for (ShardStats shardStats : currentIndexShardStats.getShards()) {
currentPerShardStats.put(currentIndexShardStats.getShardId(), shardStats);
if (currentIndexShardStats != null) {
for (ShardStats shardStats : currentIndexShardStats.getShards()) {
currentPerShardStats.put(currentIndexShardStats.getShardId(), shardStats);
}
}
}
return currentPerShardStats;
}

private void recordMetrics(
Expand Down
Loading
Loading