Commit dffa947
Respect range parameter for getObject in presign-aware S3 clients
If present, the range parameter from the GetObjectRequest should be carried over to the PresignedUrlDownloadRequest, so that the client receives the expected byte range rather than the whole object.
1 parent 15ccf77 commit dffa947
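
For context, a minimal sketch of what the fix does, assuming the AWS SDK for Java v1 types named in the diff (GetObjectRequest.getRange() returns a two-element {start, end} array, or null when no range was set). The class and method names below are illustrative only, not the project's actual code:

    import java.net.URL;

    import com.amazonaws.services.s3.AmazonS3;
    import com.amazonaws.services.s3.model.GetObjectRequest;
    import com.amazonaws.services.s3.model.PresignedUrlDownloadRequest;
    import com.amazonaws.services.s3.model.S3Object;

    class RangePropagationSketch
    {
        // Illustrative helper: download via a presigned URL while honoring the
        // caller's byte range, which the pre-fix code silently dropped.
        static S3Object downloadWithRange(AmazonS3 delegate, URL presignedUrl, GetObjectRequest getObjectRequest)
        {
            PresignedUrlDownloadRequest downloadRequest = new PresignedUrlDownloadRequest(presignedUrl);
            long[] range = getObjectRequest.getRange();
            if (range != null) {
                // withRange takes inclusive start and end byte offsets
                downloadRequest.withRange(range[0], range[1]);
            }
            return delegate.download(downloadRequest).getS3Object();
        }
    }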

File tree: 3 files changed, +35 −3 lines

trino-aws-proxy-spark3/src/main/java/io/trino/aws/proxy/spark3/PresignAwareAmazonS3.java

Lines changed: 4 additions & 0 deletions

@@ -626,6 +626,8 @@ public S3Object getObject(GetObjectRequest getObjectRequest)
         return getPresignedUrl("GET", getObjectRequest.getBucketName(), getObjectRequest.getKey())
                 .map(presigned -> {
                     PresignedUrlDownloadRequest presignedUrlDownloadRequest = new PresignedUrlDownloadRequest(presigned.url);
+                    Optional.ofNullable(getObjectRequest.getRange())
+                            .ifPresent(range -> presignedUrlDownloadRequest.withRange(range[0], range[1]));
                     return delegate.download(presignedUrlDownloadRequest).getS3Object();
                 })
                 .orElseGet(() -> delegate.getObject(getObjectRequest));
@@ -638,6 +640,8 @@ public ObjectMetadata getObject(GetObjectRequest getObjectRequest, File destinat
         return getPresignedUrl("GET", getObjectRequest.getBucketName(), getObjectRequest.getKey())
                 .map(presigned -> {
                     PresignedUrlDownloadRequest presignedUrlDownloadRequest = new PresignedUrlDownloadRequest(presigned.url);
+                    Optional.ofNullable(getObjectRequest.getRange())
+                            .ifPresent(range -> presignedUrlDownloadRequest.withRange(range[0], range[1]));
                     delegate.download(presignedUrlDownloadRequest, destinationFile);
                     return presigned.objectMetadata;
                 })
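
As a usage illustration (the bucket, key, and presignAwareS3 variable below are hypothetical), a caller requesting a byte range through the presign-aware client now receives only those bytes instead of the full object:

    // hypothetical caller, SDK v1: fetch the first 100 bytes (offsets are inclusive)
    GetObjectRequest request = new GetObjectRequest("my-bucket", "my-key").withRange(0, 99);
    S3Object first100Bytes = presignAwareS3.getObject(request);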

trino-aws-proxy/src/test/java/io/trino/aws/proxy/server/TestDatabaseSecurity.java

Lines changed: 3 additions & 0 deletions

@@ -103,6 +103,9 @@ public TestDatabaseSecurity(S3Client s3Client, PySparkContainer pySparkContainer
     public void testDatabaseSecurity()
             throws Exception
     {
+        // create the test bucket
+        s3Client.createBucket(r -> r.bucket("test"));
+
         createDatabaseAndTable(s3Client, pySparkContainer);
 
         clearInputStreamAndClose(inputToContainerStdin(pySparkContainer.containerId(), "spark.sql(\"select * from %s.%s\").show()".formatted(DATABASE_NAME, TABLE_NAME)), line -> line.equals("| John Galt| 28|"));

trino-aws-proxy/src/test/java/io/trino/aws/proxy/server/TestPySparkSql.java

Lines changed: 28 additions & 3 deletions

@@ -19,6 +19,7 @@
 import io.trino.aws.proxy.server.testing.containers.PySparkContainer;
 import io.trino.aws.proxy.server.testing.harness.BuilderFilter;
 import io.trino.aws.proxy.server.testing.harness.TrinoAwsProxyTest;
+import org.junit.jupiter.api.BeforeAll;
 import org.junit.jupiter.api.Test;
 import software.amazon.awssdk.services.s3.S3Client;
 
@@ -54,6 +55,13 @@ public TestPySparkSql(S3Client s3Client, PySparkContainer pySparkContainer)
         this.pySparkContainer = requireNonNull(pySparkContainer, "pySparkContainer is null");
     }
 
+    @BeforeAll
+    public void setupBucket()
+    {
+        // create the test bucket
+        s3Client.createBucket(r -> r.bucket("test"));
+    }
+
     @Test
     public void testSql()
             throws Exception
@@ -75,12 +83,29 @@ public void testSql()
                 "spark.sql(\"select * from %s.%s\").show()".formatted(DATABASE_NAME, TABLE_NAME)), line -> line.equals("| c| 30|"));
     }
 
-    public static void createDatabaseAndTable(S3Client s3Client, PySparkContainer container)
+    @Test
+    public void testParquet()
             throws Exception
     {
-        // create the test bucket
-        s3Client.createBucket(r -> r.bucket("test"));
+        // upload a CSV file
+        s3Client.putObject(r -> r.bucket("test").key("test_parquet/file.csv"), Path.of(Resources.getResource("test.csv").toURI()));
 
+        // read the CSV file and write it as Parquet
+        clearInputStreamAndClose(inputToContainerStdin(pySparkContainer.containerId(), """
+                df = spark.read.csv("s3a://test/test_parquet/file.csv")
+                df.write.parquet("s3a://test/test_parquet/file.parquet")
+                """), line -> line.equals(">>> ") || line.matches(".*Write Job [\\w-]+ committed.*"));
+
+        // read the Parquet file
+        clearInputStreamAndClose(inputToContainerStdin(pySparkContainer.containerId(), """
+                parquetDF = spark.read.parquet("s3a://test/test_parquet/file.parquet")
+                parquetDF.show()
+                """), line -> line.equals("| John Galt| 28|"));
+    }
+
+    public static void createDatabaseAndTable(S3Client s3Client, PySparkContainer container)
+            throws Exception
+    {
         // upload a CSV file as a potential table
         s3Client.putObject(r -> r.bucket("test").key("table/file.csv"), Path.of(Resources.getResource("test.csv").toURI()));
 