Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- Implement SslHandler retrieval logic for transport-reactor-netty4 plugin ([#19458](https://github.com/opensearch-project/OpenSearch/pull/19458))
- Cache serialised cluster state based on cluster state version and node version.([#19307](https://github.com/opensearch-project/OpenSearch/pull/19307))

- Handle negative search request nodes stats ([#19340](https://github.com/opensearch-project/OpenSearch/pull/19340))

### Dependencies
- Bump `com.gradleup.shadow:shadow-gradle-plugin` from 8.3.5 to 8.3.9 ([#19400](https://github.com/opensearch-project/OpenSearch/pull/19400))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,8 @@

package org.opensearch.index.search.stats;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.opensearch.Version;
import org.opensearch.action.search.SearchPhaseName;
import org.opensearch.action.search.SearchRequestStats;
Expand Down Expand Up @@ -67,7 +69,7 @@ public class SearchStats implements Writeable, ToXContentFragment {
*/
@PublicApi(since = "1.0.0")
public static class PhaseStatsLongHolder implements Writeable {

private static final Logger logger = LogManager.getLogger(PhaseStatsLongHolder.class);
long current;
long total;
long timeInMillis;
Expand All @@ -86,7 +88,11 @@ public long getTimeInMillis() {

@Override
public void writeTo(StreamOutput out) throws IOException {
out.writeVLong(current);
if (current < 0) {
out.writeVLong(0);
} else {
out.writeVLong(current);
}
out.writeVLong(total);
out.writeVLong(timeInMillis);
}
Expand Down Expand Up @@ -177,7 +183,7 @@ public RequestStatsLongHolder getRequestStatsLongHolder() {
return requestStatsLongHolder;
}

private Stats() {
Stats() {
// for internal use, initializes all counts to 0
}

Expand Down Expand Up @@ -539,6 +545,15 @@ public void writeTo(StreamOutput out) throws IOException {
if (requestStatsLongHolder == null) {
requestStatsLongHolder = new RequestStatsLongHolder();
}
requestStatsLongHolder.requestStatsHolder.forEach((phaseName, phaseStats) -> {
if (phaseStats.current < 0) {
PhaseStatsLongHolder.logger.warn(
"SearchRequestStats 'current' is negative for phase '{}': {}",
phaseName,
phaseStats.current
);
}
});
out.writeMap(
requestStatsLongHolder.getRequestStatsHolder(),
StreamOutput::writeString,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,10 +37,12 @@
import org.opensearch.action.search.SearchPhaseName;
import org.opensearch.action.search.SearchRequestOperationsListenerSupport;
import org.opensearch.action.search.SearchRequestStats;
import org.opensearch.common.io.stream.BytesStreamOutput;
import org.opensearch.common.settings.ClusterSettings;
import org.opensearch.common.settings.Settings;
import org.opensearch.index.search.stats.SearchStats.Stats;
import org.opensearch.test.OpenSearchTestCase;
import org.junit.Assert;

import java.util.HashMap;
import java.util.Map;
Expand Down Expand Up @@ -159,4 +161,45 @@ private static void assertStats(Stats stats, long equalTo) {
// avg_concurrency is not summed up across stats
assertEquals(1, stats.getConcurrentAvgSliceCount(), 0);
}

public void testNegativeRequestStats() throws Exception {
SearchStats searchStats = new SearchStats(new Stats(), 0, new HashMap<>());

long paramValue = randomIntBetween(2, 50);

// Testing for request stats
ClusterSettings clusterSettings = new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS);
SearchRequestStats testRequestStats = new SearchRequestStats(clusterSettings);
SearchPhaseContext ctx = mock(SearchPhaseContext.class);
for (SearchPhaseName searchPhaseName : SearchPhaseName.values()) {
SearchPhase mockSearchPhase = mock(SearchPhase.class);
when(ctx.getCurrentPhase()).thenReturn(mockSearchPhase);
when(mockSearchPhase.getStartTimeInNanos()).thenReturn(System.nanoTime() - TimeUnit.SECONDS.toNanos(paramValue));
when(mockSearchPhase.getSearchPhaseNameOptional()).thenReturn(Optional.ofNullable(searchPhaseName));
for (int iterator = 0; iterator < paramValue; iterator++) {
onPhaseStart(testRequestStats, ctx);
onPhaseEnd(testRequestStats, ctx);
onPhaseEnd(testRequestStats, ctx); // call onPhaseEnd() twice to make 'current' negative
}
}
searchStats.setSearchRequestStats(testRequestStats);
for (SearchPhaseName searchPhaseName : SearchPhaseName.values()) {
Assert.assertNotNull(searchStats.getTotal().getRequestStatsLongHolder());
assertEquals(
-1 * paramValue, // current is negative, equals -1 * paramValue (num loop iterations)
searchStats.getTotal().getRequestStatsLongHolder().getRequestStatsHolder().get(searchPhaseName.getName()).current
);
assertEquals(
2 * paramValue,
searchStats.getTotal().getRequestStatsLongHolder().getRequestStatsHolder().get(searchPhaseName.getName()).total
);
assertThat(
searchStats.getTotal().getRequestStatsLongHolder().getRequestStatsHolder().get(searchPhaseName.getName()).timeInMillis,
greaterThanOrEqualTo(paramValue)
);
}

// Ensure writeTo() does not throw error with negative 'current'
searchStats.writeTo(new BytesStreamOutput(10));
}
}
Loading