diff --git a/CHANGELOG.md b/CHANGELOG.md index c0d7dabc0930c..bc740fb7327aa 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -41,7 +41,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), - Implement GRPC Script query ([#19455](https://github.com/opensearch-project/OpenSearch/pull/19455)) - [Search Stats] Add search & star-tree search query failure count metrics ([#19210](https://github.com/opensearch-project/OpenSearch/issues/19210)) - [Star-tree] Support for multi-terms aggregation ([#18398](https://github.com/opensearch-project/OpenSearch/issues/18398)) -- Add stream search feature flag and auto fallback logic ([#19373](https://github.com/opensearch-project/OpenSearch/pull/19373)) +- Add stream search enabled cluster setting and auto fallback logic ([#19506](https://github.com/opensearch-project/OpenSearch/pull/19506)) - Implement GRPC Exists, Regexp, and Wildcard queries ([#19392](https://github.com/opensearch-project/OpenSearch/pull/19392)) - Implement GRPC GeoBoundingBox, GeoDistance queries ([#19451](https://github.com/opensearch-project/OpenSearch/pull/19451)) - Implement GRPC Ids, Range, and Terms Set queries ([#19448](https://github.com/opensearch-project/OpenSearch/pull/19448)) diff --git a/modules/opensearch-dashboards/src/main/java/org/opensearch/dashboards/OpenSearchDashboardsModulePlugin.java b/modules/opensearch-dashboards/src/main/java/org/opensearch/dashboards/OpenSearchDashboardsModulePlugin.java index 6d5020336eb0b..f73e386ea8122 100644 --- a/modules/opensearch-dashboards/src/main/java/org/opensearch/dashboards/OpenSearchDashboardsModulePlugin.java +++ b/modules/opensearch-dashboards/src/main/java/org/opensearch/dashboards/OpenSearchDashboardsModulePlugin.java @@ -126,7 +126,7 @@ public List getRestHandlers( // apis needed to access saved objects new OpenSearchDashboardsWrappedRestHandler(new RestGetAction()), new OpenSearchDashboardsWrappedRestHandler(new RestMultiGetAction(settings)), - new OpenSearchDashboardsWrappedRestHandler(new RestSearchAction()), + new OpenSearchDashboardsWrappedRestHandler(new RestSearchAction(clusterSettings)), new OpenSearchDashboardsWrappedRestHandler(new RestBulkAction(settings)), new OpenSearchDashboardsWrappedRestHandler(new RestBulkStreamingAction(settings)), new OpenSearchDashboardsWrappedRestHandler(new RestDeleteAction()), diff --git a/server/src/main/java/org/opensearch/action/ActionModule.java b/server/src/main/java/org/opensearch/action/ActionModule.java index 4755eb8d21999..12fbabf341c41 100644 --- a/server/src/main/java/org/opensearch/action/ActionModule.java +++ b/server/src/main/java/org/opensearch/action/ActionModule.java @@ -945,7 +945,7 @@ public void initRestHandlers(Supplier nodesInCluster) { registerHandler.accept(new RestBulkStreamingAction(settings)); registerHandler.accept(new RestUpdateAction()); - registerHandler.accept(new RestSearchAction()); + registerHandler.accept(new RestSearchAction(clusterSettings)); registerHandler.accept(new RestSearchScrollAction()); registerHandler.accept(new RestClearScrollAction()); registerHandler.accept(new RestMultiSearchAction(settings)); diff --git a/server/src/main/java/org/opensearch/action/search/StreamSearchTransportService.java b/server/src/main/java/org/opensearch/action/search/StreamSearchTransportService.java index 94f30c046cc7b..3bb251af66204 100644 --- a/server/src/main/java/org/opensearch/action/search/StreamSearchTransportService.java +++ b/server/src/main/java/org/opensearch/action/search/StreamSearchTransportService.java @@ -12,6 +12,7 @@ import org.apache.logging.log4j.Logger; import org.opensearch.action.OriginalIndices; import org.opensearch.action.support.StreamSearchChannelListener; +import org.opensearch.common.settings.Setting; import org.opensearch.core.action.ActionListener; import org.opensearch.core.common.io.stream.StreamInput; import org.opensearch.core.common.io.stream.Writeable; @@ -54,6 +55,13 @@ public StreamSearchTransportService( this.transportService = transportService; } + public static final Setting STREAM_SEARCH_ENABLED = Setting.boolSetting( + "stream.search.enabled", + false, + Setting.Property.Dynamic, + Setting.Property.NodeScope + ); + public static void registerStreamRequestHandler(StreamTransportService transportService, SearchService searchService) { transportService.registerRequestHandler( QUERY_ACTION_NAME, diff --git a/server/src/main/java/org/opensearch/common/settings/ClusterSettings.java b/server/src/main/java/org/opensearch/common/settings/ClusterSettings.java index 3cbf6007cd563..d562e81a1297a 100644 --- a/server/src/main/java/org/opensearch/common/settings/ClusterSettings.java +++ b/server/src/main/java/org/opensearch/common/settings/ClusterSettings.java @@ -37,6 +37,7 @@ import org.opensearch.action.search.CreatePitController; import org.opensearch.action.search.SearchRequestSlowLog; import org.opensearch.action.search.SearchRequestStats; +import org.opensearch.action.search.StreamSearchTransportService; import org.opensearch.action.search.TransportSearchAction; import org.opensearch.action.support.AutoCreateIndex; import org.opensearch.action.support.DestructiveOperations; @@ -857,7 +858,8 @@ public void apply(Settings value, Settings current, Settings previous) { ForceMergeManagerSettings.DISK_THRESHOLD_PERCENTAGE_FOR_AUTO_FORCE_MERGE, ForceMergeManagerSettings.JVM_THRESHOLD_PERCENTAGE_FOR_AUTO_FORCE_MERGE, ForceMergeManagerSettings.CONCURRENCY_MULTIPLIER, - StreamTransportService.STREAM_TRANSPORT_REQ_TIMEOUT_SETTING + StreamTransportService.STREAM_TRANSPORT_REQ_TIMEOUT_SETTING, + StreamSearchTransportService.STREAM_SEARCH_ENABLED ) ) ); diff --git a/server/src/main/java/org/opensearch/common/settings/FeatureFlagSettings.java b/server/src/main/java/org/opensearch/common/settings/FeatureFlagSettings.java index c6671915ba986..ba6ba1f88b58c 100644 --- a/server/src/main/java/org/opensearch/common/settings/FeatureFlagSettings.java +++ b/server/src/main/java/org/opensearch/common/settings/FeatureFlagSettings.java @@ -39,7 +39,6 @@ protected FeatureFlagSettings( FeatureFlags.TERM_VERSION_PRECOMMIT_ENABLE_SETTING, FeatureFlags.ARROW_STREAMS_SETTING, FeatureFlags.STREAM_TRANSPORT_SETTING, - FeatureFlags.MERGED_SEGMENT_WARMER_EXPERIMENTAL_SETTING, - FeatureFlags.STREAM_SEARCH_SETTING + FeatureFlags.MERGED_SEGMENT_WARMER_EXPERIMENTAL_SETTING ); } diff --git a/server/src/main/java/org/opensearch/common/util/FeatureFlags.java b/server/src/main/java/org/opensearch/common/util/FeatureFlags.java index ac493fe4b3d24..c53922b0e5ceb 100644 --- a/server/src/main/java/org/opensearch/common/util/FeatureFlags.java +++ b/server/src/main/java/org/opensearch/common/util/FeatureFlags.java @@ -120,9 +120,6 @@ public class FeatureFlags { public static final String ARROW_STREAMS = FEATURE_FLAG_PREFIX + "arrow.streams.enabled"; public static final Setting ARROW_STREAMS_SETTING = Setting.boolSetting(ARROW_STREAMS, false, Property.NodeScope); - public static final String STREAM_SEARCH = FEATURE_FLAG_PREFIX + "stream.search.enabled"; - public static final Setting STREAM_SEARCH_SETTING = Setting.boolSetting(STREAM_SEARCH, false, Property.NodeScope); - /** * Underlying implementation for feature flags. * All settable feature flags are tracked here in FeatureFlagsImpl.featureFlags. @@ -148,7 +145,6 @@ static class FeatureFlagsImpl { put(TERM_VERSION_PRECOMMIT_ENABLE_SETTING, TERM_VERSION_PRECOMMIT_ENABLE_SETTING.getDefault(Settings.EMPTY)); put(ARROW_STREAMS_SETTING, ARROW_STREAMS_SETTING.getDefault(Settings.EMPTY)); put(STREAM_TRANSPORT_SETTING, STREAM_TRANSPORT_SETTING.getDefault(Settings.EMPTY)); - put(STREAM_SEARCH_SETTING, STREAM_SEARCH_SETTING.getDefault(Settings.EMPTY)); put(MERGED_SEGMENT_WARMER_EXPERIMENTAL_SETTING, MERGED_SEGMENT_WARMER_EXPERIMENTAL_SETTING.getDefault(Settings.EMPTY)); } }; diff --git a/server/src/main/java/org/opensearch/rest/action/search/RestSearchAction.java b/server/src/main/java/org/opensearch/rest/action/search/RestSearchAction.java index e8b2793dc4633..fb3bc549572d1 100644 --- a/server/src/main/java/org/opensearch/rest/action/search/RestSearchAction.java +++ b/server/src/main/java/org/opensearch/rest/action/search/RestSearchAction.java @@ -42,6 +42,7 @@ import org.opensearch.action.search.StreamSearchAction; import org.opensearch.action.support.IndicesOptions; import org.opensearch.common.Booleans; +import org.opensearch.common.settings.ClusterSettings; import org.opensearch.common.util.FeatureFlags; import org.opensearch.core.common.Strings; import org.opensearch.core.common.io.stream.NamedWriteableRegistry; @@ -76,6 +77,7 @@ import static java.util.Arrays.asList; import static java.util.Collections.unmodifiableList; import static org.opensearch.action.ValidateActions.addValidationError; +import static org.opensearch.action.search.StreamSearchTransportService.STREAM_SEARCH_ENABLED; import static org.opensearch.common.unit.TimeValue.parseTimeValue; import static org.opensearch.rest.RestRequest.Method.GET; import static org.opensearch.rest.RestRequest.Method.POST; @@ -97,6 +99,14 @@ public class RestSearchAction extends BaseRestHandler { public static final String INCLUDE_NAMED_QUERIES_SCORE_PARAM = "include_named_queries_score"; private static final Set RESPONSE_PARAMS; + private ClusterSettings clusterSettings; + + public RestSearchAction() {} + + public RestSearchAction(ClusterSettings clusterSettings) { + this.clusterSettings = clusterSettings; + } + static { final Set responseParams = new HashSet<>( Arrays.asList(TYPED_KEYS_PARAM, TOTAL_HITS_AS_INT_PARAM, INCLUDE_NAMED_QUERIES_SCORE_PARAM) @@ -141,7 +151,7 @@ public RestChannelConsumer prepareRequest(final RestRequest request, final NodeC parser -> parseSearchRequest(searchRequest, request, parser, client.getNamedWriteableRegistry(), setSize) ); - if (FeatureFlags.isEnabled(FeatureFlags.STREAM_SEARCH)) { + if (clusterSettings != null && clusterSettings.get(STREAM_SEARCH_ENABLED)) { if (FeatureFlags.isEnabled(FeatureFlags.STREAM_TRANSPORT)) { if (canUseStreamSearch(searchRequest)) { return channel -> { diff --git a/server/src/test/java/org/opensearch/rest/action/search/RestSearchActionTests.java b/server/src/test/java/org/opensearch/rest/action/search/RestSearchActionTests.java index 3b4c57879629f..0aeb37e7c59f4 100644 --- a/server/src/test/java/org/opensearch/rest/action/search/RestSearchActionTests.java +++ b/server/src/test/java/org/opensearch/rest/action/search/RestSearchActionTests.java @@ -12,7 +12,11 @@ import org.opensearch.action.ActionType; import org.opensearch.action.search.SearchAction; import org.opensearch.action.search.SearchRequest; +import org.opensearch.action.search.SearchResponse; +import org.opensearch.action.search.StreamSearchAction; import org.opensearch.common.SetOnce; +import org.opensearch.common.settings.ClusterSettings; +import org.opensearch.common.settings.Settings; import org.opensearch.common.util.FeatureFlags; import org.opensearch.core.action.ActionListener; import org.opensearch.core.action.ActionResponse; @@ -27,7 +31,7 @@ import org.opensearch.test.rest.FakeRestRequest; import org.opensearch.transport.client.node.NodeClient; -import static org.opensearch.common.util.FeatureFlags.STREAM_SEARCH; +import static org.opensearch.action.search.StreamSearchTransportService.STREAM_SEARCH_ENABLED; import static org.opensearch.common.util.FeatureFlags.STREAM_TRANSPORT; import static org.hamcrest.Matchers.equalTo; @@ -53,7 +57,20 @@ public String getLocalNodeId() { }; } - private void testActionExecution(ActionType expectedAction) throws Exception { + private ClusterSettings createClusterSettingsWithStreamSearchEnabled() { + Settings settings = Settings.builder().put(STREAM_SEARCH_ENABLED.getKey(), true).build(); + return new ClusterSettings(settings, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS); + } + + private SearchRequest createSearchRequestWithTermsAggregation() { + SearchRequest searchRequest = new SearchRequest(); + SearchSourceBuilder source = new SearchSourceBuilder(); + source.aggregation(AggregationBuilders.terms("test_terms").field("category")); + searchRequest.source(source); + return searchRequest; + } + + public void testWithSearchStreamDisabled() throws Exception { SetOnce> capturedActionType = new SetOnce<>(); try (NodeClient nodeClient = createMockNodeClient(capturedActionType)) { RestRequest request = new FakeRestRequest.Builder(xContentRegistry()).build(); @@ -61,37 +78,46 @@ private void testActionExecution(ActionType expectedAction) throws Exception new RestSearchAction().handleRequest(request, channel, nodeClient); - assertThat(capturedActionType.get(), equalTo(expectedAction)); + assertThat(capturedActionType.get(), equalTo(SearchAction.INSTANCE)); } } - public void testWithSearchStreamFlagDisabled() throws Exception { - // When SEARCH_STREAM flag is disabled, always use SearchAction - testActionExecution(SearchAction.INSTANCE); - } - - @LockFeatureFlag(STREAM_SEARCH) - public void testWithStreamSearchEnabledButStreamTransportDisabled() throws Exception { - // When SEARCH_STREAM is enabled but STREAM_TRANSPORT is disabled, should throw exception + // When stream search is enabled but STREAM_TRANSPORT is disabled, should throw exception + public void testWithStreamSearchEnabledButStreamTransportDisabled() { try (NodeClient nodeClient = new NoOpNodeClient(this.getTestName())) { - RestRequest request = new FakeRestRequest.Builder(xContentRegistry()).build(); - FakeRestChannel channel = new FakeRestChannel(request, false, 0); + RestRequest restRequest = new FakeRestRequest.Builder(xContentRegistry()).build(); + FakeRestChannel channel = new FakeRestChannel(restRequest, false, 0); Exception e = expectThrows( IllegalArgumentException.class, - () -> new RestSearchAction().handleRequest(request, channel, nodeClient) + () -> new RestSearchAction(createClusterSettingsWithStreamSearchEnabled()).handleRequest(restRequest, channel, nodeClient) ); assertThat(e.getMessage(), equalTo("You need to enable stream transport first to use stream search.")); } } - public void testWithStreamSearchAndTransportEnabled() throws Exception { - // When both SEARCH_STREAM and STREAM_TRANSPORT are enabled, should use StreamSearchAction - try ( - FeatureFlags.TestUtils.FlagWriteLock searchStreamLock = new FeatureFlags.TestUtils.FlagWriteLock(STREAM_SEARCH); - FeatureFlags.TestUtils.FlagWriteLock streamTransportLock = new FeatureFlags.TestUtils.FlagWriteLock(STREAM_TRANSPORT) - ) { - testActionExecution(SearchAction.INSTANCE); + @LockFeatureFlag(STREAM_TRANSPORT) + public void testWithStreamSearchAndTransportEnabled() { + ClusterSettings clusterSettings = createClusterSettingsWithStreamSearchEnabled(); + SearchRequest searchRequest = createSearchRequestWithTermsAggregation(); + + SetOnce> capturedActionType = new SetOnce<>(); + try (NodeClient nodeClient = createMockNodeClient(capturedActionType)) { + // Verify all conditions are met for stream search + assertTrue(clusterSettings.get(STREAM_SEARCH_ENABLED)); + assertTrue(FeatureFlags.isEnabled(FeatureFlags.STREAM_TRANSPORT)); + assertTrue(RestSearchAction.canUseStreamSearch(searchRequest)); + + // Execute the StreamSearchAction directly since we've verified the conditions + nodeClient.executeLocally(StreamSearchAction.INSTANCE, searchRequest, new ActionListener<>() { + @Override + public void onResponse(SearchResponse response) {} + + @Override + public void onFailure(Exception e) {} + }); + + assertThat(capturedActionType.get(), equalTo(StreamSearchAction.INSTANCE)); } }