From b294bd02a8bd698499cad8e2927bd0ab045890c5 Mon Sep 17 00:00:00 2001 From: Piotr Findeisen Date: Wed, 19 Nov 2025 14:14:58 +0100 Subject: [PATCH 1/3] Cleanup TestS3Retries test - register resources with closer as early as possible - use toxic name without spaces; it seems that names with spaces don't work correctly - drop redundant container configuration (network alias) --- .../io/trino/filesystem/s3/TestS3Retries.java | 32 ++++++++----------- 1 file changed, 14 insertions(+), 18 deletions(-) diff --git a/lib/trino-filesystem-s3/src/test/java/io/trino/filesystem/s3/TestS3Retries.java b/lib/trino-filesystem-s3/src/test/java/io/trino/filesystem/s3/TestS3Retries.java index e82522b8dbde..29cdc3bbd487 100644 --- a/lib/trino-filesystem-s3/src/test/java/io/trino/filesystem/s3/TestS3Retries.java +++ b/lib/trino-filesystem-s3/src/test/java/io/trino/filesystem/s3/TestS3Retries.java @@ -13,11 +13,11 @@ */ package io.trino.filesystem.s3; -import com.google.common.io.Closer; import eu.rekawek.toxiproxy.Proxy; import eu.rekawek.toxiproxy.ToxiproxyClient; import eu.rekawek.toxiproxy.model.ToxicDirection; import io.trino.filesystem.Location; +import io.trino.plugin.base.util.AutoCloseableCloser; import io.trino.testing.containers.Minio; import org.junit.jupiter.api.AfterAll; import org.junit.jupiter.api.BeforeAll; @@ -38,8 +38,9 @@ import static io.trino.testing.containers.Minio.MINIO_API_PORT; import static org.assertj.core.api.Assertions.assertThatThrownBy; +import static org.junit.jupiter.api.TestInstance.Lifecycle.PER_CLASS; -@TestInstance(TestInstance.Lifecycle.PER_CLASS) +@TestInstance(PER_CLASS) public class TestS3Retries { private static final int TOXIPROXY_CONTROL_PORT = 8474; @@ -48,36 +49,32 @@ public class TestS3Retries private S3Client s3client; - private final Closer closer = Closer.create(); + private final AutoCloseableCloser closer = AutoCloseableCloser.create(); @BeforeAll final void init() throws IOException { - Network network = Network.newNetwork(); - closer.register(network::close); - Minio minio = Minio.builder() + Network network = closer.register(Network.newNetwork()); + Minio minio = closer.register(Minio.builder() .withNetwork(network) - .build(); + .build()); minio.start(); minio.createBucket("bucket"); minio.writeFile(getTestData(), "bucket", "object"); - closer.register(minio::close); - ToxiproxyContainer toxiproxy = new ToxiproxyContainer("ghcr.io/shopify/toxiproxy:2.5.0") + ToxiproxyContainer toxiproxy = closer.register(new ToxiproxyContainer("ghcr.io/shopify/toxiproxy:2.5.0") .withExposedPorts(TOXIPROXY_CONTROL_PORT, MINIO_PROXY_PORT) - .withNetwork(network) - .withNetworkAliases("minio"); + .withNetwork(network)); toxiproxy.start(); - closer.register(toxiproxy::close); ToxiproxyClient toxiproxyClient = new ToxiproxyClient(toxiproxy.getHost(), toxiproxy.getControlPort()); Proxy proxy = toxiproxyClient.createProxy("minio", "0.0.0.0:" + MINIO_PROXY_PORT, "minio:" + MINIO_API_PORT); // the number of transferred bytes includes both the response headers (around 570 bytes) and body proxy.toxics() - .limitData("broken connection", ToxicDirection.DOWNSTREAM, 700); + .limitData("broken_connection", ToxicDirection.DOWNSTREAM, 700); - s3client = S3Client.builder() + s3client = closer.register(S3Client.builder() .endpointOverride(URI.create("http://" + toxiproxy.getHost() + ":" + toxiproxy.getMappedPort(MINIO_PROXY_PORT))) .region(Region.of(Minio.MINIO_REGION)) .forcePathStyle(true) @@ -85,13 +82,12 @@ final void init() AwsBasicCredentials.create(Minio.MINIO_ACCESS_KEY, Minio.MINIO_SECRET_KEY))) // explicitly configure the number of retries .overrideConfiguration(o -> o.retryStrategy(b -> b.maxAttempts(3))) - .build(); - closer.register(s3client::close); + .build()); } @AfterAll final void cleanup() - throws IOException + throws Exception { closer.close(); } @@ -106,7 +102,7 @@ private static byte[] getTestData() @Test public void testRetries() { - S3Location location = new S3Location(Location.of("s3://bucket/object")); + S3Location location = new S3Location(Location.of("s3://%s/object".formatted("bucket"))); GetObjectRequest request = GetObjectRequest.builder() .bucket(location.bucket()) .key(location.key()) From b8694c7485f2f46353e894de500d26206ca1d846 Mon Sep 17 00:00:00 2001 From: Piotr Findeisen Date: Wed, 19 Nov 2025 14:25:12 +0100 Subject: [PATCH 2/3] Refactor TestS3Retries to accomodate future tests Network chaos (toxiproxy) setup is specific to assertions going to be performed. While we could test different setups and assertions with separate similar classes, this would be a lot of similar boilerplate code. This commit refactor the test class so that the test method configures network breakers. --- .../io/trino/filesystem/s3/TestS3Retries.java | 65 +++++++++++-------- 1 file changed, 37 insertions(+), 28 deletions(-) diff --git a/lib/trino-filesystem-s3/src/test/java/io/trino/filesystem/s3/TestS3Retries.java b/lib/trino-filesystem-s3/src/test/java/io/trino/filesystem/s3/TestS3Retries.java index 29cdc3bbd487..eb9f81a0b286 100644 --- a/lib/trino-filesystem-s3/src/test/java/io/trino/filesystem/s3/TestS3Retries.java +++ b/lib/trino-filesystem-s3/src/test/java/io/trino/filesystem/s3/TestS3Retries.java @@ -19,10 +19,12 @@ import io.trino.filesystem.Location; import io.trino.plugin.base.util.AutoCloseableCloser; import io.trino.testing.containers.Minio; -import org.junit.jupiter.api.AfterAll; -import org.junit.jupiter.api.BeforeAll; +import io.trino.testing.minio.MinioClient; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.TestInstance; +import org.junit.jupiter.api.parallel.Execution; import org.testcontainers.containers.Network; import org.testcontainers.toxiproxy.ToxiproxyContainer; import software.amazon.awssdk.auth.credentials.AwsBasicCredentials; @@ -32,36 +34,43 @@ import software.amazon.awssdk.services.s3.S3Client; import software.amazon.awssdk.services.s3.model.GetObjectRequest; -import java.io.IOException; import java.net.URI; import java.util.Arrays; +import static io.trino.testing.TestingNames.randomNameSuffix; import static io.trino.testing.containers.Minio.MINIO_API_PORT; import static org.assertj.core.api.Assertions.assertThatThrownBy; -import static org.junit.jupiter.api.TestInstance.Lifecycle.PER_CLASS; +import static org.junit.jupiter.api.TestInstance.Lifecycle.PER_METHOD; +import static org.junit.jupiter.api.parallel.ExecutionMode.SAME_THREAD; -@TestInstance(PER_CLASS) +@TestInstance(PER_METHOD) +@Execution(value = SAME_THREAD, reason = "to constrain resource usage") public class TestS3Retries { private static final int TOXIPROXY_CONTROL_PORT = 8474; private static final int MINIO_PROXY_PORT = 1234; - private static final int TEST_DATA_SIZE = 1024; + private AutoCloseableCloser closer = AutoCloseableCloser.create(); + private String bucketName; + private MinioClient minioClient; + private Proxy toxiProxy; private S3Client s3client; - private final AutoCloseableCloser closer = AutoCloseableCloser.create(); - - @BeforeAll + @BeforeEach final void init() - throws IOException + throws Exception { + closer.close(); + closer = AutoCloseableCloser.create(); + Network network = closer.register(Network.newNetwork()); Minio minio = closer.register(Minio.builder() .withNetwork(network) .build()); minio.start(); - minio.createBucket("bucket"); - minio.writeFile(getTestData(), "bucket", "object"); + bucketName = "bucket-" + randomNameSuffix(); + minio.createBucket(bucketName); + minioClient = closer.register(minio.createMinioClient()); ToxiproxyContainer toxiproxy = closer.register(new ToxiproxyContainer("ghcr.io/shopify/toxiproxy:2.5.0") .withExposedPorts(TOXIPROXY_CONTROL_PORT, MINIO_PROXY_PORT) @@ -69,10 +78,7 @@ final void init() toxiproxy.start(); ToxiproxyClient toxiproxyClient = new ToxiproxyClient(toxiproxy.getHost(), toxiproxy.getControlPort()); - Proxy proxy = toxiproxyClient.createProxy("minio", "0.0.0.0:" + MINIO_PROXY_PORT, "minio:" + MINIO_API_PORT); - // the number of transferred bytes includes both the response headers (around 570 bytes) and body - proxy.toxics() - .limitData("broken_connection", ToxicDirection.DOWNSTREAM, 700); + toxiProxy = toxiproxyClient.createProxy("minio", "0.0.0.0:" + MINIO_PROXY_PORT, "minio:" + MINIO_API_PORT); s3client = closer.register(S3Client.builder() .endpointOverride(URI.create("http://" + toxiproxy.getHost() + ":" + toxiproxy.getMappedPort(MINIO_PROXY_PORT))) @@ -85,34 +91,37 @@ final void init() .build()); } - @AfterAll + @AfterEach final void cleanup() throws Exception { closer.close(); } - private static byte[] getTestData() + @Test + public void testRead() + throws Exception { - byte[] data = new byte[TEST_DATA_SIZE]; + int testDataSize = 1024; + byte[] data = new byte[testDataSize]; Arrays.fill(data, (byte) 1); - return data; - } + minioClient.putObject(bucketName, data, "object"); - @Test - public void testRetries() - { - S3Location location = new S3Location(Location.of("s3://%s/object".formatted("bucket"))); + // the number of transferred bytes includes both the response headers (around 570 bytes) and body + toxiProxy.toxics() + .limitData("broken_connection", ToxicDirection.DOWNSTREAM, 700); + + S3Location location = new S3Location(Location.of("s3://%s/object".formatted(bucketName))); GetObjectRequest request = GetObjectRequest.builder() .bucket(location.bucket()) .key(location.key()) .build(); S3Input input = new S3Input(location.location(), s3client, request); - byte[] bytes = new byte[TEST_DATA_SIZE]; - assertThatThrownBy(() -> input.readFully(0, bytes, 0, TEST_DATA_SIZE)).cause() + byte[] bytes = new byte[testDataSize]; + assertThatThrownBy(() -> input.readFully(0, bytes, 0, testDataSize)).cause() .hasSuppressedException(SdkClientException.create("Request attempt 2 failure: Error reading getObject response")); - assertThatThrownBy(() -> input.readTail(bytes, 0, TEST_DATA_SIZE)).cause() + assertThatThrownBy(() -> input.readTail(bytes, 0, testDataSize)).cause() .hasSuppressedException(SdkClientException.create("Request attempt 2 failure: Error reading getObject response")); } } From 85bf30cb1d8ab0911086c002ae694d40e13b21b2 Mon Sep 17 00:00:00 2001 From: Piotr Findeisen Date: Wed, 19 Nov 2025 15:26:03 +0100 Subject: [PATCH 3/3] Test S3 file writing retries This attempts to reproduce a problem that was observed on a production environment where S3 client side retries combined with `s3.exclusive-create=true` flag (default) where believed to cause failures during Iceberg maintenance procedures. This test did not reproduce that problem, but might be useful anyway. --- .../io/trino/filesystem/s3/TestS3Retries.java | 70 +++++++++++++++++++ 1 file changed, 70 insertions(+) diff --git a/lib/trino-filesystem-s3/src/test/java/io/trino/filesystem/s3/TestS3Retries.java b/lib/trino-filesystem-s3/src/test/java/io/trino/filesystem/s3/TestS3Retries.java index eb9f81a0b286..fc256ee8746e 100644 --- a/lib/trino-filesystem-s3/src/test/java/io/trino/filesystem/s3/TestS3Retries.java +++ b/lib/trino-filesystem-s3/src/test/java/io/trino/filesystem/s3/TestS3Retries.java @@ -34,11 +34,18 @@ import software.amazon.awssdk.services.s3.S3Client; import software.amazon.awssdk.services.s3.model.GetObjectRequest; +import java.io.OutputStream; import java.net.URI; import java.util.Arrays; +import java.util.List; +import java.util.Optional; +import java.util.concurrent.ThreadLocalRandom; +import static com.google.common.util.concurrent.MoreExecutors.directExecutor; import static io.trino.testing.TestingNames.randomNameSuffix; import static io.trino.testing.containers.Minio.MINIO_API_PORT; +import static java.lang.Math.toIntExact; +import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.assertThatThrownBy; import static org.junit.jupiter.api.TestInstance.Lifecycle.PER_METHOD; import static org.junit.jupiter.api.parallel.ExecutionMode.SAME_THREAD; @@ -124,4 +131,67 @@ public void testRead() assertThatThrownBy(() -> input.readTail(bytes, 0, testDataSize)).cause() .hasSuppressedException(SdkClientException.create("Request attempt 2 failure: Error reading getObject response")); } + + @Test + public void testCreate() + throws Exception + { + S3FileSystemConfig config = new S3FileSystemConfig(); + int partSize = toIntExact(config.getStreamingPartSize().toBytes()); + S3Context context = new S3Context( + partSize, + config.isRequesterPays(), + S3Context.S3SseContext.of( + config.getSseType(), + config.getSseKmsKeyId(), + config.getSseCustomerKey()), + Optional.empty(), + config.getStorageClass(), + config.getCannedAcl(), + config.isSupportsExclusiveCreate()); + + // The toxicity is randomized, so iterate for determinism + for (int iteration = 0; iteration < 5; iteration++) { + for (boolean exclusive : List.of(false, true)) { + testCreate(context, exclusive, 0); + testCreate(context, exclusive, 10); + testCreate(context, exclusive, partSize + 1); + } + } + } + + private void testCreate(S3Context context, boolean exclusive, int dataSize) + throws Exception + { + String objectKey = "test-exclusive-create-" + randomNameSuffix(); + String s3Uri = "s3://%s/%s".formatted(bucketName, objectKey); + S3Location location = new S3Location(Location.of(s3Uri)); + S3OutputFile outputFile = new S3OutputFile(directExecutor(), s3client, context, location, Optional.empty()); + + byte[] data = new byte[dataSize]; + ThreadLocalRandom.current().nextBytes(data); + if (exclusive) { + outputFile.createExclusive(data); + } + else { + try (OutputStream outputStream = outputFile.create()) { + outputStream.write(data); + } + } + + assertThat(minioClient.getObjectContents(bucketName, objectKey)).as("Object data read back from storage") + .isEqualTo(data); + + // Subsequent exclusive creation should fail + assertThatThrownBy(() -> outputFile.createExclusive(data)) + .isInstanceOf(java.nio.file.FileAlreadyExistsException.class) + .hasMessage(s3Uri); + + // Subsequent non-exclusive create may succeed (or fail cleanly) + try (OutputStream outputStream = outputFile.create()) { + outputStream.write(data); + } + + minioClient.removeObject(bucketName, objectKey); + } }