Skip to content
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -13,16 +13,18 @@
*/
package io.trino.filesystem.s3;

import com.google.common.io.Closer;
import eu.rekawek.toxiproxy.Proxy;
import eu.rekawek.toxiproxy.ToxiproxyClient;
import eu.rekawek.toxiproxy.model.ToxicDirection;
import io.trino.filesystem.Location;
import io.trino.plugin.base.util.AutoCloseableCloser;
import io.trino.testing.containers.Minio;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.BeforeAll;
import io.trino.testing.minio.MinioClient;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.TestInstance;
import org.junit.jupiter.api.parallel.Execution;
import org.testcontainers.containers.Network;
import org.testcontainers.toxiproxy.ToxiproxyContainer;
import software.amazon.awssdk.auth.credentials.AwsBasicCredentials;
Expand All @@ -32,91 +34,164 @@
import software.amazon.awssdk.services.s3.S3Client;
import software.amazon.awssdk.services.s3.model.GetObjectRequest;

import java.io.IOException;
import java.io.OutputStream;
import java.net.URI;
import java.util.Arrays;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.ThreadLocalRandom;

import static com.google.common.util.concurrent.MoreExecutors.directExecutor;
import static io.trino.testing.TestingNames.randomNameSuffix;
import static io.trino.testing.containers.Minio.MINIO_API_PORT;
import static java.lang.Math.toIntExact;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;
import static org.junit.jupiter.api.TestInstance.Lifecycle.PER_METHOD;
import static org.junit.jupiter.api.parallel.ExecutionMode.SAME_THREAD;

