Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- Implement GRPC Script query ([#19455](https://github.com/opensearch-project/OpenSearch/pull/19455))
- [Search Stats] Add search & star-tree search query failure count metrics ([#19210](https://github.com/opensearch-project/OpenSearch/issues/19210))
- [Star-tree] Support for multi-terms aggregation ([#18398](https://github.com/opensearch-project/OpenSearch/issues/18398))
- Add stream search feature flag and auto fallback logic ([#19373](https://github.com/opensearch-project/OpenSearch/pull/19373))
- Add stream search enabled cluster setting and auto fallback logic ([#19506](https://github.com/opensearch-project/OpenSearch/pull/19506))
- Implement GRPC Exists, Regexp, and Wildcard queries ([#19392](https://github.com/opensearch-project/OpenSearch/pull/19392))
- Implement GRPC GeoBoundingBox, GeoDistance queries ([#19451](https://github.com/opensearch-project/OpenSearch/pull/19451))
- Implement GRPC Ids, Range, and Terms Set queries ([#19448](https://github.com/opensearch-project/OpenSearch/pull/19448))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -126,7 +126,7 @@ public List<RestHandler> getRestHandlers(
// apis needed to access saved objects
new OpenSearchDashboardsWrappedRestHandler(new RestGetAction()),
new OpenSearchDashboardsWrappedRestHandler(new RestMultiGetAction(settings)),
new OpenSearchDashboardsWrappedRestHandler(new RestSearchAction()),
new OpenSearchDashboardsWrappedRestHandler(new RestSearchAction(clusterSettings)),
new OpenSearchDashboardsWrappedRestHandler(new RestBulkAction(settings)),
new OpenSearchDashboardsWrappedRestHandler(new RestBulkStreamingAction(settings)),
new OpenSearchDashboardsWrappedRestHandler(new RestDeleteAction()),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -945,7 +945,7 @@ public void initRestHandlers(Supplier<DiscoveryNodes> nodesInCluster) {
registerHandler.accept(new RestBulkStreamingAction(settings));
registerHandler.accept(new RestUpdateAction());

registerHandler.accept(new RestSearchAction());
registerHandler.accept(new RestSearchAction(clusterSettings));
registerHandler.accept(new RestSearchScrollAction());
registerHandler.accept(new RestClearScrollAction());
registerHandler.accept(new RestMultiSearchAction(settings));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
import org.apache.logging.log4j.Logger;
import org.opensearch.action.OriginalIndices;
import org.opensearch.action.support.StreamSearchChannelListener;
import org.opensearch.common.settings.Setting;
import org.opensearch.core.action.ActionListener;
import org.opensearch.core.common.io.stream.StreamInput;
import org.opensearch.core.common.io.stream.Writeable;
Expand Down Expand Up @@ -54,6 +55,13 @@ public StreamSearchTransportService(
this.transportService = transportService;
}

public static final Setting<Boolean> STREAM_SEARCH_ENABLED = Setting.boolSetting(
"stream.search.enabled",
false,
Setting.Property.Dynamic,
Setting.Property.NodeScope
);

public static void registerStreamRequestHandler(StreamTransportService transportService, SearchService searchService) {
transportService.registerRequestHandler(
QUERY_ACTION_NAME,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
import org.opensearch.action.search.CreatePitController;
import org.opensearch.action.search.SearchRequestSlowLog;
import org.opensearch.action.search.SearchRequestStats;
import org.opensearch.action.search.StreamSearchTransportService;
import org.opensearch.action.search.TransportSearchAction;
import org.opensearch.action.support.AutoCreateIndex;
import org.opensearch.action.support.DestructiveOperations;
Expand Down Expand Up @@ -857,7 +858,8 @@ public void apply(Settings value, Settings current, Settings previous) {
ForceMergeManagerSettings.DISK_THRESHOLD_PERCENTAGE_FOR_AUTO_FORCE_MERGE,
ForceMergeManagerSettings.JVM_THRESHOLD_PERCENTAGE_FOR_AUTO_FORCE_MERGE,
ForceMergeManagerSettings.CONCURRENCY_MULTIPLIER,
StreamTransportService.STREAM_TRANSPORT_REQ_TIMEOUT_SETTING
StreamTransportService.STREAM_TRANSPORT_REQ_TIMEOUT_SETTING,
StreamSearchTransportService.STREAM_SEARCH_ENABLED
)
)
);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,6 @@ protected FeatureFlagSettings(
FeatureFlags.TERM_VERSION_PRECOMMIT_ENABLE_SETTING,
FeatureFlags.ARROW_STREAMS_SETTING,
FeatureFlags.STREAM_TRANSPORT_SETTING,
FeatureFlags.MERGED_SEGMENT_WARMER_EXPERIMENTAL_SETTING,
FeatureFlags.STREAM_SEARCH_SETTING
FeatureFlags.MERGED_SEGMENT_WARMER_EXPERIMENTAL_SETTING
);
}
Original file line number Diff line number Diff line change
Expand Up @@ -120,9 +120,6 @@ public class FeatureFlags {
public static final String ARROW_STREAMS = FEATURE_FLAG_PREFIX + "arrow.streams.enabled";
public static final Setting<Boolean> ARROW_STREAMS_SETTING = Setting.boolSetting(ARROW_STREAMS, false, Property.NodeScope);

public static final String STREAM_SEARCH = FEATURE_FLAG_PREFIX + "stream.search.enabled";
public static final Setting<Boolean> STREAM_SEARCH_SETTING = Setting.boolSetting(STREAM_SEARCH, false, Property.NodeScope);

/**
* Underlying implementation for feature flags.
* All settable feature flags are tracked here in FeatureFlagsImpl.featureFlags.
Expand All @@ -148,7 +145,6 @@ static class FeatureFlagsImpl {
put(TERM_VERSION_PRECOMMIT_ENABLE_SETTING, TERM_VERSION_PRECOMMIT_ENABLE_SETTING.getDefault(Settings.EMPTY));
put(ARROW_STREAMS_SETTING, ARROW_STREAMS_SETTING.getDefault(Settings.EMPTY));
put(STREAM_TRANSPORT_SETTING, STREAM_TRANSPORT_SETTING.getDefault(Settings.EMPTY));
put(STREAM_SEARCH_SETTING, STREAM_SEARCH_SETTING.getDefault(Settings.EMPTY));
put(MERGED_SEGMENT_WARMER_EXPERIMENTAL_SETTING, MERGED_SEGMENT_WARMER_EXPERIMENTAL_SETTING.getDefault(Settings.EMPTY));
}
};
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@
import org.opensearch.action.search.StreamSearchAction;
import org.opensearch.action.support.IndicesOptions;
import org.opensearch.common.Booleans;
import org.opensearch.common.settings.ClusterSettings;
import org.opensearch.common.util.FeatureFlags;
import org.opensearch.core.common.Strings;
import org.opensearch.core.common.io.stream.NamedWriteableRegistry;
Expand Down Expand Up @@ -76,6 +77,7 @@
import static java.util.Arrays.asList;
import static java.util.Collections.unmodifiableList;
import static org.opensearch.action.ValidateActions.addValidationError;
import static org.opensearch.action.search.StreamSearchTransportService.STREAM_SEARCH_ENABLED;
import static org.opensearch.common.unit.TimeValue.parseTimeValue;
import static org.opensearch.rest.RestRequest.Method.GET;
import static org.opensearch.rest.RestRequest.Method.POST;
Expand All @@ -97,6 +99,14 @@ public class RestSearchAction extends BaseRestHandler {
public static final String INCLUDE_NAMED_QUERIES_SCORE_PARAM = "include_named_queries_score";
private static final Set<String> RESPONSE_PARAMS;

private ClusterSettings clusterSettings;

public RestSearchAction() {}

public RestSearchAction(ClusterSettings clusterSettings) {
this.clusterSettings = clusterSettings;
}

static {
final Set<String> responseParams = new HashSet<>(
Arrays.asList(TYPED_KEYS_PARAM, TOTAL_HITS_AS_INT_PARAM, INCLUDE_NAMED_QUERIES_SCORE_PARAM)
Expand Down Expand Up @@ -141,7 +151,7 @@ public RestChannelConsumer prepareRequest(final RestRequest request, final NodeC
parser -> parseSearchRequest(searchRequest, request, parser, client.getNamedWriteableRegistry(), setSize)
);

if (FeatureFlags.isEnabled(FeatureFlags.STREAM_SEARCH)) {
if (clusterSettings != null && clusterSettings.get(STREAM_SEARCH_ENABLED)) {
if (FeatureFlags.isEnabled(FeatureFlags.STREAM_TRANSPORT)) {
if (canUseStreamSearch(searchRequest)) {
return channel -> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,11 @@
import org.opensearch.action.ActionType;
import org.opensearch.action.search.SearchAction;
import org.opensearch.action.search.SearchRequest;
import org.opensearch.action.search.SearchResponse;
import org.opensearch.action.search.StreamSearchAction;
import org.opensearch.common.SetOnce;
import org.opensearch.common.settings.ClusterSettings;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.util.FeatureFlags;
import org.opensearch.core.action.ActionListener;
import org.opensearch.core.action.ActionResponse;
Expand All @@ -27,7 +31,7 @@
import org.opensearch.test.rest.FakeRestRequest;
import org.opensearch.transport.client.node.NodeClient;

import static org.opensearch.common.util.FeatureFlags.STREAM_SEARCH;
import static org.opensearch.action.search.StreamSearchTransportService.STREAM_SEARCH_ENABLED;
import static org.opensearch.common.util.FeatureFlags.STREAM_TRANSPORT;
import static org.hamcrest.Matchers.equalTo;

Expand All @@ -53,45 +57,67 @@ public String getLocalNodeId() {
};
}

private void testActionExecution(ActionType<?> expectedAction) throws Exception {
private ClusterSettings createClusterSettingsWithStreamSearchEnabled() {
Settings settings = Settings.builder().put(STREAM_SEARCH_ENABLED.getKey(), true).build();
return new ClusterSettings(settings, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS);
}

private SearchRequest createSearchRequestWithTermsAggregation() {
SearchRequest searchRequest = new SearchRequest();
SearchSourceBuilder source = new SearchSourceBuilder();
source.aggregation(AggregationBuilders.terms("test_terms").field("category"));
searchRequest.source(source);
return searchRequest;
}

public void testWithSearchStreamDisabled() throws Exception {
SetOnce<ActionType<?>> capturedActionType = new SetOnce<>();
try (NodeClient nodeClient = createMockNodeClient(capturedActionType)) {
RestRequest request = new FakeRestRequest.Builder(xContentRegistry()).build();
FakeRestChannel channel = new FakeRestChannel(request, false, 0);

new RestSearchAction().handleRequest(request, channel, nodeClient);

assertThat(capturedActionType.get(), equalTo(expectedAction));
assertThat(capturedActionType.get(), equalTo(SearchAction.INSTANCE));
}
}

public void testWithSearchStreamFlagDisabled() throws Exception {
// When SEARCH_STREAM flag is disabled, always use SearchAction
testActionExecution(SearchAction.INSTANCE);
}

@LockFeatureFlag(STREAM_SEARCH)
public void testWithStreamSearchEnabledButStreamTransportDisabled() throws Exception {
// When SEARCH_STREAM is enabled but STREAM_TRANSPORT is disabled, should throw exception
// When stream search is enabled but STREAM_TRANSPORT is disabled, should throw exception
public void testWithStreamSearchEnabledButStreamTransportDisabled() {
try (NodeClient nodeClient = new NoOpNodeClient(this.getTestName())) {
RestRequest request = new FakeRestRequest.Builder(xContentRegistry()).build();
FakeRestChannel channel = new FakeRestChannel(request, false, 0);
RestRequest restRequest = new FakeRestRequest.Builder(xContentRegistry()).build();
FakeRestChannel channel = new FakeRestChannel(restRequest, false, 0);

Exception e = expectThrows(
IllegalArgumentException.class,
() -> new RestSearchAction().handleRequest(request, channel, nodeClient)
() -> new RestSearchAction(createClusterSettingsWithStreamSearchEnabled()).handleRequest(restRequest, channel, nodeClient)
);
assertThat(e.getMessage(), equalTo("You need to enable stream transport first to use stream search."));
}
}

public void testWithStreamSearchAndTransportEnabled() throws Exception {
// When both SEARCH_STREAM and STREAM_TRANSPORT are enabled, should use StreamSearchAction
try (
FeatureFlags.TestUtils.FlagWriteLock searchStreamLock = new FeatureFlags.TestUtils.FlagWriteLock(STREAM_SEARCH);
FeatureFlags.TestUtils.FlagWriteLock streamTransportLock = new FeatureFlags.TestUtils.FlagWriteLock(STREAM_TRANSPORT)
) {
testActionExecution(SearchAction.INSTANCE);
@LockFeatureFlag(STREAM_TRANSPORT)
public void testWithStreamSearchAndTransportEnabled() {
ClusterSettings clusterSettings = createClusterSettingsWithStreamSearchEnabled();
SearchRequest searchRequest = createSearchRequestWithTermsAggregation();

SetOnce<ActionType<?>> capturedActionType = new SetOnce<>();
try (NodeClient nodeClient = createMockNodeClient(capturedActionType)) {
// Verify all conditions are met for stream search
assertTrue(clusterSettings.get(STREAM_SEARCH_ENABLED));
assertTrue(FeatureFlags.isEnabled(FeatureFlags.STREAM_TRANSPORT));
assertTrue(RestSearchAction.canUseStreamSearch(searchRequest));

// Execute the StreamSearchAction directly since we've verified the conditions
nodeClient.executeLocally(StreamSearchAction.INSTANCE, searchRequest, new ActionListener<>() {
@Override
public void onResponse(SearchResponse response) {}

@Override
public void onFailure(Exception e) {}
});

assertThat(capturedActionType.get(), equalTo(StreamSearchAction.INSTANCE));
}
}

Expand Down
Loading