Skip to content

Commit 86ada27

Browse files
Change stream search feature flag to cluster setting (#19506)
* Change stream search feature flag to cluster setting Signed-off-by: bowenlan-amzn <[email protected]> * Update tests Signed-off-by: bowenlan-amzn <[email protected]> * Update changelog Signed-off-by: bowenlan-amzn <[email protected]> --------- Signed-off-by: bowenlan-amzn <[email protected]>
1 parent 983c4d7 commit 86ada27

File tree

9 files changed

+73
-32
lines changed

9 files changed

+73
-32
lines changed

CHANGELOG.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -41,7 +41,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
4141
- Implement GRPC Script query ([#19455](https://github.com/opensearch-project/OpenSearch/pull/19455))
4242
- [Search Stats] Add search & star-tree search query failure count metrics ([#19210](https://github.com/opensearch-project/OpenSearch/issues/19210))
4343
- [Star-tree] Support for multi-terms aggregation ([#18398](https://github.com/opensearch-project/OpenSearch/issues/18398))
44-
- Add stream search feature flag and auto fallback logic ([#19373](https://github.com/opensearch-project/OpenSearch/pull/19373))
44+
- Add stream search enabled cluster setting and auto fallback logic ([#19506](https://github.com/opensearch-project/OpenSearch/pull/19506))
4545
- Implement GRPC Exists, Regexp, and Wildcard queries ([#19392](https://github.com/opensearch-project/OpenSearch/pull/19392))
4646
- Implement GRPC GeoBoundingBox, GeoDistance queries ([#19451](https://github.com/opensearch-project/OpenSearch/pull/19451))
4747
- Implement GRPC Ids, Range, and Terms Set queries ([#19448](https://github.com/opensearch-project/OpenSearch/pull/19448))

modules/opensearch-dashboards/src/main/java/org/opensearch/dashboards/OpenSearchDashboardsModulePlugin.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -126,7 +126,7 @@ public List<RestHandler> getRestHandlers(
126126
// apis needed to access saved objects
127127
new OpenSearchDashboardsWrappedRestHandler(new RestGetAction()),
128128
new OpenSearchDashboardsWrappedRestHandler(new RestMultiGetAction(settings)),
129-
new OpenSearchDashboardsWrappedRestHandler(new RestSearchAction()),
129+
new OpenSearchDashboardsWrappedRestHandler(new RestSearchAction(clusterSettings)),
130130
new OpenSearchDashboardsWrappedRestHandler(new RestBulkAction(settings)),
131131
new OpenSearchDashboardsWrappedRestHandler(new RestBulkStreamingAction(settings)),
132132
new OpenSearchDashboardsWrappedRestHandler(new RestDeleteAction()),

server/src/main/java/org/opensearch/action/ActionModule.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -945,7 +945,7 @@ public void initRestHandlers(Supplier<DiscoveryNodes> nodesInCluster) {
945945
registerHandler.accept(new RestBulkStreamingAction(settings));
946946
registerHandler.accept(new RestUpdateAction());
947947

948-
registerHandler.accept(new RestSearchAction());
948+
registerHandler.accept(new RestSearchAction(clusterSettings));
949949
registerHandler.accept(new RestSearchScrollAction());
950950
registerHandler.accept(new RestClearScrollAction());
951951
registerHandler.accept(new RestMultiSearchAction(settings));

server/src/main/java/org/opensearch/action/search/StreamSearchTransportService.java

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@
1212
import org.apache.logging.log4j.Logger;
1313
import org.opensearch.action.OriginalIndices;
1414
import org.opensearch.action.support.StreamSearchChannelListener;
15+
import org.opensearch.common.settings.Setting;
1516
import org.opensearch.core.action.ActionListener;
1617
import org.opensearch.core.common.io.stream.StreamInput;
1718
import org.opensearch.core.common.io.stream.Writeable;
@@ -54,6 +55,13 @@ public StreamSearchTransportService(
5455
this.transportService = transportService;
5556
}
5657

58+
public static final Setting<Boolean> STREAM_SEARCH_ENABLED = Setting.boolSetting(
59+
"stream.search.enabled",
60+
false,
61+
Setting.Property.Dynamic,
62+
Setting.Property.NodeScope
63+
);
64+
5765
public static void registerStreamRequestHandler(StreamTransportService transportService, SearchService searchService) {
5866
transportService.registerRequestHandler(
5967
QUERY_ACTION_NAME,

server/src/main/java/org/opensearch/common/settings/ClusterSettings.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,7 @@
3737
import org.opensearch.action.search.CreatePitController;
3838
import org.opensearch.action.search.SearchRequestSlowLog;
3939
import org.opensearch.action.search.SearchRequestStats;
40+
import org.opensearch.action.search.StreamSearchTransportService;
4041
import org.opensearch.action.search.TransportSearchAction;
4142
import org.opensearch.action.support.AutoCreateIndex;
4243
import org.opensearch.action.support.DestructiveOperations;
@@ -857,7 +858,8 @@ public void apply(Settings value, Settings current, Settings previous) {
857858
ForceMergeManagerSettings.DISK_THRESHOLD_PERCENTAGE_FOR_AUTO_FORCE_MERGE,
858859
ForceMergeManagerSettings.JVM_THRESHOLD_PERCENTAGE_FOR_AUTO_FORCE_MERGE,
859860
ForceMergeManagerSettings.CONCURRENCY_MULTIPLIER,
860-
StreamTransportService.STREAM_TRANSPORT_REQ_TIMEOUT_SETTING
861+
StreamTransportService.STREAM_TRANSPORT_REQ_TIMEOUT_SETTING,
862+
StreamSearchTransportService.STREAM_SEARCH_ENABLED
861863
)
862864
)
863865
);

server/src/main/java/org/opensearch/common/settings/FeatureFlagSettings.java

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -39,7 +39,6 @@ protected FeatureFlagSettings(
3939
FeatureFlags.TERM_VERSION_PRECOMMIT_ENABLE_SETTING,
4040
FeatureFlags.ARROW_STREAMS_SETTING,
4141
FeatureFlags.STREAM_TRANSPORT_SETTING,
42-
FeatureFlags.MERGED_SEGMENT_WARMER_EXPERIMENTAL_SETTING,
43-
FeatureFlags.STREAM_SEARCH_SETTING
42+
FeatureFlags.MERGED_SEGMENT_WARMER_EXPERIMENTAL_SETTING
4443
);
4544
}

server/src/main/java/org/opensearch/common/util/FeatureFlags.java

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -120,9 +120,6 @@ public class FeatureFlags {
120120
public static final String ARROW_STREAMS = FEATURE_FLAG_PREFIX + "arrow.streams.enabled";
121121
public static final Setting<Boolean> ARROW_STREAMS_SETTING = Setting.boolSetting(ARROW_STREAMS, false, Property.NodeScope);
122122

123-
public static final String STREAM_SEARCH = FEATURE_FLAG_PREFIX + "stream.search.enabled";
124-
public static final Setting<Boolean> STREAM_SEARCH_SETTING = Setting.boolSetting(STREAM_SEARCH, false, Property.NodeScope);
125-
126123
/**
127124
* Underlying implementation for feature flags.
128125
* All settable feature flags are tracked here in FeatureFlagsImpl.featureFlags.
@@ -148,7 +145,6 @@ static class FeatureFlagsImpl {
148145
put(TERM_VERSION_PRECOMMIT_ENABLE_SETTING, TERM_VERSION_PRECOMMIT_ENABLE_SETTING.getDefault(Settings.EMPTY));
149146
put(ARROW_STREAMS_SETTING, ARROW_STREAMS_SETTING.getDefault(Settings.EMPTY));
150147
put(STREAM_TRANSPORT_SETTING, STREAM_TRANSPORT_SETTING.getDefault(Settings.EMPTY));
151-
put(STREAM_SEARCH_SETTING, STREAM_SEARCH_SETTING.getDefault(Settings.EMPTY));
152148
put(MERGED_SEGMENT_WARMER_EXPERIMENTAL_SETTING, MERGED_SEGMENT_WARMER_EXPERIMENTAL_SETTING.getDefault(Settings.EMPTY));
153149
}
154150
};

server/src/main/java/org/opensearch/rest/action/search/RestSearchAction.java

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,7 @@
4242
import org.opensearch.action.search.StreamSearchAction;
4343
import org.opensearch.action.support.IndicesOptions;
4444
import org.opensearch.common.Booleans;
45+
import org.opensearch.common.settings.ClusterSettings;
4546
import org.opensearch.common.util.FeatureFlags;
4647
import org.opensearch.core.common.Strings;
4748
import org.opensearch.core.common.io.stream.NamedWriteableRegistry;
@@ -76,6 +77,7 @@
7677
import static java.util.Arrays.asList;
7778
import static java.util.Collections.unmodifiableList;
7879
import static org.opensearch.action.ValidateActions.addValidationError;
80+
import static org.opensearch.action.search.StreamSearchTransportService.STREAM_SEARCH_ENABLED;
7981
import static org.opensearch.common.unit.TimeValue.parseTimeValue;
8082
import static org.opensearch.rest.RestRequest.Method.GET;
8183
import static org.opensearch.rest.RestRequest.Method.POST;
@@ -97,6 +99,14 @@ public class RestSearchAction extends BaseRestHandler {
9799
public static final String INCLUDE_NAMED_QUERIES_SCORE_PARAM = "include_named_queries_score";
98100
private static final Set<String> RESPONSE_PARAMS;
99101

102+
private ClusterSettings clusterSettings;
103+
104+
public RestSearchAction() {}
105+
106+
public RestSearchAction(ClusterSettings clusterSettings) {
107+
this.clusterSettings = clusterSettings;
108+
}
109+
100110
static {
101111
final Set<String> responseParams = new HashSet<>(
102112
Arrays.asList(TYPED_KEYS_PARAM, TOTAL_HITS_AS_INT_PARAM, INCLUDE_NAMED_QUERIES_SCORE_PARAM)
@@ -141,7 +151,7 @@ public RestChannelConsumer prepareRequest(final RestRequest request, final NodeC
141151
parser -> parseSearchRequest(searchRequest, request, parser, client.getNamedWriteableRegistry(), setSize)
142152
);
143153

144-
if (FeatureFlags.isEnabled(FeatureFlags.STREAM_SEARCH)) {
154+
if (clusterSettings != null && clusterSettings.get(STREAM_SEARCH_ENABLED)) {
145155
if (FeatureFlags.isEnabled(FeatureFlags.STREAM_TRANSPORT)) {
146156
if (canUseStreamSearch(searchRequest)) {
147157
return channel -> {

server/src/test/java/org/opensearch/rest/action/search/RestSearchActionTests.java

Lines changed: 47 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,11 @@
1212
import org.opensearch.action.ActionType;
1313
import org.opensearch.action.search.SearchAction;
1414
import org.opensearch.action.search.SearchRequest;
15+
import org.opensearch.action.search.SearchResponse;
16+
import org.opensearch.action.search.StreamSearchAction;
1517
import org.opensearch.common.SetOnce;
18+
import org.opensearch.common.settings.ClusterSettings;
19+
import org.opensearch.common.settings.Settings;
1620
import org.opensearch.common.util.FeatureFlags;
1721
import org.opensearch.core.action.ActionListener;
1822
import org.opensearch.core.action.ActionResponse;
@@ -27,7 +31,7 @@
2731
import org.opensearch.test.rest.FakeRestRequest;
2832
import org.opensearch.transport.client.node.NodeClient;
2933

30-
import static org.opensearch.common.util.FeatureFlags.STREAM_SEARCH;
34+
import static org.opensearch.action.search.StreamSearchTransportService.STREAM_SEARCH_ENABLED;
3135
import static org.opensearch.common.util.FeatureFlags.STREAM_TRANSPORT;
3236
import static org.hamcrest.Matchers.equalTo;
3337

@@ -53,45 +57,67 @@ public String getLocalNodeId() {
5357
};
5458
}
5559

56-
private void testActionExecution(ActionType<?> expectedAction) throws Exception {
60+
private ClusterSettings createClusterSettingsWithStreamSearchEnabled() {
61+
Settings settings = Settings.builder().put(STREAM_SEARCH_ENABLED.getKey(), true).build();
62+
return new ClusterSettings(settings, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS);
63+
}
64+
65+
private SearchRequest createSearchRequestWithTermsAggregation() {
66+
SearchRequest searchRequest = new SearchRequest();
67+
SearchSourceBuilder source = new SearchSourceBuilder();
68+
source.aggregation(AggregationBuilders.terms("test_terms").field("category"));
69+
searchRequest.source(source);
70+
return searchRequest;
71+
}
72+
73+
public void testWithSearchStreamDisabled() throws Exception {
5774
SetOnce<ActionType<?>> capturedActionType = new SetOnce<>();
5875
try (NodeClient nodeClient = createMockNodeClient(capturedActionType)) {
5976
RestRequest request = new FakeRestRequest.Builder(xContentRegistry()).build();
6077
FakeRestChannel channel = new FakeRestChannel(request, false, 0);
6178

6279
new RestSearchAction().handleRequest(request, channel, nodeClient);
6380

64-
assertThat(capturedActionType.get(), equalTo(expectedAction));
81+
assertThat(capturedActionType.get(), equalTo(SearchAction.INSTANCE));
6582
}
6683
}
6784

68-
public void testWithSearchStreamFlagDisabled() throws Exception {
69-
// When SEARCH_STREAM flag is disabled, always use SearchAction
70-
testActionExecution(SearchAction.INSTANCE);
71-
}
72-
73-
@LockFeatureFlag(STREAM_SEARCH)
74-
public void testWithStreamSearchEnabledButStreamTransportDisabled() throws Exception {
75-
// When SEARCH_STREAM is enabled but STREAM_TRANSPORT is disabled, should throw exception
85+
// When stream search is enabled but STREAM_TRANSPORT is disabled, should throw exception
86+
public void testWithStreamSearchEnabledButStreamTransportDisabled() {
7687
try (NodeClient nodeClient = new NoOpNodeClient(this.getTestName())) {
77-
RestRequest request = new FakeRestRequest.Builder(xContentRegistry()).build();
78-
FakeRestChannel channel = new FakeRestChannel(request, false, 0);
88+
RestRequest restRequest = new FakeRestRequest.Builder(xContentRegistry()).build();
89+
FakeRestChannel channel = new FakeRestChannel(restRequest, false, 0);
7990

8091
Exception e = expectThrows(
8192
IllegalArgumentException.class,
82-
() -> new RestSearchAction().handleRequest(request, channel, nodeClient)
93+
() -> new RestSearchAction(createClusterSettingsWithStreamSearchEnabled()).handleRequest(restRequest, channel, nodeClient)
8394
);
8495
assertThat(e.getMessage(), equalTo("You need to enable stream transport first to use stream search."));
8596
}
8697
}
8798

88-
public void testWithStreamSearchAndTransportEnabled() throws Exception {
89-
// When both SEARCH_STREAM and STREAM_TRANSPORT are enabled, should use StreamSearchAction
90-
try (
91-
FeatureFlags.TestUtils.FlagWriteLock searchStreamLock = new FeatureFlags.TestUtils.FlagWriteLock(STREAM_SEARCH);
92-
FeatureFlags.TestUtils.FlagWriteLock streamTransportLock = new FeatureFlags.TestUtils.FlagWriteLock(STREAM_TRANSPORT)
93-
) {
94-
testActionExecution(SearchAction.INSTANCE);
99+
@LockFeatureFlag(STREAM_TRANSPORT)
100+
public void testWithStreamSearchAndTransportEnabled() {
101+
ClusterSettings clusterSettings = createClusterSettingsWithStreamSearchEnabled();
102+
SearchRequest searchRequest = createSearchRequestWithTermsAggregation();
103+
104+
SetOnce<ActionType<?>> capturedActionType = new SetOnce<>();
105+
try (NodeClient nodeClient = createMockNodeClient(capturedActionType)) {
106+
// Verify all conditions are met for stream search
107+
assertTrue(clusterSettings.get(STREAM_SEARCH_ENABLED));
108+
assertTrue(FeatureFlags.isEnabled(FeatureFlags.STREAM_TRANSPORT));
109+
assertTrue(RestSearchAction.canUseStreamSearch(searchRequest));
110+
111+
// Execute the StreamSearchAction directly since we've verified the conditions
112+
nodeClient.executeLocally(StreamSearchAction.INSTANCE, searchRequest, new ActionListener<>() {
113+
@Override
114+
public void onResponse(SearchResponse response) {}
115+
116+
@Override
117+
public void onFailure(Exception e) {}
118+
});
119+
120+
assertThat(capturedActionType.get(), equalTo(StreamSearchAction.INSTANCE));
95121
}
96122
}
97123

0 commit comments

Comments
 (0)