Skip to content

Allow using S3 CRT client for native S3 filesystem #26212

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions lib/trino-filesystem-s3/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -141,6 +141,11 @@
<artifactId>aws-core</artifactId>
</dependency>

<dependency>
<groupId>software.amazon.awssdk</groupId>
<artifactId>aws-crt-client</artifactId>
</dependency>

<dependency>
<groupId>software.amazon.awssdk</groupId>
<artifactId>http-client-spi</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -177,6 +177,7 @@ public static RetryStrategy getRetryStrategy(RetryMode retryMode)
private boolean supportsExclusiveCreate = true;
private boolean crossRegionAccessEnabled;
private String applicationId = "Trino";
private boolean enableCrtClient;

public String getAwsAccessKey()
{
Expand Down Expand Up @@ -650,4 +651,17 @@ public S3FileSystemConfig setApplicationId(String applicationId)
this.applicationId = applicationId;
return this;
}

public boolean isEnableCrtClient()
{
return enableCrtClient;
}

@Config("s3.use-crt-client")
@ConfigDescription("Use AWS SDK for Java 2.x CRT client for S3 operations")
public S3FileSystemConfig setEnableCrtClient(boolean enableCrtClient)
{
this.enableCrtClient = enableCrtClient;
return this;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
package io.trino.filesystem.s3;

import com.google.inject.Inject;
import io.airlift.units.Duration;
import io.opentelemetry.api.OpenTelemetry;
import io.opentelemetry.instrumentation.awssdk.v2_2.AwsSdkTelemetry;
import io.trino.filesystem.Location;
Expand All @@ -29,6 +30,7 @@
import software.amazon.awssdk.http.SdkHttpClient;
import software.amazon.awssdk.http.apache.ApacheHttpClient;
import software.amazon.awssdk.http.apache.ProxyConfiguration;
import software.amazon.awssdk.http.crt.AwsCrtHttpClient;
import software.amazon.awssdk.metrics.MetricPublisher;
import software.amazon.awssdk.regions.Region;
import software.amazon.awssdk.services.s3.LegacyMd5Plugin;
Expand Down Expand Up @@ -294,6 +296,33 @@ private static ClientOverrideConfiguration createOverrideConfiguration(OpenTelem

private static SdkHttpClient createHttpClient(S3FileSystemConfig config)
{
if (config.isEnableCrtClient()) {
AwsCrtHttpClient.Builder builder = AwsCrtHttpClient
.builder()
.maxConcurrency(config.getMaxConnections());

config.getSocketConnectTimeout()
.map(Duration::toJavaTime)
.ifPresent(builder::connectionTimeout);

config.getConnectionMaxIdleTime()
.map(Duration::toJavaTime)
.ifPresent(builder::connectionMaxIdleTime);

if (config.getHttpProxy() != null) {
builder.proxyConfiguration(proxyBuilder -> {
proxyBuilder.host(config.getHttpProxy().getHost());
proxyBuilder.port(config.getHttpProxy().getPort());
proxyBuilder.scheme(config.isHttpProxySecure() ? "https" : "http");
proxyBuilder.username(config.getHttpProxyUsername());
proxyBuilder.password(config.getHttpProxyPassword());
proxyBuilder.nonProxyHosts(config.getNonProxyHosts());
});
}

return builder.build();
}

ApacheHttpClient.Builder client = ApacheHttpClient.builder()
.maxConnections(config.getMaxConnections())
.tcpKeepAlive(config.getTcpKeepAlive());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,9 @@ public int read()
public int read(byte[] bytes, int offset, int length)
throws IOException
{
if (offset < 0) {
throw new IndexOutOfBoundsException("Offset cannot be negative: " + offset);
}
ensureOpen();
seekStream(false);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,8 @@ protected S3FileSystemFactory createS3FileSystemFactory()
.setEndpoint(endpoint)
.setSupportsExclusiveCreate(true)
.setSignerType(S3FileSystemConfig.SignerType.AwsS3V4Signer)
.setStreamingPartSize(DataSize.valueOf("5.5MB")), new S3FileSystemStats());
.setStreamingPartSize(DataSize.valueOf("5.5MB"))
.setEnableCrtClient(true), new S3FileSystemStats());
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,8 @@ public void testDefaults()
.setHttpProxyPreemptiveBasicProxyAuth(false)
.setSupportsExclusiveCreate(true)
.setCrossRegionAccessEnabled(false)
.setApplicationId("Trino"));
.setApplicationId("Trino")
.setEnableCrtClient(false));
}

@Test
Expand Down Expand Up @@ -120,6 +121,7 @@ public void testExplicitPropertyMappings()
.put("s3.exclusive-create", "false")
.put("s3.application-id", "application id")
.put("s3.cross-region-access", "true")
.put("s3.use-crt-client", "true")
.buildOrThrow();

S3FileSystemConfig expected = new S3FileSystemConfig()
Expand Down Expand Up @@ -158,7 +160,8 @@ public void testExplicitPropertyMappings()
.setHttpProxyPreemptiveBasicProxyAuth(true)
.setSupportsExclusiveCreate(false)
.setCrossRegionAccessEnabled(true)
.setApplicationId("application id");
.setApplicationId("application id")
.setEnableCrtClient(true);

assertFullMapping(properties, expected);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,7 @@ protected S3FileSystemFactory createS3FileSystemFactory()
.setAwsAccessKey(Minio.MINIO_ACCESS_KEY)
.setAwsSecretKey(Minio.MINIO_SECRET_KEY)
.setSupportsExclusiveCreate(true)
.setEnableCrtClient(true)
.setStreamingPartSize(DataSize.valueOf("5.5MB")), new S3FileSystemStats());
}

Expand Down