diff --git a/plugin/trino-opensearch/pom.xml b/plugin/trino-opensearch/pom.xml index 24ea6ef60bf0..424349537dcd 100644 --- a/plugin/trino-opensearch/pom.xml +++ b/plugin/trino-opensearch/pom.xml @@ -15,7 +15,7 @@ true - 2.19.2 + 3.0.0 @@ -105,36 +105,13 @@ - org.apache.httpcomponents - httpasyncclient - 4.1.5 - - - commons-logging - commons-logging - - - - - - org.apache.httpcomponents - httpclient - - - commons-logging - commons-logging - - + org.apache.httpcomponents.client5 + httpclient5 - org.apache.httpcomponents - httpcore - - - - org.apache.httpcomponents - httpcore-nio + org.apache.httpcomponents.core5 + httpcore5 @@ -164,6 +141,10 @@ commons-logging commons-logging + + org.bouncycastle + * + diff --git a/plugin/trino-opensearch/src/main/java/io/trino/plugin/opensearch/client/AwsRequestSigner.java b/plugin/trino-opensearch/src/main/java/io/trino/plugin/opensearch/client/AwsRequestSigner.java index 052e3505c838..0d3a21fe978d 100644 --- a/plugin/trino-opensearch/src/main/java/io/trino/plugin/opensearch/client/AwsRequestSigner.java +++ b/plugin/trino-opensearch/src/main/java/io/trino/plugin/opensearch/client/AwsRequestSigner.java @@ -17,16 +17,20 @@ import com.amazonaws.auth.AWS4Signer; import com.amazonaws.auth.AWSCredentialsProvider; import com.amazonaws.http.HttpMethodName; -import org.apache.http.Header; -import org.apache.http.HttpEntityEnclosingRequest; -import org.apache.http.HttpHost; -import org.apache.http.HttpRequest; -import org.apache.http.HttpRequestInterceptor; -import org.apache.http.NameValuePair; -import org.apache.http.client.utils.URIBuilder; -import org.apache.http.entity.BasicHttpEntity; -import org.apache.http.message.BasicHeader; -import org.apache.http.protocol.HttpContext; +import org.apache.hc.client5.http.RouteInfo; +import org.apache.hc.client5.http.protocol.HttpClientContext; +import org.apache.hc.core5.http.ContentType; +import org.apache.hc.core5.http.EntityDetails; +import org.apache.hc.core5.http.Header; +import org.apache.hc.core5.http.HttpEntity; +import org.apache.hc.core5.http.HttpEntityContainer; +import org.apache.hc.core5.http.HttpRequest; +import org.apache.hc.core5.http.HttpRequestInterceptor; +import org.apache.hc.core5.http.NameValuePair; +import org.apache.hc.core5.http.io.entity.BasicHttpEntity; +import org.apache.hc.core5.http.message.BasicHeader; +import org.apache.hc.core5.http.protocol.HttpContext; +import org.apache.hc.core5.net.URIBuilder; import java.io.IOException; import java.io.InputStream; @@ -41,7 +45,6 @@ import static com.google.common.collect.ImmutableMap.toImmutableMap; import static io.trino.plugin.opensearch.AwsSecurityConfig.DeploymentType; import static java.lang.String.CASE_INSENSITIVE_ORDER; -import static org.apache.http.protocol.HttpCoreContext.HTTP_TARGET_HOST; class AwsRequestSigner implements HttpRequestInterceptor @@ -64,12 +67,15 @@ public AwsRequestSigner(String region, DeploymentType deploymentType, AWSCredent } @Override - public void process(HttpRequest request, HttpContext context) + public void process(HttpRequest request, EntityDetails entityDetails, HttpContext context) throws IOException { - String method = request.getRequestLine().getMethod(); + if (!(context instanceof HttpClientContext httpClientContext)) { + throw new IllegalStateException("HttpContext is not HttpClientContext"); + } + String method = request.getMethod(); - URI uri = URI.create(request.getRequestLine().getUri()); + URI uri = URI.create(request.getRequestUri()); URIBuilder uriBuilder = new URIBuilder(uri); Map> parameters = new TreeMap<>(CASE_INSENSITIVE_ORDER); @@ -78,11 +84,11 @@ public void process(HttpRequest request, HttpContext context) .add(parameter.getValue()); } - Map headers = Arrays.stream(request.getAllHeaders()) + Map headers = Arrays.stream(request.getHeaders()) .collect(toImmutableMap(Header::getName, Header::getValue)); InputStream content = null; - if (request instanceof HttpEntityEnclosingRequest enclosingRequest) { + if (request instanceof HttpEntityContainer enclosingRequest) { if (enclosingRequest.getEntity() != null) { content = enclosingRequest.getEntity().getContent(); } @@ -90,9 +96,9 @@ public void process(HttpRequest request, HttpContext context) DefaultRequest awsRequest = new DefaultRequest<>(serviceName); - HttpHost host = (HttpHost) context.getAttribute(HTTP_TARGET_HOST); - if (host != null) { - awsRequest.setEndpoint(URI.create(host.toURI())); + RouteInfo route = httpClientContext.getHttpRoute(); + if (route != null) { + awsRequest.setEndpoint(URI.create(route.getTargetHost().toURI())); } awsRequest.setHttpMethod(HttpMethodName.fromValue(method)); awsRequest.setResourcePath(uri.getRawPath()); @@ -106,14 +112,14 @@ public void process(HttpRequest request, HttpContext context) .map(entry -> new BasicHeader(entry.getKey(), entry.getValue())) .toArray(Header[]::new); - request.setHeaders(newHeaders); - InputStream newContent = awsRequest.getContent(); - checkState(newContent == null || request instanceof HttpEntityEnclosingRequest); + checkState(newContent == null || request instanceof HttpEntityContainer); if (newContent != null) { - BasicHttpEntity entity = new BasicHttpEntity(); - entity.setContent(newContent); - ((HttpEntityEnclosingRequest) request).setEntity(entity); + HttpEntityContainer entityContainer = (HttpEntityContainer) request; + HttpEntity existingEntity = entityContainer.getEntity(); + HttpEntity newEntity = new BasicHttpEntity(newContent, existingEntity.getContentLength(), ContentType.create(existingEntity.getContentType()), existingEntity.getContentEncoding()); + entityContainer.setEntity(newEntity); } + request.setHeaders(newHeaders); } } diff --git a/plugin/trino-opensearch/src/main/java/io/trino/plugin/opensearch/client/BackpressureRestClient.java b/plugin/trino-opensearch/src/main/java/io/trino/plugin/opensearch/client/BackpressureRestClient.java index b4a34203f76d..f9a1874dd587 100644 --- a/plugin/trino-opensearch/src/main/java/io/trino/plugin/opensearch/client/BackpressureRestClient.java +++ b/plugin/trino-opensearch/src/main/java/io/trino/plugin/opensearch/client/BackpressureRestClient.java @@ -23,9 +23,9 @@ import io.airlift.log.Logger; import io.airlift.stats.TimeStat; import io.trino.plugin.opensearch.OpenSearchConfig; -import org.apache.http.Header; -import org.apache.http.HttpEntity; -import org.apache.http.HttpHost; +import org.apache.hc.core5.http.Header; +import org.apache.hc.core5.http.HttpEntity; +import org.apache.hc.core5.http.HttpHost; import org.opensearch.client.Node; import org.opensearch.client.Request; import org.opensearch.client.RequestOptions; diff --git a/plugin/trino-opensearch/src/main/java/io/trino/plugin/opensearch/client/OpenSearchClient.java b/plugin/trino-opensearch/src/main/java/io/trino/plugin/opensearch/client/OpenSearchClient.java index 3100b6a7a4d6..9c855db91931 100644 --- a/plugin/trino-opensearch/src/main/java/io/trino/plugin/opensearch/client/OpenSearchClient.java +++ b/plugin/trino-opensearch/src/main/java/io/trino/plugin/opensearch/client/OpenSearchClient.java @@ -39,20 +39,25 @@ import io.trino.spi.TrinoException; import jakarta.annotation.PostConstruct; import jakarta.annotation.PreDestroy; -import org.apache.http.HttpEntity; -import org.apache.http.HttpHost; -import org.apache.http.auth.AuthScope; -import org.apache.http.auth.UsernamePasswordCredentials; -import org.apache.http.client.CredentialsProvider; -import org.apache.http.client.config.RequestConfig; -import org.apache.http.conn.ssl.NoopHostnameVerifier; -import org.apache.http.entity.ByteArrayEntity; -import org.apache.http.entity.StringEntity; -import org.apache.http.impl.client.BasicCredentialsProvider; -import org.apache.http.impl.nio.client.HttpAsyncClientBuilder; -import org.apache.http.impl.nio.reactor.IOReactorConfig; -import org.apache.http.message.BasicHeader; -import org.apache.http.util.EntityUtils; +import org.apache.hc.client5.http.auth.AuthScope; +import org.apache.hc.client5.http.auth.UsernamePasswordCredentials; +import org.apache.hc.client5.http.config.ConnectionConfig; +import org.apache.hc.client5.http.config.RequestConfig; +import org.apache.hc.client5.http.impl.async.HttpAsyncClientBuilder; +import org.apache.hc.client5.http.impl.async.HttpAsyncClients; +import org.apache.hc.client5.http.impl.auth.BasicCredentialsProvider; +import org.apache.hc.client5.http.impl.nio.PoolingAsyncClientConnectionManagerBuilder; +import org.apache.hc.client5.http.ssl.ClientTlsStrategyBuilder; +import org.apache.hc.client5.http.ssl.NoopHostnameVerifier; +import org.apache.hc.core5.http.ContentType; +import org.apache.hc.core5.http.HttpEntity; +import org.apache.hc.core5.http.HttpHost; +import org.apache.hc.core5.http.ParseException; +import org.apache.hc.core5.http.io.entity.EntityUtils; +import org.apache.hc.core5.http.io.entity.StringEntity; +import org.apache.hc.core5.http.message.BasicHeader; +import org.apache.hc.core5.reactor.IOReactorConfig; +import org.apache.hc.core5.util.Timeout; import org.opensearch.OpenSearchStatusException; import org.opensearch.action.search.ClearScrollRequest; import org.opensearch.action.search.SearchRequest; @@ -73,6 +78,7 @@ import java.io.File; import java.io.IOException; import java.io.InputStream; +import java.net.URISyntaxException; import java.security.GeneralSecurityException; import java.util.Arrays; import java.util.Iterator; @@ -177,7 +183,7 @@ private void refreshNodes() .map(OpenSearchNode::address) .filter(Optional::isPresent) .map(Optional::get) - .map(address -> HttpHost.create(format("%s://%s", tlsEnabled ? "https" : "http", address))) + .map(address -> createHost(address, tlsEnabled)) .toArray(HttpHost[]::new); if (hosts.length > 0 && !ignorePublishAddress) { @@ -193,50 +199,71 @@ private void refreshNodes() } } + @VisibleForTesting + public static HttpHost createHost(String address, boolean tlsEnabled) + { + try { + return HttpHost.create(format("%s://%s", tlsEnabled ? "https" : "http", address)); + } + catch (URISyntaxException e) { + throw new RuntimeException("Could not create host address", e); + } + } + private static BackpressureRestHighLevelClient createClient( OpenSearchConfig config, Optional awsSecurityConfig, Optional passwordConfig, TimeStat backpressureStats) { - RestClientBuilder builder = RestClient.builder( - config.getHosts().stream() - .map(httpHost -> new HttpHost(httpHost, config.getPort(), config.isTlsEnabled() ? "https" : "http")) - .toArray(HttpHost[]::new)); + HttpHost[] hosts = config.getHosts().stream() + .map(httpHost -> createHost("%s:%d".formatted(httpHost, config.getPort()), config.isTlsEnabled())) + .toArray(HttpHost[]::new); + RestClientBuilder builder = RestClient.builder(hosts); builder.setHttpClientConfigCallback(_ -> { RequestConfig requestConfig = RequestConfig.custom() - .setConnectTimeout(toIntExact(config.getConnectTimeout().toMillis())) - .setSocketTimeout(toIntExact(config.getRequestTimeout().toMillis())) + .setConnectionRequestTimeout(Timeout.of(config.getRequestTimeout().toJavaTime())) .build(); IOReactorConfig reactorConfig = IOReactorConfig.custom() .setIoThreadCount(config.getHttpThreadCount()) .build(); + PoolingAsyncClientConnectionManagerBuilder connectionManager = PoolingAsyncClientConnectionManagerBuilder.create(); + connectionManager.setDefaultConnectionConfig(ConnectionConfig.custom() + .setConnectTimeout(Timeout.of(config.getConnectTimeout().toJavaTime())) + .build()); + connectionManager.setMaxConnPerRoute(config.getMaxHttpConnections()); + connectionManager.setMaxConnTotal(config.getMaxHttpConnections()); + // the client builder passed to the call-back is configured to use system properties, which makes it // impossible to configure concurrency settings, so we need to build a new one from scratch - HttpAsyncClientBuilder clientBuilder = HttpAsyncClientBuilder.create() + HttpAsyncClientBuilder clientBuilder = HttpAsyncClients.custom() .setDefaultRequestConfig(requestConfig) - .setDefaultIOReactorConfig(reactorConfig) - .setMaxConnPerRoute(config.getMaxHttpConnections()) - .setMaxConnTotal(config.getMaxHttpConnections()); + .setIOReactorConfig(reactorConfig); + if (config.isTlsEnabled()) { + ClientTlsStrategyBuilder tlsStrategyBuilder = ClientTlsStrategyBuilder.create(); buildSslContext(config.getKeystorePath(), config.getKeystorePassword(), config.getTrustStorePath(), config.getTruststorePassword()) - .ifPresent(clientBuilder::setSSLContext); + .ifPresent(tlsStrategyBuilder::setSslContext); if (!config.isVerifyHostnames()) { - clientBuilder.setSSLHostnameVerifier(NoopHostnameVerifier.INSTANCE); + tlsStrategyBuilder.setHostnameVerifier(NoopHostnameVerifier.INSTANCE); } + + connectionManager.setTlsStrategy(tlsStrategyBuilder.build()); } + clientBuilder.setConnectionManager(connectionManager.build()); + passwordConfig.ifPresent(securityConfig -> { - CredentialsProvider credentials = new BasicCredentialsProvider(); - credentials.setCredentials(AuthScope.ANY, new UsernamePasswordCredentials(securityConfig.getUser(), securityConfig.getPassword())); + BasicCredentialsProvider credentials = new BasicCredentialsProvider(); + credentials.setCredentials(new AuthScope(null, -1), new UsernamePasswordCredentials(securityConfig.getUser(), securityConfig.getPassword().toCharArray())); clientBuilder.setDefaultCredentialsProvider(credentials); }); - awsSecurityConfig.ifPresent(securityConfig -> clientBuilder.addInterceptorLast(new AwsRequestSigner( + awsSecurityConfig.ifPresent(securityConfig -> clientBuilder.addRequestInterceptorLast(new AwsRequestSigner( securityConfig.getRegion(), securityConfig.getDeploymentType(), getAwsCredentialsProvider(securityConfig)))); @@ -552,8 +579,7 @@ public String executeQuery(String index, String query) "GET", path, ImmutableMap.of(), - new ByteArrayEntity(query.getBytes(UTF_8)), - new BasicHeader("Content-Type", "application/json"), + new StringEntity(query, ContentType.APPLICATION_JSON, false), new BasicHeader("Accept-Encoding", "application/json")); } catch (IOException e) { @@ -564,7 +590,7 @@ public String executeQuery(String index, String query) try { body = EntityUtils.toString(response.getEntity()); } - catch (IOException e) { + catch (IOException | ParseException e) { throw new TrinoException(OPENSEARCH_INVALID_RESPONSE, e); } @@ -749,7 +775,6 @@ private T doRequest(String path, ResponseHandler handler) private static TrinoException propagate(ResponseException exception) { HttpEntity entity = exception.getResponse().getEntity(); - if (entity != null && entity.getContentType() != null) { try { JsonNode reason = OBJECT_MAPPER.readTree(entity.getContent()).path("error") diff --git a/plugin/trino-opensearch/src/test/java/io/trino/plugin/opensearch/BaseOpenSearchConnectorTest.java b/plugin/trino-opensearch/src/test/java/io/trino/plugin/opensearch/BaseOpenSearchConnectorTest.java index 13ad0104ecd9..dfd215c90227 100644 --- a/plugin/trino-opensearch/src/test/java/io/trino/plugin/opensearch/BaseOpenSearchConnectorTest.java +++ b/plugin/trino-opensearch/src/test/java/io/trino/plugin/opensearch/BaseOpenSearchConnectorTest.java @@ -26,7 +26,7 @@ import io.trino.testing.MaterializedResult; import io.trino.testing.QueryRunner; import io.trino.testing.TestingConnectorBehavior; -import org.apache.http.HttpHost; +import org.apache.hc.core5.http.HttpHost; import org.intellij.lang.annotations.Language; import org.junit.jupiter.api.AfterAll; import org.junit.jupiter.api.Disabled; diff --git a/plugin/trino-opensearch/src/test/java/io/trino/plugin/opensearch/OpenSearchQueryRunner.java b/plugin/trino-opensearch/src/test/java/io/trino/plugin/opensearch/OpenSearchQueryRunner.java index 2f8f5767d387..a0dfa5a857a7 100644 --- a/plugin/trino-opensearch/src/test/java/io/trino/plugin/opensearch/OpenSearchQueryRunner.java +++ b/plugin/trino-opensearch/src/test/java/io/trino/plugin/opensearch/OpenSearchQueryRunner.java @@ -27,7 +27,7 @@ import io.trino.testing.QueryRunner; import io.trino.testing.TestingTrinoClient; import io.trino.tpch.TpchTable; -import org.apache.http.HttpHost; +import org.apache.hc.core5.http.HttpHost; import org.opensearch.client.RestClient; import org.opensearch.client.RestHighLevelClient; diff --git a/plugin/trino-opensearch/src/test/java/io/trino/plugin/opensearch/TestOpenSearchComplexTypePredicatePushDown.java b/plugin/trino-opensearch/src/test/java/io/trino/plugin/opensearch/TestOpenSearchComplexTypePredicatePushDown.java index eed568ab1a5e..b8f2b2cdac27 100644 --- a/plugin/trino-opensearch/src/test/java/io/trino/plugin/opensearch/TestOpenSearchComplexTypePredicatePushDown.java +++ b/plugin/trino-opensearch/src/test/java/io/trino/plugin/opensearch/TestOpenSearchComplexTypePredicatePushDown.java @@ -20,7 +20,7 @@ import io.airlift.json.ObjectMapperProvider; import io.trino.testing.AbstractTestQueryFramework; import io.trino.testing.QueryRunner; -import org.apache.http.HttpHost; +import org.apache.hc.core5.http.HttpHost; import org.intellij.lang.annotations.Language; import org.junit.jupiter.api.Test; import org.opensearch.client.Request; diff --git a/plugin/trino-opensearch/src/test/java/io/trino/plugin/opensearch/TestOpenSearchProjectionPushdownPlans.java b/plugin/trino-opensearch/src/test/java/io/trino/plugin/opensearch/TestOpenSearchProjectionPushdownPlans.java index 84a9fc8b7740..ab60260a3b8c 100644 --- a/plugin/trino-opensearch/src/test/java/io/trino/plugin/opensearch/TestOpenSearchProjectionPushdownPlans.java +++ b/plugin/trino-opensearch/src/test/java/io/trino/plugin/opensearch/TestOpenSearchProjectionPushdownPlans.java @@ -40,7 +40,7 @@ import io.trino.sql.planner.assertions.BasePushdownPlanTest; import io.trino.sql.planner.assertions.PlanMatchPattern; import io.trino.testing.PlanTester; -import org.apache.http.HttpHost; +import org.apache.hc.core5.http.HttpHost; import org.intellij.lang.annotations.Language; import org.junit.jupiter.api.AfterAll; import org.junit.jupiter.api.Test; @@ -151,7 +151,7 @@ void testPushdownDisabled() .build(); @Language("JSON") String properties = - """ + """ { "properties": { "col0": { diff --git a/plugin/trino-opensearch/src/test/java/io/trino/plugin/opensearch/TestPasswordAuthentication.java b/plugin/trino-opensearch/src/test/java/io/trino/plugin/opensearch/TestPasswordAuthentication.java index a0a1a624609e..77a72245b80e 100644 --- a/plugin/trino-opensearch/src/test/java/io/trino/plugin/opensearch/TestPasswordAuthentication.java +++ b/plugin/trino-opensearch/src/test/java/io/trino/plugin/opensearch/TestPasswordAuthentication.java @@ -20,8 +20,9 @@ import com.google.common.net.HostAndPort; import io.trino.sql.query.QueryAssertions; import io.trino.testing.QueryRunner; -import org.apache.http.HttpHost; -import org.apache.http.impl.nio.client.HttpAsyncClientBuilder; +import org.apache.hc.client5.http.impl.async.HttpAsyncClientBuilder; +import org.apache.hc.client5.http.impl.nio.PoolingAsyncClientConnectionManagerBuilder; +import org.apache.hc.client5.http.ssl.ClientTlsStrategyBuilder; import org.junit.jupiter.api.AfterAll; import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.Test; @@ -43,6 +44,7 @@ import static io.airlift.testing.Closeables.closeAll; import static io.trino.plugin.base.ssl.SslUtils.createSSLContext; import static io.trino.plugin.opensearch.OpenSearchServer.OPENSEARCH_IMAGE; +import static io.trino.plugin.opensearch.client.OpenSearchClient.createHost; import static java.lang.String.format; import static java.nio.charset.StandardCharsets.UTF_8; import static org.assertj.core.api.Assertions.assertThat; @@ -71,7 +73,7 @@ public void setUp() .buildOrThrow()); HostAndPort address = opensearch.getAddress(); - client = new RestHighLevelClient(RestClient.builder(new HttpHost(address.getHost(), address.getPort(), "https")) + client = new RestHighLevelClient(RestClient.builder(createHost("%s:%d".formatted(address.getHost(), address.getPort()), true)) .setHttpClientConfigCallback(this::setupSslContext)); QueryRunner runner = OpenSearchQueryRunner.builder(opensearch.getAddress()) @@ -92,15 +94,20 @@ public void setUp() private HttpAsyncClientBuilder setupSslContext(HttpAsyncClientBuilder clientBuilder) { try { - return clientBuilder.setSSLContext(createSSLContext( + PoolingAsyncClientConnectionManagerBuilder connectionManager = PoolingAsyncClientConnectionManagerBuilder.create(); + ClientTlsStrategyBuilder strategy = ClientTlsStrategyBuilder.create(); + strategy.setSslContext(createSSLContext( Optional.empty(), Optional.empty(), Optional.of(new File(Resources.getResource("truststore.jks").toURI())), Optional.of("123456"))); + connectionManager.setTlsStrategy(strategy.build()); + clientBuilder.setConnectionManager(connectionManager.build()); } catch (GeneralSecurityException | IOException | URISyntaxException e) { throw new RuntimeException(e); } + return clientBuilder; } @AfterAll