@TestInstance(TestInstance.Lifecycle.PER_CLASS)
@TestInstance(PER_METHOD)
@Execution(value = SAME_THREAD, reason = "to constrain resource usage")
public class TestS3Retries
{
private static final int TOXIPROXY_CONTROL_PORT = 8474;
private static final int MINIO_PROXY_PORT = 1234;
private static final int TEST_DATA_SIZE = 1024;

private AutoCloseableCloser closer = AutoCloseableCloser.create();
private String bucketName;
private MinioClient minioClient;
private Proxy toxiProxy;
private S3Client s3client;

private final Closer closer = Closer.create();

@BeforeAll
@BeforeEach
final void init()
throws IOException
throws Exception
{
Network network = Network.newNetwork();
closer.register(network::close);
Minio minio = Minio.builder()
closer.close();
closer = AutoCloseableCloser.create();

Network network = closer.register(Network.newNetwork());
Minio minio = closer.register(Minio.builder()
.withNetwork(network)
.build();
.build());
minio.start();
minio.createBucket("bucket");
minio.writeFile(getTestData(), "bucket", "object");
closer.register(minio::close);
bucketName = "bucket-" + randomNameSuffix();
minio.createBucket(bucketName);
minioClient = closer.register(minio.createMinioClient());

ToxiproxyContainer toxiproxy = new ToxiproxyContainer("ghcr.io/shopify/toxiproxy:2.5.0")
ToxiproxyContainer toxiproxy = closer.register(new ToxiproxyContainer("ghcr.io/shopify/toxiproxy:2.5.0")
.withExposedPorts(TOXIPROXY_CONTROL_PORT, MINIO_PROXY_PORT)
.withNetwork(network)
.withNetworkAliases("minio");
.withNetwork(network));
toxiproxy.start();
closer.register(toxiproxy::close);

ToxiproxyClient toxiproxyClient = new ToxiproxyClient(toxiproxy.getHost(), toxiproxy.getControlPort());
Proxy proxy = toxiproxyClient.createProxy("minio", "0.0.0.0:" + MINIO_PROXY_PORT, "minio:" + MINIO_API_PORT);
// the number of transferred bytes includes both the response headers (around 570 bytes) and body
proxy.toxics()
.limitData("broken connection", ToxicDirection.DOWNSTREAM, 700);
toxiProxy = toxiproxyClient.createProxy("minio", "0.0.0.0:" + MINIO_PROXY_PORT, "minio:" + MINIO_API_PORT);

s3client = S3Client.builder()
s3client = closer.register(S3Client.builder()
.endpointOverride(URI.create("http://" + toxiproxy.getHost() + ":" + toxiproxy.getMappedPort(MINIO_PROXY_PORT)))
.region(Region.of(Minio.MINIO_REGION))
.forcePathStyle(true)
.credentialsProvider(StaticCredentialsProvider.create(
AwsBasicCredentials.create(Minio.MINIO_ACCESS_KEY, Minio.MINIO_SECRET_KEY)))
// explicitly configure the number of retries
.overrideConfiguration(o -> o.retryStrategy(b -> b.maxAttempts(3)))
.build();
closer.register(s3client::close);
.build());
}

@AfterAll
@AfterEach
final void cleanup()
throws IOException
throws Exception
{
closer.close();
}

private static byte[] getTestData()
@Test
public void testRead()
throws Exception
{
byte[] data = new byte[TEST_DATA_SIZE];
int testDataSize = 1024;
byte[] data = new byte[testDataSize];
Arrays.fill(data, (byte) 1);
return data;
}
minioClient.putObject(bucketName, data, "object");

@Test
public void testRetries()
{
S3Location location = new S3Location(Location.of("s3://bucket/object"));
// the number of transferred bytes includes both the response headers (around 570 bytes) and body
toxiProxy.toxics()
.limitData("broken_connection", ToxicDirection.DOWNSTREAM, 700);

S3Location location = new S3Location(Location.of("s3://%s/object".formatted(bucketName)));
GetObjectRequest request = GetObjectRequest.builder()
.bucket(location.bucket())
.key(location.key())
.build();
S3Input input = new S3Input(location.location(), s3client, request);

byte[] bytes = new byte[TEST_DATA_SIZE];
assertThatThrownBy(() -> input.readFully(0, bytes, 0, TEST_DATA_SIZE)).cause()
byte[] bytes = new byte[testDataSize];
assertThatThrownBy(() -> input.readFully(0, bytes, 0, testDataSize)).cause()
.hasSuppressedException(SdkClientException.create("Request attempt 2 failure: Error reading getObject response"));
assertThatThrownBy(() -> input.readTail(bytes, 0, TEST_DATA_SIZE)).cause()
assertThatThrownBy(() -> input.readTail(bytes, 0, testDataSize)).cause()
.hasSuppressedException(SdkClientException.create("Request attempt 2 failure: Error reading getObject response"));
}

@Test
public void testCreate()
throws Exception
{
S3FileSystemConfig config = new S3FileSystemConfig();
int partSize = toIntExact(config.getStreamingPartSize().toBytes());
S3Context context = new S3Context(
partSize,
config.isRequesterPays(),
S3Context.S3SseContext.of(
config.getSseType(),
config.getSseKmsKeyId(),
config.getSseCustomerKey()),
Optional.empty(),
config.getStorageClass(),
config.getCannedAcl(),
config.isSupportsExclusiveCreate());

// The toxicity is randomized, so iterate for determinism
for (int iteration = 0; iteration < 5; iteration++) {
for (boolean exclusive : List.of(false, true)) {
testCreate(context, exclusive, 0);
testCreate(context, exclusive, 10);
testCreate(context, exclusive, partSize + 1);
}
}
}

private void testCreate(S3Context context, boolean exclusive, int dataSize)
throws Exception
{
String objectKey = "test-exclusive-create-" + randomNameSuffix();
String s3Uri = "s3://%s/%s".formatted(bucketName, objectKey);
S3Location location = new S3Location(Location.of(s3Uri));
S3OutputFile outputFile = new S3OutputFile(directExecutor(), s3client, context, location, Optional.empty());

byte[] data = new byte[dataSize];
ThreadLocalRandom.current().nextBytes(data);
if (exclusive) {
outputFile.createExclusive(data);
}
else {
try (OutputStream outputStream = outputFile.create()) {
outputStream.write(data);
}
}

assertThat(minioClient.getObjectContents(bucketName, objectKey)).as("Object data read back from storage")
.isEqualTo(data);

// Subsequent exclusive creation should fail
assertThatThrownBy(() -> outputFile.createExclusive(data))
.isInstanceOf(java.nio.file.FileAlreadyExistsException.class)
.hasMessage(s3Uri);

// Subsequent non-exclusive create may succeed (or fail cleanly)
try (OutputStream outputStream = outputFile.create()) {
outputStream.write(data);
}

minioClient.removeObject(bucketName, objectKey);
}
}