Skip to content

Update opensearch to 3.0.0 #25789

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Draft
wants to merge 1 commit into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
37 changes: 9 additions & 28 deletions plugin/trino-opensearch/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@

<properties>
<air.compiler.fail-warnings>true</air.compiler.fail-warnings>
<dep.opensearch.version>2.19.2</dep.opensearch.version>
<dep.opensearch.version>3.0.0</dep.opensearch.version>
</properties>

<dependencies>
Expand Down Expand Up @@ -105,36 +105,13 @@
</dependency>

<dependency>
<groupId>org.apache.httpcomponents</groupId>
<artifactId>httpasyncclient</artifactId>
<version>4.1.5</version>
<exclusions>
<exclusion>
<groupId>commons-logging</groupId>
<artifactId>commons-logging</artifactId>
</exclusion>
</exclusions>
</dependency>

<dependency>
<groupId>org.apache.httpcomponents</groupId>
<artifactId>httpclient</artifactId>
<exclusions>
<exclusion>
<groupId>commons-logging</groupId>
<artifactId>commons-logging</artifactId>
</exclusion>
</exclusions>
<groupId>org.apache.httpcomponents.client5</groupId>
<artifactId>httpclient5</artifactId>
</dependency>

<dependency>
<groupId>org.apache.httpcomponents</groupId>
<artifactId>httpcore</artifactId>
</dependency>

<dependency>
<groupId>org.apache.httpcomponents</groupId>
<artifactId>httpcore-nio</artifactId>
<groupId>org.apache.httpcomponents.core5</groupId>
<artifactId>httpcore5</artifactId>
</dependency>

<dependency>
Expand Down Expand Up @@ -164,6 +141,10 @@
<groupId>commons-logging</groupId>
<artifactId>commons-logging</artifactId>
</exclusion>
<exclusion>
<groupId>org.bouncycastle</groupId>
<artifactId>*</artifactId>
</exclusion>
</exclusions>
</dependency>

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,16 +17,20 @@
import com.amazonaws.auth.AWS4Signer;
import com.amazonaws.auth.AWSCredentialsProvider;
import com.amazonaws.http.HttpMethodName;
import org.apache.http.Header;
import org.apache.http.HttpEntityEnclosingRequest;
import org.apache.http.HttpHost;
import org.apache.http.HttpRequest;
import org.apache.http.HttpRequestInterceptor;
import org.apache.http.NameValuePair;
import org.apache.http.client.utils.URIBuilder;
import org.apache.http.entity.BasicHttpEntity;
import org.apache.http.message.BasicHeader;
import org.apache.http.protocol.HttpContext;
import org.apache.hc.client5.http.RouteInfo;
import org.apache.hc.client5.http.protocol.HttpClientContext;
import org.apache.hc.core5.http.ContentType;
import org.apache.hc.core5.http.EntityDetails;
import org.apache.hc.core5.http.Header;
import org.apache.hc.core5.http.HttpEntity;
import org.apache.hc.core5.http.HttpEntityContainer;
import org.apache.hc.core5.http.HttpRequest;
import org.apache.hc.core5.http.HttpRequestInterceptor;
import org.apache.hc.core5.http.NameValuePair;
import org.apache.hc.core5.http.io.entity.BasicHttpEntity;
import org.apache.hc.core5.http.message.BasicHeader;
import org.apache.hc.core5.http.protocol.HttpContext;
import org.apache.hc.core5.net.URIBuilder;

import java.io.IOException;
import java.io.InputStream;
Expand All @@ -41,7 +45,6 @@
import static com.google.common.collect.ImmutableMap.toImmutableMap;
import static io.trino.plugin.opensearch.AwsSecurityConfig.DeploymentType;
import static java.lang.String.CASE_INSENSITIVE_ORDER;
import static org.apache.http.protocol.HttpCoreContext.HTTP_TARGET_HOST;

class AwsRequestSigner
implements HttpRequestInterceptor
Expand All @@ -64,12 +67,15 @@ public AwsRequestSigner(String region, DeploymentType deploymentType, AWSCredent
}

