diff --git a/lib/trino-filesystem-s3/src/test/java/io/trino/filesystem/s3/TestS3Retries.java b/lib/trino-filesystem-s3/src/test/java/io/trino/filesystem/s3/TestS3Retries.java index e82522b8dbde..fc256ee8746e 100644 --- a/lib/trino-filesystem-s3/src/test/java/io/trino/filesystem/s3/TestS3Retries.java +++ b/lib/trino-filesystem-s3/src/test/java/io/trino/filesystem/s3/TestS3Retries.java @@ -13,16 +13,18 @@ */ package io.trino.filesystem.s3; -import com.google.common.io.Closer; import eu.rekawek.toxiproxy.Proxy; import eu.rekawek.toxiproxy.ToxiproxyClient; import eu.rekawek.toxiproxy.model.ToxicDirection; import io.trino.filesystem.Location; +import io.trino.plugin.base.util.AutoCloseableCloser; import io.trino.testing.containers.Minio; -import org.junit.jupiter.api.AfterAll; -import org.junit.jupiter.api.BeforeAll; +import io.trino.testing.minio.MinioClient; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.TestInstance; +import org.junit.jupiter.api.parallel.Execution; import org.testcontainers.containers.Network; import org.testcontainers.toxiproxy.ToxiproxyContainer; import software.amazon.awssdk.auth.credentials.AwsBasicCredentials; @@ -32,52 +34,60 @@ import software.amazon.awssdk.services.s3.S3Client; import software.amazon.awssdk.services.s3.model.GetObjectRequest; -import java.io.IOException; +import java.io.OutputStream; import java.net.URI; import java.util.Arrays; +import java.util.List; +import java.util.Optional; +import java.util.concurrent.ThreadLocalRandom; +import static com.google.common.util.concurrent.MoreExecutors.directExecutor; +import static io.trino.testing.TestingNames.randomNameSuffix; import static io.trino.testing.containers.Minio.MINIO_API_PORT; +import static java.lang.Math.toIntExact; +import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.assertThatThrownBy; +import static org.junit.jupiter.api.TestInstance.Lifecycle.PER_METHOD; +import static org.junit.jupiter.api.parallel.ExecutionMode.SAME_THREAD; -@TestInstance(TestInstance.Lifecycle.PER_CLASS) +@TestInstance(PER_METHOD) +@Execution(value = SAME_THREAD, reason = "to constrain resource usage") public class TestS3Retries { private static final int TOXIPROXY_CONTROL_PORT = 8474; private static final int MINIO_PROXY_PORT = 1234; - private static final int TEST_DATA_SIZE = 1024; + private AutoCloseableCloser closer = AutoCloseableCloser.create(); + private String bucketName; + private MinioClient minioClient; + private Proxy toxiProxy; private S3Client s3client; - private final Closer closer = Closer.create(); - - @BeforeAll + @BeforeEach final void init() - throws IOException + throws Exception { - Network network = Network.newNetwork(); - closer.register(network::close); - Minio minio = Minio.builder() + closer.close(); + closer = AutoCloseableCloser.create(); + + Network network = closer.register(Network.newNetwork()); + Minio minio = closer.register(Minio.builder() .withNetwork(network) - .build(); + .build()); minio.start(); - minio.createBucket("bucket"); - minio.writeFile(getTestData(), "bucket", "object"); - closer.register(minio::close); + bucketName = "bucket-" + randomNameSuffix(); + minio.createBucket(bucketName); + minioClient = closer.register(minio.createMinioClient()); - ToxiproxyContainer toxiproxy = new ToxiproxyContainer("ghcr.io/shopify/toxiproxy:2.5.0") + ToxiproxyContainer toxiproxy = closer.register(new ToxiproxyContainer("ghcr.io/shopify/toxiproxy:2.5.0") .withExposedPorts(TOXIPROXY_CONTROL_PORT, MINIO_PROXY_PORT) - .withNetwork(network) - .withNetworkAliases("minio"); + .withNetwork(network)); toxiproxy.start(); - closer.register(toxiproxy::close); ToxiproxyClient toxiproxyClient = new ToxiproxyClient(toxiproxy.getHost(), toxiproxy.getControlPort()); - Proxy proxy = toxiproxyClient.createProxy("minio", "0.0.0.0:" + MINIO_PROXY_PORT, "minio:" + MINIO_API_PORT); - // the number of transferred bytes includes both the response headers (around 570 bytes) and body - proxy.toxics() - .limitData("broken connection", ToxicDirection.DOWNSTREAM, 700); + toxiProxy = toxiproxyClient.createProxy("minio", "0.0.0.0:" + MINIO_PROXY_PORT, "minio:" + MINIO_API_PORT); - s3client = S3Client.builder() + s3client = closer.register(S3Client.builder() .endpointOverride(URI.create("http://" + toxiproxy.getHost() + ":" + toxiproxy.getMappedPort(MINIO_PROXY_PORT))) .region(Region.of(Minio.MINIO_REGION)) .forcePathStyle(true) @@ -85,38 +95,103 @@ final void init() AwsBasicCredentials.create(Minio.MINIO_ACCESS_KEY, Minio.MINIO_SECRET_KEY))) // explicitly configure the number of retries .overrideConfiguration(o -> o.retryStrategy(b -> b.maxAttempts(3))) - .build(); - closer.register(s3client::close); + .build()); } - @AfterAll + @AfterEach final void cleanup() - throws IOException + throws Exception { closer.close(); } - private static byte[] getTestData() + @Test + public void testRead() + throws Exception { - byte[] data = new byte[TEST_DATA_SIZE]; + int testDataSize = 1024; + byte[] data = new byte[testDataSize]; Arrays.fill(data, (byte) 1); - return data; - } + minioClient.putObject(bucketName, data, "object"); - @Test - public void testRetries() - { - S3Location location = new S3Location(Location.of("s3://bucket/object")); + // the number of transferred bytes includes both the response headers (around 570 bytes) and body + toxiProxy.toxics() + .limitData("broken_connection", ToxicDirection.DOWNSTREAM, 700); + + S3Location location = new S3Location(Location.of("s3://%s/object".formatted(bucketName))); GetObjectRequest request = GetObjectRequest.builder() .bucket(location.bucket()) .key(location.key()) .build(); S3Input input = new S3Input(location.location(), s3client, request); - byte[] bytes = new byte[TEST_DATA_SIZE]; - assertThatThrownBy(() -> input.readFully(0, bytes, 0, TEST_DATA_SIZE)).cause() + byte[] bytes = new byte[testDataSize]; + assertThatThrownBy(() -> input.readFully(0, bytes, 0, testDataSize)).cause() .hasSuppressedException(SdkClientException.create("Request attempt 2 failure: Error reading getObject response")); - assertThatThrownBy(() -> input.readTail(bytes, 0, TEST_DATA_SIZE)).cause() + assertThatThrownBy(() -> input.readTail(bytes, 0, testDataSize)).cause() .hasSuppressedException(SdkClientException.create("Request attempt 2 failure: Error reading getObject response")); } + + @Test + public void testCreate() + throws Exception + { + S3FileSystemConfig config = new S3FileSystemConfig(); + int partSize = toIntExact(config.getStreamingPartSize().toBytes()); + S3Context context = new S3Context( + partSize, + config.isRequesterPays(), + S3Context.S3SseContext.of( + config.getSseType(), + config.getSseKmsKeyId(), + config.getSseCustomerKey()), + Optional.empty(), + config.getStorageClass(), + config.getCannedAcl(), + config.isSupportsExclusiveCreate()); + + // The toxicity is randomized, so iterate for determinism + for (int iteration = 0; iteration < 5; iteration++) { + for (boolean exclusive : List.of(false, true)) { + testCreate(context, exclusive, 0); + testCreate(context, exclusive, 10); + testCreate(context, exclusive, partSize + 1); + } + } + } + + private void testCreate(S3Context context, boolean exclusive, int dataSize) + throws Exception + { + String objectKey = "test-exclusive-create-" + randomNameSuffix(); + String s3Uri = "s3://%s/%s".formatted(bucketName, objectKey); + S3Location location = new S3Location(Location.of(s3Uri)); + S3OutputFile outputFile = new S3OutputFile(directExecutor(), s3client, context, location, Optional.empty()); + + byte[] data = new byte[dataSize]; + ThreadLocalRandom.current().nextBytes(data); + if (exclusive) { + outputFile.createExclusive(data); + } + else { + try (OutputStream outputStream = outputFile.create()) { + outputStream.write(data); + } + } + + assertThat(minioClient.getObjectContents(bucketName, objectKey)).as("Object data read back from storage") + .isEqualTo(data); + + // Subsequent exclusive creation should fail + assertThatThrownBy(() -> outputFile.createExclusive(data)) + .isInstanceOf(java.nio.file.FileAlreadyExistsException.class) + .hasMessage(s3Uri); + + // Subsequent non-exclusive create may succeed (or fail cleanly) + try (OutputStream outputStream = outputFile.create()) { + outputStream.write(data); + } + + minioClient.removeObject(bucketName, objectKey); + } }