Skip to content

Commit 34b5eae

Browse files
bowenlan-amznrishabhmaurya
authored andcommitted
Add stream search feature flag and auto fallback logic (opensearch-project#19373)
* Use feature flag instead of query param Signed-off-by: bowenlan-amzn <[email protected]> * add tests Signed-off-by: bowenlan-amzn <[email protected]> * add changelog Signed-off-by: bowenlan-amzn <[email protected]> * Disable stream search for normal search Signed-off-by: bowenlan-amzn <[email protected]> * Address comment Signed-off-by: bowenlan-amzn <[email protected]> --------- Signed-off-by: bowenlan-amzn <[email protected]> Signed-off-by: Rishabh Maurya <[email protected]> Co-authored-by: Rishabh Maurya <[email protected]>
1 parent 7e63038 commit 34b5eae

File tree

5 files changed

+185
-7
lines changed

5 files changed

+185
-7
lines changed

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
3838
- Implement GRPC Script query ([#19455](https://github.com/opensearch-project/OpenSearch/pull/19455))
3939
- [Search Stats] Add search & star-tree search query failure count metrics ([#19210](https://github.com/opensearch-project/OpenSearch/issues/19210))
4040
- [Star-tree] Support for multi-terms aggregation ([#18398](https://github.com/opensearch-project/OpenSearch/issues/18398))
41+
- Add stream search feature flag and auto fallback logic ([#19373](https://github.com/opensearch-project/OpenSearch/pull/19373))
4142

4243
### Changed
4344
- Refactor `if-else` chains to use `Java 17 pattern matching switch expressions`(([#18965](https://github.com/opensearch-project/OpenSearch/pull/18965))

server/src/main/java/org/opensearch/common/settings/FeatureFlagSettings.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,7 @@ protected FeatureFlagSettings(
3939
FeatureFlags.TERM_VERSION_PRECOMMIT_ENABLE_SETTING,
4040
FeatureFlags.ARROW_STREAMS_SETTING,
4141
FeatureFlags.STREAM_TRANSPORT_SETTING,
42-
FeatureFlags.MERGED_SEGMENT_WARMER_EXPERIMENTAL_SETTING
42+
FeatureFlags.MERGED_SEGMENT_WARMER_EXPERIMENTAL_SETTING,
43+
FeatureFlags.STREAM_SEARCH_SETTING
4344
);
4445
}

server/src/main/java/org/opensearch/common/util/FeatureFlags.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -120,6 +120,9 @@ public class FeatureFlags {
120120
public static final String ARROW_STREAMS = FEATURE_FLAG_PREFIX + "arrow.streams.enabled";
121121
public static final Setting<Boolean> ARROW_STREAMS_SETTING = Setting.boolSetting(ARROW_STREAMS, false, Property.NodeScope);
122122

123+
public static final String STREAM_SEARCH = FEATURE_FLAG_PREFIX + "stream.search.enabled";
124+
public static final Setting<Boolean> STREAM_SEARCH_SETTING = Setting.boolSetting(STREAM_SEARCH, false, Property.NodeScope);
125+
123126
/**
124127
* Underlying implementation for feature flags.
125128
* All settable feature flags are tracked here in FeatureFlagsImpl.featureFlags.
@@ -145,6 +148,7 @@ static class FeatureFlagsImpl {
145148
put(TERM_VERSION_PRECOMMIT_ENABLE_SETTING, TERM_VERSION_PRECOMMIT_ENABLE_SETTING.getDefault(Settings.EMPTY));
146149
put(ARROW_STREAMS_SETTING, ARROW_STREAMS_SETTING.getDefault(Settings.EMPTY));
147150
put(STREAM_TRANSPORT_SETTING, STREAM_TRANSPORT_SETTING.getDefault(Settings.EMPTY));
151+
put(STREAM_SEARCH_SETTING, STREAM_SEARCH_SETTING.getDefault(Settings.EMPTY));
148152
put(MERGED_SEGMENT_WARMER_EXPERIMENTAL_SETTING, MERGED_SEGMENT_WARMER_EXPERIMENTAL_SETTING.getDefault(Settings.EMPTY));
149153
}
150154
};

server/src/main/java/org/opensearch/rest/action/search/RestSearchAction.java

Lines changed: 34 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,8 @@
3232

3333
package org.opensearch.rest.action.search;
3434

35+
import org.apache.logging.log4j.LogManager;
36+
import org.apache.logging.log4j.Logger;
3537
import org.opensearch.ExceptionsHelper;
3638
import org.opensearch.action.ActionRequestValidationException;
3739
import org.opensearch.action.search.SearchAction;
@@ -52,6 +54,8 @@
5254
import org.opensearch.rest.action.RestStatusToXContentListener;
5355
import org.opensearch.search.Scroll;
5456
import org.opensearch.search.SearchService;
57+
import org.opensearch.search.aggregations.AggregatorFactories;
58+
import org.opensearch.search.aggregations.bucket.terms.TermsAggregationBuilder;
5559
import org.opensearch.search.builder.SearchSourceBuilder;
5660
import org.opensearch.search.fetch.StoredFieldsContext;
5761
import org.opensearch.search.fetch.subphase.FetchSourceContext;
@@ -83,6 +87,7 @@
8387
* @opensearch.api
8488
*/
8589
public class RestSearchAction extends BaseRestHandler {
90+
private static final Logger logger = LogManager.getLogger(RestSearchAction.class);
8691
/**
8792
* Indicates whether hits.total should be rendered as an integer or an object
8893
* in the rest search response.
@@ -136,13 +141,16 @@ public RestChannelConsumer prepareRequest(final RestRequest request, final NodeC
136141
parser -> parseSearchRequest(searchRequest, request, parser, client.getNamedWriteableRegistry(), setSize)
137142
);
138143

139-
boolean stream = request.paramAsBoolean("stream", false);
140-
if (stream) {
144+
if (FeatureFlags.isEnabled(FeatureFlags.STREAM_SEARCH)) {
141145
if (FeatureFlags.isEnabled(FeatureFlags.STREAM_TRANSPORT)) {
142-
return channel -> {
143-
RestCancellableNodeClient cancelClient = new RestCancellableNodeClient(client, request.getHttpChannel());
144-
cancelClient.execute(StreamSearchAction.INSTANCE, searchRequest, new RestStatusToXContentListener<>(channel));
145-
};
146+
if (canUseStreamSearch(searchRequest)) {
147+
return channel -> {
148+
RestCancellableNodeClient cancelClient = new RestCancellableNodeClient(client, request.getHttpChannel());
149+
cancelClient.execute(StreamSearchAction.INSTANCE, searchRequest, new RestStatusToXContentListener<>(channel));
150+
};
151+
} else {
152+
logger.debug("Stream search requested but search contains unsupported aggregations. Falling back to normal search.");
153+
}
146154
} else {
147155
throw new IllegalArgumentException("You need to enable stream transport first to use stream search.");
148156
}
@@ -435,4 +443,24 @@ protected Set<String> responseParams() {
435443
public boolean allowsUnsafeBuffers() {
436444
return true;
437445
}
446+
447+
/**
448+
* Determines if a search request can use stream search.
449+
*
450+
* @param searchRequest the search request to validate
451+
* @return true if the request can use stream search, false otherwise
452+
*/
453+
static boolean canUseStreamSearch(SearchRequest searchRequest) {
454+
if (searchRequest.source() == null || searchRequest.source().aggregations() == null) {
455+
return false; // No aggregations, stream search is not allowed
456+
}
457+
458+
AggregatorFactories.Builder aggregations = searchRequest.source().aggregations();
459+
if (aggregations.count() != 1) {
460+
return false; // Must have exactly one aggregation
461+
}
462+
463+
// Check if the single aggregation is a terms aggregation
464+
return aggregations.getAggregatorFactories().stream().anyMatch(factory -> factory instanceof TermsAggregationBuilder);
465+
}
438466
}
Lines changed: 144 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,144 @@
1+
/*
2+
* SPDX-License-Identifier: Apache-2.0
3+
*
4+
* The OpenSearch Contributors require contributions made to
5+
* this file be licensed under the Apache-2.0 license or a
6+
* compatible open source license.
7+
*/
8+
9+
package org.opensearch.rest.action.search;
10+
11+
import org.opensearch.action.ActionRequest;
12+
import org.opensearch.action.ActionType;
13+
import org.opensearch.action.search.SearchAction;
14+
import org.opensearch.action.search.SearchRequest;
15+
import org.opensearch.common.SetOnce;
16+
import org.opensearch.common.util.FeatureFlags;
17+
import org.opensearch.core.action.ActionListener;
18+
import org.opensearch.core.action.ActionResponse;
19+
import org.opensearch.index.query.QueryBuilders;
20+
import org.opensearch.rest.RestRequest;
21+
import org.opensearch.search.aggregations.AggregationBuilders;
22+
import org.opensearch.search.builder.SearchSourceBuilder;
23+
import org.opensearch.tasks.Task;
24+
import org.opensearch.test.OpenSearchTestCase;
25+
import org.opensearch.test.client.NoOpNodeClient;
26+
import org.opensearch.test.rest.FakeRestChannel;
27+
import org.opensearch.test.rest.FakeRestRequest;
28+
import org.opensearch.transport.client.node.NodeClient;
29+
30+
import static org.opensearch.common.util.FeatureFlags.STREAM_SEARCH;
31+
import static org.opensearch.common.util.FeatureFlags.STREAM_TRANSPORT;
32+
import static org.hamcrest.Matchers.equalTo;
33+
34+
public class RestSearchActionTests extends OpenSearchTestCase {
35+
36+
private NodeClient createMockNodeClient(SetOnce<ActionType<?>> capturedActionType) {
37+
return new NoOpNodeClient(this.getTestName()) {
38+
@Override
39+
public <Request extends ActionRequest, Response extends ActionResponse> Task executeLocally(
40+
ActionType<Response> action,
41+
Request request,
42+
ActionListener<Response> listener
43+
) {
44+
capturedActionType.set(action);
45+
listener.onResponse(null);
46+
return new Task(1L, "test", action.name(), "test task", null, null);
47+
}
48+
49+
@Override
50+
public String getLocalNodeId() {
51+
return "test-node";
52+
}
53+
};
54+
}
55+
56+
private void testActionExecution(ActionType<?> expectedAction) throws Exception {
57+
SetOnce<ActionType<?>> capturedActionType = new SetOnce<>();
58+
try (NodeClient nodeClient = createMockNodeClient(capturedActionType)) {
59+
RestRequest request = new FakeRestRequest.Builder(xContentRegistry()).build();
60+
FakeRestChannel channel = new FakeRestChannel(request, false, 0);
61+
62+
new RestSearchAction().handleRequest(request, channel, nodeClient);
63+
64+
assertThat(capturedActionType.get(), equalTo(expectedAction));
65+
}
66+
}
67+
68+
public void testWithSearchStreamFlagDisabled() throws Exception {
69+
// When SEARCH_STREAM flag is disabled, always use SearchAction
70+
testActionExecution(SearchAction.INSTANCE);
71+
}
72+
73+
@LockFeatureFlag(STREAM_SEARCH)
74+
public void testWithStreamSearchEnabledButStreamTransportDisabled() throws Exception {
75+
// When SEARCH_STREAM is enabled but STREAM_TRANSPORT is disabled, should throw exception
76+
try (NodeClient nodeClient = new NoOpNodeClient(this.getTestName())) {
77+
RestRequest request = new FakeRestRequest.Builder(xContentRegistry()).build();
78+
FakeRestChannel channel = new FakeRestChannel(request, false, 0);
79+
80+
Exception e = expectThrows(
81+
IllegalArgumentException.class,
82+
() -> new RestSearchAction().handleRequest(request, channel, nodeClient)
83+
);
84+
assertThat(e.getMessage(), equalTo("You need to enable stream transport first to use stream search."));
85+
}
86+
}
87+
88+
public void testWithStreamSearchAndTransportEnabled() throws Exception {
89+
// When both SEARCH_STREAM and STREAM_TRANSPORT are enabled, should use StreamSearchAction
90+
try (
91+
FeatureFlags.TestUtils.FlagWriteLock searchStreamLock = new FeatureFlags.TestUtils.FlagWriteLock(STREAM_SEARCH);
92+
FeatureFlags.TestUtils.FlagWriteLock streamTransportLock = new FeatureFlags.TestUtils.FlagWriteLock(STREAM_TRANSPORT)
93+
) {
94+
testActionExecution(SearchAction.INSTANCE);
95+
}
96+
}
97+
98+
// Tests for canUseStreamSearch method
99+
public void testCanUseStreamSearchWithNullSource() {
100+
SearchRequest searchRequest = new SearchRequest();
101+
assertFalse(RestSearchAction.canUseStreamSearch(searchRequest));
102+
}
103+
104+
public void testCanUseStreamSearchWithNoAggregations() {
105+
SearchRequest searchRequest = new SearchRequest();
106+
SearchSourceBuilder source = new SearchSourceBuilder();
107+
source.query(QueryBuilders.matchAllQuery());
108+
searchRequest.source(source);
109+
assertFalse(RestSearchAction.canUseStreamSearch(searchRequest));
110+
}
111+
112+
public void testCanUseStreamSearchWithSingleTermsAggregation() {
113+
SearchRequest searchRequest = new SearchRequest();
114+
SearchSourceBuilder source = new SearchSourceBuilder();
115+
source.aggregation(AggregationBuilders.terms("test_terms").field("category"));
116+
searchRequest.source(source);
117+
assertTrue(RestSearchAction.canUseStreamSearch(searchRequest));
118+
}
119+
120+
public void testCanUseStreamSearchWithMultipleAggregations() {
121+
SearchRequest searchRequest = new SearchRequest();
122+
SearchSourceBuilder source = new SearchSourceBuilder();
123+
source.aggregation(AggregationBuilders.terms("test_terms").field("category"));
124+
source.aggregation(AggregationBuilders.avg("test_avg").field("price"));
125+
searchRequest.source(source);
126+
assertFalse(RestSearchAction.canUseStreamSearch(searchRequest));
127+
}
128+
129+
public void testCanUseStreamSearchWithSingleNonTermsAggregation() {
130+
SearchRequest searchRequest = new SearchRequest();
131+
SearchSourceBuilder source = new SearchSourceBuilder();
132+
source.aggregation(AggregationBuilders.avg("test_avg").field("price"));
133+
searchRequest.source(source);
134+
assertFalse(RestSearchAction.canUseStreamSearch(searchRequest));
135+
}
136+
137+
public void testCanUseStreamSearchWithSingleHistogramAggregation() {
138+
SearchRequest searchRequest = new SearchRequest();
139+
SearchSourceBuilder source = new SearchSourceBuilder();
140+
source.aggregation(AggregationBuilders.histogram("test_histogram").field("timestamp").interval(1000));
141+
searchRequest.source(source);
142+
assertFalse(RestSearchAction.canUseStreamSearch(searchRequest));
143+
}
144+
}

0 commit comments

Comments
 (0)