@Override
public void process(HttpRequest request, HttpContext context)
public void process(HttpRequest request, EntityDetails entityDetails, HttpContext context)
throws IOException
{
String method = request.getRequestLine().getMethod();
if (!(context instanceof HttpClientContext httpClientContext)) {
throw new IllegalStateException("HttpContext is not HttpClientContext");
}
String method = request.getMethod();

URI uri = URI.create(request.getRequestLine().getUri());
URI uri = URI.create(request.getRequestUri());
URIBuilder uriBuilder = new URIBuilder(uri);

Map<String, List<String>> parameters = new TreeMap<>(CASE_INSENSITIVE_ORDER);
Expand All @@ -78,21 +84,21 @@ public void process(HttpRequest request, HttpContext context)
.add(parameter.getValue());
}

Map<String, String> headers = Arrays.stream(request.getAllHeaders())
Map<String, String> headers = Arrays.stream(request.getHeaders())
.collect(toImmutableMap(Header::getName, Header::getValue));

InputStream content = null;
if (request instanceof HttpEntityEnclosingRequest enclosingRequest) {
if (request instanceof HttpEntityContainer enclosingRequest) {
if (enclosingRequest.getEntity() != null) {
content = enclosingRequest.getEntity().getContent();
}
}

DefaultRequest<?> awsRequest = new DefaultRequest<>(serviceName);

HttpHost host = (HttpHost) context.getAttribute(HTTP_TARGET_HOST);
if (host != null) {
awsRequest.setEndpoint(URI.create(host.toURI()));
RouteInfo route = httpClientContext.getHttpRoute();
if (route != null) {
awsRequest.setEndpoint(URI.create(route.getTargetHost().toURI()));
}
awsRequest.setHttpMethod(HttpMethodName.fromValue(method));
awsRequest.setResourcePath(uri.getRawPath());
Expand All @@ -106,14 +112,14 @@ public void process(HttpRequest request, HttpContext context)
.map(entry -> new BasicHeader(entry.getKey(), entry.getValue()))
.toArray(Header[]::new);

request.setHeaders(newHeaders);

InputStream newContent = awsRequest.getContent();
checkState(newContent == null || request instanceof HttpEntityEnclosingRequest);
checkState(newContent == null || request instanceof HttpEntityContainer);
if (newContent != null) {
BasicHttpEntity entity = new BasicHttpEntity();
entity.setContent(newContent);
((HttpEntityEnclosingRequest) request).setEntity(entity);
HttpEntityContainer entityContainer = (HttpEntityContainer) request;
HttpEntity existingEntity = entityContainer.getEntity();
HttpEntity newEntity = new BasicHttpEntity(newContent, existingEntity.getContentLength(), ContentType.create(existingEntity.getContentType()), existingEntity.getContentEncoding());
entityContainer.setEntity(newEntity);
}
request.setHeaders(newHeaders);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,9 @@
import io.airlift.log.Logger;
import io.airlift.stats.TimeStat;
import io.trino.plugin.opensearch.OpenSearchConfig;
import org.apache.http.Header;
import org.apache.http.HttpEntity;
import org.apache.http.HttpHost;
import org.apache.hc.core5.http.Header;
import org.apache.hc.core5.http.HttpEntity;
import org.apache.hc.core5.http.HttpHost;
import org.opensearch.client.Node;
import org.opensearch.client.Request;
import org.opensearch.client.RequestOptions;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,20 +39,25 @@
import io.trino.spi.TrinoException;
import jakarta.annotation.PostConstruct;
import jakarta.annotation.PreDestroy;
import org.apache.http.HttpEntity;
import org.apache.http.HttpHost;
import org.apache.http.auth.AuthScope;
import org.apache.http.auth.UsernamePasswordCredentials;
import org.apache.http.client.CredentialsProvider;
import org.apache.http.client.config.RequestConfig;
import org.apache.http.conn.ssl.NoopHostnameVerifier;
import org.apache.http.entity.ByteArrayEntity;
import org.apache.http.entity.StringEntity;
import org.apache.http.impl.client.BasicCredentialsProvider;
import org.apache.http.impl.nio.client.HttpAsyncClientBuilder;
import org.apache.http.impl.nio.reactor.IOReactorConfig;
import org.apache.http.message.BasicHeader;
import org.apache.http.util.EntityUtils;
import org.apache.hc.client5.http.auth.AuthScope;
import org.apache.hc.client5.http.auth.UsernamePasswordCredentials;
import org.apache.hc.client5.http.config.ConnectionConfig;
import org.apache.hc.client5.http.config.RequestConfig;
import org.apache.hc.client5.http.impl.async.HttpAsyncClientBuilder;
import org.apache.hc.client5.http.impl.async.HttpAsyncClients;
import org.apache.hc.client5.http.impl.auth.BasicCredentialsProvider;
import org.apache.hc.client5.http.impl.nio.PoolingAsyncClientConnectionManagerBuilder;
import org.apache.hc.client5.http.ssl.ClientTlsStrategyBuilder;
import org.apache.hc.client5.http.ssl.NoopHostnameVerifier;
import org.apache.hc.core5.http.ContentType;
import org.apache.hc.core5.http.HttpEntity;
import org.apache.hc.core5.http.HttpHost;
import org.apache.hc.core5.http.ParseException;
import org.apache.hc.core5.http.io.entity.EntityUtils;
import org.apache.hc.core5.http.io.entity.StringEntity;
import org.apache.hc.core5.http.message.BasicHeader;
import org.apache.hc.core5.reactor.IOReactorConfig;
import org.apache.hc.core5.util.Timeout;
import org.opensearch.OpenSearchStatusException;
import org.opensearch.action.search.ClearScrollRequest;
import org.opensearch.action.search.SearchRequest;
Expand All @@ -73,6 +78,7 @@
import java.io.File;
import java.io.IOException;
import java.io.InputStream;
import java.net.URISyntaxException;
import java.security.GeneralSecurityException;
import java.util.Arrays;
import java.util.Iterator;
Expand Down Expand Up @@ -177,7 +183,7 @@ private void refreshNodes()
.map(OpenSearchNode::address)
.filter(Optional::isPresent)
.map(Optional::get)
.map(address -> HttpHost.create(format("%s://%s", tlsEnabled ? "https" : "http", address)))
.map(address -> createHost(address, tlsEnabled))
.toArray(HttpHost[]::new);

if (hosts.length > 0 && !ignorePublishAddress) {
Expand All @@ -193,50 +199,71 @@ private void refreshNodes()
}
}

@VisibleForTesting
public static HttpHost createHost(String address, boolean tlsEnabled)
{
try {
return HttpHost.create(format("%s://%s", tlsEnabled ? "https" : "http", address));
}
catch (URISyntaxException e) {
throw new RuntimeException("Could not create host address", e);
}
}

private static BackpressureRestHighLevelClient createClient(
OpenSearchConfig config,
Optional<AwsSecurityConfig> awsSecurityConfig,
Optional<PasswordConfig> passwordConfig,
TimeStat backpressureStats)
{
RestClientBuilder builder = RestClient.builder(
config.getHosts().stream()
.map(httpHost -> new HttpHost(httpHost, config.getPort(), config.isTlsEnabled() ? "https" : "http"))
.toArray(HttpHost[]::new));
HttpHost[] hosts = config.getHosts().stream()
.map(httpHost -> createHost("%s:%d".formatted(httpHost, config.getPort()), config.isTlsEnabled()))
.toArray(HttpHost[]::new);
RestClientBuilder builder = RestClient.builder(hosts);

builder.setHttpClientConfigCallback(_ -> {
RequestConfig requestConfig = RequestConfig.custom()
.setConnectTimeout(toIntExact(config.getConnectTimeout().toMillis()))
.setSocketTimeout(toIntExact(config.getRequestTimeout().toMillis()))
.setConnectionRequestTimeout(Timeout.of(config.getRequestTimeout().toJavaTime()))
.build();

IOReactorConfig reactorConfig = IOReactorConfig.custom()
.setIoThreadCount(config.getHttpThreadCount())
.build();

PoolingAsyncClientConnectionManagerBuilder connectionManager = PoolingAsyncClientConnectionManagerBuilder.create();
connectionManager.setDefaultConnectionConfig(ConnectionConfig.custom()
.setConnectTimeout(Timeout.of(config.getConnectTimeout().toJavaTime()))
.build());
connectionManager.setMaxConnPerRoute(config.getMaxHttpConnections());
connectionManager.setMaxConnTotal(config.getMaxHttpConnections());

// the client builder passed to the call-back is configured to use system properties, which makes it
// impossible to configure concurrency settings, so we need to build a new one from scratch
HttpAsyncClientBuilder clientBuilder = HttpAsyncClientBuilder.create()
HttpAsyncClientBuilder clientBuilder = HttpAsyncClients.custom()
.setDefaultRequestConfig(requestConfig)
.setDefaultIOReactorConfig(reactorConfig)
.setMaxConnPerRoute(config.getMaxHttpConnections())
.setMaxConnTotal(config.getMaxHttpConnections());
.setIOReactorConfig(reactorConfig);

if (config.isTlsEnabled()) {
ClientTlsStrategyBuilder tlsStrategyBuilder = ClientTlsStrategyBuilder.create();
buildSslContext(config.getKeystorePath(), config.getKeystorePassword(), config.getTrustStorePath(), config.getTruststorePassword())
.ifPresent(clientBuilder::setSSLContext);
.ifPresent(tlsStrategyBuilder::setSslContext);

if (!config.isVerifyHostnames()) {
clientBuilder.setSSLHostnameVerifier(NoopHostnameVerifier.INSTANCE);
tlsStrategyBuilder.setHostnameVerifier(NoopHostnameVerifier.INSTANCE);
}

connectionManager.setTlsStrategy(tlsStrategyBuilder.build());
}

clientBuilder.setConnectionManager(connectionManager.build());

passwordConfig.ifPresent(securityConfig -> {
CredentialsProvider credentials = new BasicCredentialsProvider();
credentials.setCredentials(AuthScope.ANY, new UsernamePasswordCredentials(securityConfig.getUser(), securityConfig.getPassword()));
BasicCredentialsProvider credentials = new BasicCredentialsProvider();
credentials.setCredentials(new AuthScope(null, -1), new UsernamePasswordCredentials(securityConfig.getUser(), securityConfig.getPassword().toCharArray()));
clientBuilder.setDefaultCredentialsProvider(credentials);
});

awsSecurityConfig.ifPresent(securityConfig -> clientBuilder.addInterceptorLast(new AwsRequestSigner(
awsSecurityConfig.ifPresent(securityConfig -> clientBuilder.addRequestInterceptorLast(new AwsRequestSigner(
securityConfig.getRegion(),
securityConfig.getDeploymentType(),
getAwsCredentialsProvider(securityConfig))));
Expand Down Expand Up @@ -552,8 +579,7 @@ public String executeQuery(String index, String query)
"GET",
path,
ImmutableMap.of(),
new ByteArrayEntity(query.getBytes(UTF_8)),
new BasicHeader("Content-Type", "application/json"),
new StringEntity(query, ContentType.APPLICATION_JSON, false),
new BasicHeader("Accept-Encoding", "application/json"));
}
catch (IOException e) {
Expand All @@ -564,7 +590,7 @@ public String executeQuery(String index, String query)
try {
body = EntityUtils.toString(response.getEntity());
}
catch (IOException e) {
catch (IOException | ParseException e) {
throw new TrinoException(OPENSEARCH_INVALID_RESPONSE, e);
}

Expand Down Expand Up @@ -749,7 +775,6 @@ private <T> T doRequest(String path, ResponseHandler<T> handler)
private static TrinoException propagate(ResponseException exception)
{
HttpEntity entity = exception.getResponse().getEntity();

if (entity != null && entity.getContentType() != null) {
try {
JsonNode reason = OBJECT_MAPPER.readTree(entity.getContent()).path("error")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@
import io.trino.testing.MaterializedResult;
import io.trino.testing.QueryRunner;
import io.trino.testing.TestingConnectorBehavior;
import org.apache.http.HttpHost;
import org.apache.hc.core5.http.HttpHost;
import org.intellij.lang.annotations.Language;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.Disabled;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@
import io.trino.testing.QueryRunner;
import io.trino.testing.TestingTrinoClient;
import io.trino.tpch.TpchTable;
import org.apache.http.HttpHost;
import org.apache.hc.core5.http.HttpHost;
import org.opensearch.client.RestClient;
import org.opensearch.client.RestHighLevelClient;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
import io.airlift.json.ObjectMapperProvider;
import io.trino.testing.AbstractTestQueryFramework;
import io.trino.testing.QueryRunner;
import org.apache.http.HttpHost;
import org.apache.hc.core5.http.HttpHost;
import org.intellij.lang.annotations.Language;
import org.junit.jupiter.api.Test;
import org.opensearch.client.Request;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@
import io.trino.sql.planner.assertions.BasePushdownPlanTest;
import io.trino.sql.planner.assertions.PlanMatchPattern;
import io.trino.testing.PlanTester;
import org.apache.http.HttpHost;
import org.apache.hc.core5.http.HttpHost;
import org.intellij.lang.annotations.Language;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.Test;
Expand Down Expand Up @@ -151,7 +151,7 @@ void testPushdownDisabled()
.build();
@Language("JSON")
String properties =
"""
"""
{
"properties": {
"col0": {
Expand Down
Loading