diff --git a/CHANGELOG.md b/CHANGELOG.md index bcc2f1570c506..4d1f6033f85a3 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -83,6 +83,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), - Implement SslHandler retrieval logic for transport-reactor-netty4 plugin ([#19458](https://github.com/opensearch-project/OpenSearch/pull/19458)) - Cache serialised cluster state based on cluster state version and node version.([#19307](https://github.com/opensearch-project/OpenSearch/pull/19307)) +- Handle negative search request nodes stats ([#19340](https://github.com/opensearch-project/OpenSearch/pull/19340)) ### Dependencies - Bump `com.gradleup.shadow:shadow-gradle-plugin` from 8.3.5 to 8.3.9 ([#19400](https://github.com/opensearch-project/OpenSearch/pull/19400)) diff --git a/server/src/main/java/org/opensearch/index/search/stats/SearchStats.java b/server/src/main/java/org/opensearch/index/search/stats/SearchStats.java index 213ff3299fde1..9e04c4d7bc5ce 100644 --- a/server/src/main/java/org/opensearch/index/search/stats/SearchStats.java +++ b/server/src/main/java/org/opensearch/index/search/stats/SearchStats.java @@ -32,6 +32,8 @@ package org.opensearch.index.search.stats; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; import org.opensearch.Version; import org.opensearch.action.search.SearchPhaseName; import org.opensearch.action.search.SearchRequestStats; @@ -67,7 +69,7 @@ public class SearchStats implements Writeable, ToXContentFragment { */ @PublicApi(since = "1.0.0") public static class PhaseStatsLongHolder implements Writeable { - + private static final Logger logger = LogManager.getLogger(PhaseStatsLongHolder.class); long current; long total; long timeInMillis; @@ -86,7 +88,11 @@ public long getTimeInMillis() { @Override public void writeTo(StreamOutput out) throws IOException { - out.writeVLong(current); + if (current < 0) { + out.writeVLong(0); + } else { + out.writeVLong(current); + } out.writeVLong(total); out.writeVLong(timeInMillis); } @@ -177,7 +183,7 @@ public RequestStatsLongHolder getRequestStatsLongHolder() { return requestStatsLongHolder; } - private Stats() { + Stats() { // for internal use, initializes all counts to 0 } @@ -539,6 +545,15 @@ public void writeTo(StreamOutput out) throws IOException { if (requestStatsLongHolder == null) { requestStatsLongHolder = new RequestStatsLongHolder(); } + requestStatsLongHolder.requestStatsHolder.forEach((phaseName, phaseStats) -> { + if (phaseStats.current < 0) { + PhaseStatsLongHolder.logger.warn( + "SearchRequestStats 'current' is negative for phase '{}': {}", + phaseName, + phaseStats.current + ); + } + }); out.writeMap( requestStatsLongHolder.getRequestStatsHolder(), StreamOutput::writeString, diff --git a/server/src/test/java/org/opensearch/index/search/stats/SearchStatsTests.java b/server/src/test/java/org/opensearch/index/search/stats/SearchStatsTests.java index fb6d95822798a..ecae6081c9d47 100644 --- a/server/src/test/java/org/opensearch/index/search/stats/SearchStatsTests.java +++ b/server/src/test/java/org/opensearch/index/search/stats/SearchStatsTests.java @@ -37,10 +37,12 @@ import org.opensearch.action.search.SearchPhaseName; import org.opensearch.action.search.SearchRequestOperationsListenerSupport; import org.opensearch.action.search.SearchRequestStats; +import org.opensearch.common.io.stream.BytesStreamOutput; import org.opensearch.common.settings.ClusterSettings; import org.opensearch.common.settings.Settings; import org.opensearch.index.search.stats.SearchStats.Stats; import org.opensearch.test.OpenSearchTestCase; +import org.junit.Assert; import java.util.HashMap; import java.util.Map; @@ -159,4 +161,45 @@ private static void assertStats(Stats stats, long equalTo) { // avg_concurrency is not summed up across stats assertEquals(1, stats.getConcurrentAvgSliceCount(), 0); } + + public void testNegativeRequestStats() throws Exception { + SearchStats searchStats = new SearchStats(new Stats(), 0, new HashMap<>()); + + long paramValue = randomIntBetween(2, 50); + + // Testing for request stats + ClusterSettings clusterSettings = new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS); + SearchRequestStats testRequestStats = new SearchRequestStats(clusterSettings); + SearchPhaseContext ctx = mock(SearchPhaseContext.class); + for (SearchPhaseName searchPhaseName : SearchPhaseName.values()) { + SearchPhase mockSearchPhase = mock(SearchPhase.class); + when(ctx.getCurrentPhase()).thenReturn(mockSearchPhase); + when(mockSearchPhase.getStartTimeInNanos()).thenReturn(System.nanoTime() - TimeUnit.SECONDS.toNanos(paramValue)); + when(mockSearchPhase.getSearchPhaseNameOptional()).thenReturn(Optional.ofNullable(searchPhaseName)); + for (int iterator = 0; iterator < paramValue; iterator++) { + onPhaseStart(testRequestStats, ctx); + onPhaseEnd(testRequestStats, ctx); + onPhaseEnd(testRequestStats, ctx); // call onPhaseEnd() twice to make 'current' negative + } + } + searchStats.setSearchRequestStats(testRequestStats); + for (SearchPhaseName searchPhaseName : SearchPhaseName.values()) { + Assert.assertNotNull(searchStats.getTotal().getRequestStatsLongHolder()); + assertEquals( + -1 * paramValue, // current is negative, equals -1 * paramValue (num loop iterations) + searchStats.getTotal().getRequestStatsLongHolder().getRequestStatsHolder().get(searchPhaseName.getName()).current + ); + assertEquals( + 2 * paramValue, + searchStats.getTotal().getRequestStatsLongHolder().getRequestStatsHolder().get(searchPhaseName.getName()).total + ); + assertThat( + searchStats.getTotal().getRequestStatsLongHolder().getRequestStatsHolder().get(searchPhaseName.getName()).timeInMillis, + greaterThanOrEqualTo(paramValue) + ); + } + + // Ensure writeTo() does not throw error with negative 'current' + searchStats.writeTo(new BytesStreamOutput(10)); + } }