diff --git a/ts-segment-uploader/pom.xml b/ts-segment-uploader/pom.xml index c161047..d115fa8 100644 --- a/ts-segment-uploader/pom.xml +++ b/ts-segment-uploader/pom.xml @@ -181,7 +181,7 @@ org.apache.maven.plugins maven-surefire-plugin - ${maven.surefire.version} + 3.0.0-M5 1 false diff --git a/ts-segment-uploader/src/main/java/com/pinterest/kafka/tieredstorage/uploader/DirectoryTreeWatcher.java b/ts-segment-uploader/src/main/java/com/pinterest/kafka/tieredstorage/uploader/DirectoryTreeWatcher.java index a2228f8..ec8a03e 100644 --- a/ts-segment-uploader/src/main/java/com/pinterest/kafka/tieredstorage/uploader/DirectoryTreeWatcher.java +++ b/ts-segment-uploader/src/main/java/com/pinterest/kafka/tieredstorage/uploader/DirectoryTreeWatcher.java @@ -15,10 +15,15 @@ import java.nio.file.NoSuchFileException; import java.nio.file.Path; import java.nio.file.Paths; +import java.nio.file.StandardOpenOption; import java.nio.file.StandardWatchEventKinds; import java.nio.file.WatchEvent; import java.nio.file.WatchKey; import java.nio.file.WatchService; +import java.time.Instant; +import java.time.LocalDateTime; +import java.time.ZoneId; +import java.time.format.DateTimeFormatter; import java.util.Arrays; import java.util.HashMap; import java.util.HashSet; @@ -51,6 +56,7 @@ */ public class DirectoryTreeWatcher implements Runnable { private static final Logger LOG = LogManager.getLogger(DirectoryTreeWatcher.class); + private static final Logger FAILED_UPLOADS_LOG = LogManager.getLogger("failed-uploads"); private static final String[] MONITORED_EXTENSIONS = {".timeindex", ".index", ".log"}; private static final Pattern MONITORED_FILE_PATTERN = Pattern.compile("^\\d+(" + String.join("|", MONITORED_EXTENSIONS) + ")$"); private static Map activeSegment; @@ -75,6 +81,7 @@ public class DirectoryTreeWatcher implements Runnable { private final Object watchKeyMapLock = new Object(); private Thread thread; private boolean cancelled = false; + private final Object failedUploadFileLock = new 
Object(); public static void setLeadershipWatcher(LeadershipWatcher suppliedLeadershipWatcher) { if (leadershipWatcher == null) @@ -142,7 +149,8 @@ public void initialize() throws Exception { LOG.info("Submitting s3UploadHandler loop"); } - private void handleUploadCallback(UploadTask uploadTask, long totalTimeMs, Throwable throwable, int statusCode) { + @VisibleForTesting + protected void handleUploadCallback(UploadTask uploadTask, long totalTimeMs, Throwable throwable, int statusCode) { TopicPartition topicPartition = uploadTask.getTopicPartition(); MetricRegistryManager.getInstance(config.getMetricsConfiguration()).incrementCounter( topicPartition.topic(), @@ -315,24 +323,65 @@ private void handleUploadException(UploadTask uploadTask, Throwable throwable, T "broker=" + environmentProvider.brokerId(), "file=" + uploadTask.getFullFilename() ); + handleFailedUploadAfterAllRetries(uploadTask, throwable, topicPartition); } } else if (uploadTask.getTries() < config.getUploadMaxRetries()){ // retry all other errors retryUpload(uploadTask.retry(), throwable, topicPartition); } else { - // retry limit reached, upload is still erroring - send a metric - MetricRegistryManager.getInstance(config.getMetricsConfiguration()).incrementCounter( - topicPartition.topic(), - topicPartition.partition(), - UploaderMetrics.UPLOAD_ERROR_METRIC, - "exception=" + throwable.getClass().getName(), - "cluster=" + environmentProvider.clusterId(), - "broker=" + environmentProvider.brokerId(), - "offset=" + uploadTask.getOffset() - ); + // retry limit reached, upload still errors + handleFailedUploadAfterAllRetries(uploadTask, throwable, topicPartition); } } + /** + * Handle a failed upload after all retries have been exhausted. 
+ * + * @param uploadTask the upload task that failed + * @param throwable the exception that caused the failure + * @param topicPartition the topic partition of the upload task + */ + private void handleFailedUploadAfterAllRetries(UploadTask uploadTask, Throwable throwable, TopicPartition topicPartition) { + LOG.error(String.format("Failed to upload file %s to %s after reaching max %s retries.", + uploadTask.getAbsolutePath(), uploadTask.getUploadDestinationPathString(), uploadTask.getTries())); + if (config.getUploadFailureFile() != null) { + synchronized (failedUploadFileLock) { + LOG.info(String.format("Writing failed upload %s --> %s to failure file: %s", + uploadTask.getAbsolutePath(), uploadTask.getUploadDestinationPathString(), config.getUploadFailureFile())); + try { + long timestamp = System.currentTimeMillis(); + LocalDateTime dt = LocalDateTime.ofInstant(Instant.ofEpochMilli(timestamp), ZoneId.systemDefault()); + Files.write( + Paths.get(config.getUploadFailureFile()), + Arrays.asList( + "timestamp: " + timestamp, + "human_timestamp: " + dt.format(DateTimeFormatter.ISO_DATE_TIME), + "task_num_retries: " + uploadTask.getTries(), + "local_path: " + uploadTask.getAbsolutePath(), + "destination_path: " + uploadTask.getUploadDestinationPathString(), + "exception: " + throwable.getClass().getName(), + "message: " + throwable.getMessage(), + "-------------------" + ), + StandardOpenOption.CREATE, + StandardOpenOption.APPEND + ); + } catch (IOException e) { + LOG.error("Failed to write failed upload to failure file", e); + } + } + } + MetricRegistryManager.getInstance(config.getMetricsConfiguration()).incrementCounter( + topicPartition.topic(), + topicPartition.partition(), + UploaderMetrics.UPLOAD_ERROR_METRIC, + "exception=" + throwable.getClass().getName(), + "cluster=" + environmentProvider.clusterId(), + "broker=" + environmentProvider.brokerId(), + "offset=" + uploadTask.getOffset() + ); + } + /** * Retry the upload of a file that failed to upload. 
* @param uploadTask @@ -952,6 +1001,7 @@ public static class UploadTask { private final long sizeBytes; private int tries = 0; private long nextRetryNotBeforeTimestamp = -1; + private String uploadDestinationPathString; public UploadTask(TopicPartition topicPartition, String offset, String fullFilename, Path absolutePath) { this.topicPartition = topicPartition; @@ -1012,6 +1062,14 @@ public long getNextRetryNotBeforeTimestamp() { return nextRetryNotBeforeTimestamp; } + public String getUploadDestinationPathString() { + return uploadDestinationPathString; + } + + public void setUploadDestinationPathString(String uploadDestinationPathString) { + this.uploadDestinationPathString = uploadDestinationPathString; + } + public boolean isReadyForUpload() { return tries == 0 || System.currentTimeMillis() > nextRetryNotBeforeTimestamp; } diff --git a/ts-segment-uploader/src/main/java/com/pinterest/kafka/tieredstorage/uploader/MultiThreadedS3FileUploader.java b/ts-segment-uploader/src/main/java/com/pinterest/kafka/tieredstorage/uploader/MultiThreadedS3FileUploader.java index 3b796ac..3620ebc 100644 --- a/ts-segment-uploader/src/main/java/com/pinterest/kafka/tieredstorage/uploader/MultiThreadedS3FileUploader.java +++ b/ts-segment-uploader/src/main/java/com/pinterest/kafka/tieredstorage/uploader/MultiThreadedS3FileUploader.java @@ -5,15 +5,17 @@ import com.pinterest.kafka.tieredstorage.common.discovery.s3.S3StorageServiceEndpoint; import org.apache.log4j.LogManager; import org.apache.log4j.Logger; -import software.amazon.awssdk.services.s3.S3Client; +import software.amazon.awssdk.core.client.config.ClientOverrideConfiguration; +import software.amazon.awssdk.core.exception.ApiCallTimeoutException; +import software.amazon.awssdk.services.s3.S3AsyncClient; import software.amazon.awssdk.services.s3.model.PutObjectRequest; import software.amazon.awssdk.services.s3.model.PutObjectResponse; import java.nio.file.NoSuchFileException; +import java.time.Duration; import 
java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; -import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; /** @@ -22,24 +24,27 @@ */ public class MultiThreadedS3FileUploader implements S3FileUploader { private static final Logger LOG = LogManager.getLogger(MultiThreadedS3FileUploader.class); - private static final int UPLOAD_TIMEOUT_ERROR_CODE = 601; - private static final int UPLOAD_FILE_NOT_FOUND_ERROR_CODE = 602; - private static final int UPLOAD_GENERAL_ERROR_CODE = 603; + protected static final int UPLOAD_TIMEOUT_ERROR_CODE = 601; + protected static final int UPLOAD_FILE_NOT_FOUND_ERROR_CODE = 602; + protected static final int UPLOAD_GENERAL_ERROR_CODE = 603; private final ExecutorService executorService; private final StorageServiceEndpointProvider endpointProvider; private final Heartbeat heartbeat; - private static S3Client s3Client; + private static S3AsyncClient s3AsyncClient; private final SegmentUploaderConfiguration config; public MultiThreadedS3FileUploader(StorageServiceEndpointProvider endpointProvider, SegmentUploaderConfiguration config, KafkaEnvironmentProvider environmentProvider) { this.endpointProvider = endpointProvider; this.config = config; - if (s3Client == null) { - s3Client = S3Client.builder().build(); + ClientOverrideConfiguration overrideConfiguration = ClientOverrideConfiguration.builder() + .apiCallTimeout(Duration.ofMillis(config.getUploadTimeoutMs())) + .build(); + if (s3AsyncClient == null) { + s3AsyncClient = S3AsyncClient.builder().overrideConfiguration(overrideConfiguration).build(); } executorService = Executors.newFixedThreadPool(config.getUploadThreadCount()); heartbeat = new Heartbeat("uploader", config, environmentProvider); - LOG.info("Started MultiThreadedS3FileUploader with threadpool size=" + config.getUploadThreadCount()); + LOG.info("Started MultiThreadedS3FileUploader with threadpool size=" + 
config.getUploadThreadCount() + " and timeout=" + config.getUploadTimeoutMs() + "ms"); } @Override @@ -58,25 +63,35 @@ public void uploadFile(DirectoryTreeWatcher.UploadTask uploadTask, S3UploadCallb String s3Key = String.format("%s/%s", s3Prefix, subpath); long queueTime = System.currentTimeMillis(); - CompletableFuture future = - CompletableFuture.supplyAsync(() -> { - PutObjectRequest putObjectRequest = PutObjectRequest.builder() - .bucket(s3Bucket) - .key(s3Key) - // Changing checksum algorithm does not seem to - // have any impact regarding seeing CPU intensive - // sun/security/provider/MD5.implCompress - // that is observed in the flame graph. - //.checksumAlgorithm(ChecksumAlgorithm.CRC32_C) - .build(); - return s3Client.putObject(putObjectRequest, uploadTask.getAbsolutePath()); - }, executorService).orTimeout(config.getUploadTimeoutMs(), TimeUnit.MILLISECONDS); - - LOG.info(String.format("Submitted upload of s3://%s/%s", s3Bucket, s3Key)); + PutObjectRequest putObjectRequest = PutObjectRequest.builder() + .bucket(s3Bucket) + .key(s3Key) + // Changing checksum algorithm does not seem to + // have any impact regarding seeing CPU intensive + // sun/security/provider/MD5.implCompress + // that is observed in the flame graph. 
+ //.checksumAlgorithm(ChecksumAlgorithm.CRC32_C) + .build(); + CompletableFuture future; + String uploadPathString = String.format("s3://%s/%s", s3Bucket, s3Key); + uploadTask.setUploadDestinationPathString(uploadPathString); // set the upload destination path so that it can be used in the callback + try { + LOG.info(String.format("Submitting upload of %s --> %s", uploadTask.getAbsolutePath(), uploadPathString)); + future = s3AsyncClient.putObject(putObjectRequest, uploadTask.getAbsolutePath()); + } catch (Exception e) { + long timeSpentMs = System.currentTimeMillis() - queueTime; + LOG.error(String.format("Caught exception during putObject for %s --> %s in %dms", uploadTask.getAbsolutePath(), uploadPathString, timeSpentMs), e); + int errorCode = UPLOAD_GENERAL_ERROR_CODE; + if (Utils.isAssignableFromRecursive(e, NoSuchFileException.class)) { + errorCode = UPLOAD_FILE_NOT_FOUND_ERROR_CODE; + } + s3UploadCallback.onCompletion(uploadTask, timeSpentMs, e, errorCode); + return; + } future.whenComplete((putObjectResponse, throwable) -> { long timeSpentMs = System.currentTimeMillis() - queueTime; if (throwable != null) { - LOG.error(String.format("Failed upload of s3://%s/%s in %d ms.", s3Bucket, s3Key, timeSpentMs), throwable); + LOG.error(String.format("PutObject failed for %s --> %s in %d ms.", uploadTask.getAbsolutePath(), uploadPathString, timeSpentMs), throwable); int errorCode = getErrorCode(throwable, putObjectResponse); @@ -87,7 +102,7 @@ public void uploadFile(DirectoryTreeWatcher.UploadTask uploadTask, S3UploadCallb errorCode ); } else { - LOG.info(String.format("Completed upload of s3://%s/%s in %d ms.", s3Bucket, s3Key, timeSpentMs)); + LOG.info(String.format("Completed upload of %s in %d ms.", uploadPathString, timeSpentMs)); s3UploadCallback.onCompletion(uploadTask, timeSpentMs,null, putObjectResponse.sdkHttpResponse().statusCode()); } }); @@ -97,7 +112,7 @@ private int getErrorCode(Throwable throwable, PutObjectResponse putObjectRespons if (throwable == 
null) { return putObjectResponse == null ? UPLOAD_GENERAL_ERROR_CODE : putObjectResponse.sdkHttpResponse().statusCode(); } - if (throwable instanceof TimeoutException) { + if (throwable instanceof ApiCallTimeoutException || throwable instanceof TimeoutException) { return UPLOAD_TIMEOUT_ERROR_CODE; } if (throwable instanceof NoSuchFileException) { @@ -108,7 +123,7 @@ private int getErrorCode(Throwable throwable, PutObjectResponse putObjectRespons } public void stop() { - s3Client.close(); + s3AsyncClient.close(); executorService.shutdown(); heartbeat.stop(); } @@ -119,7 +134,7 @@ public StorageServiceEndpointProvider getStorageServiceEndpointProvider() { } @VisibleForTesting - protected static void overrideS3Client(S3Client newS3Client) { - s3Client = newS3Client; + protected static void overrideS3Client(S3AsyncClient newS3Client) { + s3AsyncClient = newS3Client; } } diff --git a/ts-segment-uploader/src/main/java/com/pinterest/kafka/tieredstorage/uploader/SegmentUploaderConfiguration.java b/ts-segment-uploader/src/main/java/com/pinterest/kafka/tieredstorage/uploader/SegmentUploaderConfiguration.java index 7e6c7f7..a4a90ce 100644 --- a/ts-segment-uploader/src/main/java/com/pinterest/kafka/tieredstorage/uploader/SegmentUploaderConfiguration.java +++ b/ts-segment-uploader/src/main/java/com/pinterest/kafka/tieredstorage/uploader/SegmentUploaderConfiguration.java @@ -61,6 +61,11 @@ public class SegmentUploaderConfiguration { */ private static final String UPLOAD_MAX_RETRIES = TS_SEGMENT_UPLOADER_PREFIX + "." + "upload.max.retries"; + /** + * File to write failed uploads information to. + */ + private static final String UPLOAD_FAILURE_FILE = TS_SEGMENT_UPLOADER_PREFIX + "." + "upload.failure.file"; + /** * Class name for {@link com.pinterest.kafka.tieredstorage.uploader.leadership.LeadershipWatcher} implementation to use. 
*/ @@ -210,6 +215,15 @@ public int getUploadMaxRetries() { return Integer.parseInt(properties.getProperty(UPLOAD_MAX_RETRIES, String.valueOf(Defaults.DEFAULT_UPLOAD_MAX_RETRIES))); } + public String getUploadFailureFile() { + return properties.getProperty(UPLOAD_FAILURE_FILE); + } + + @VisibleForTesting + protected void setUploadFailureFile(String uploadFailureFile) { + properties.setProperty(UPLOAD_FAILURE_FILE, uploadFailureFile); + } + @VisibleForTesting protected boolean isInInclusionCache(String topicName) { return includeTopicsCache.contains(topicName); diff --git a/ts-segment-uploader/src/test/java/com/pinterest/kafka/tieredstorage/uploader/TestBase.java b/ts-segment-uploader/src/test/java/com/pinterest/kafka/tieredstorage/uploader/TestBase.java index 0eaa1f7..a7b4f5c 100644 --- a/ts-segment-uploader/src/test/java/com/pinterest/kafka/tieredstorage/uploader/TestBase.java +++ b/ts-segment-uploader/src/test/java/com/pinterest/kafka/tieredstorage/uploader/TestBase.java @@ -16,7 +16,9 @@ import org.junit.jupiter.api.extension.RegisterExtension; import software.amazon.awssdk.auth.credentials.AnonymousCredentialsProvider; import software.amazon.awssdk.core.ResponseInputStream; +import software.amazon.awssdk.core.client.config.ClientOverrideConfiguration; import software.amazon.awssdk.regions.Region; +import software.amazon.awssdk.services.s3.S3AsyncClient; import software.amazon.awssdk.services.s3.S3Client; import software.amazon.awssdk.services.s3.model.CreateBucketRequest; import software.amazon.awssdk.services.s3.model.GetObjectRequest; @@ -29,6 +31,7 @@ import java.nio.file.Files; import java.nio.file.Path; import java.nio.file.Paths; +import java.time.Duration; import java.util.Collections; import java.util.List; import java.util.Map; @@ -42,8 +45,10 @@ public class TestBase { protected static final String S3_BUCKET = "test-bucket"; protected static final String TEST_CLUSTER = "test-cluster-2"; protected static final String TEST_TOPIC_A = "test_topic_a"; + 
protected static final String TEST_TOPIC_B = "test_topic_b"; protected static final Path TEST_DATA_LOG_DIRECTORY_PATH = Paths.get("src/test/resources/log_segments/test_topic_a-0"); protected S3Client s3Client; + protected S3AsyncClient s3AsyncClient; @BeforeEach public void setup() throws Exception { @@ -53,6 +58,12 @@ public void setup() throws Exception { .credentialsProvider(AnonymousCredentialsProvider.create()) .build(); s3Client.createBucket(CreateBucketRequest.builder().bucket(S3_BUCKET).build()); + s3AsyncClient = S3AsyncClient.builder() + .endpointOverride(URI.create(S3_MOCK.getServiceEndpoint())) + .region(Region.US_EAST_1) + .credentialsProvider(AnonymousCredentialsProvider.create()) + .build(); + s3AsyncClient.createBucket(CreateBucketRequest.builder().bucket(S3_BUCKET).build()); } @AfterEach @@ -61,11 +72,26 @@ public void tearDown() throws IOException, InterruptedException, ExecutionExcept s3Client.close(); } - public static void overrideS3ClientForFileUploaderAndDownloader(S3Client s3Client) { - MultiThreadedS3FileUploader.overrideS3Client(s3Client); + public static void overrideS3ClientForFileDownloader(S3Client s3Client) { S3FileDownloader.overrideS3Client(s3Client); } + public static void overrideS3AsyncClientForFileUploader(S3AsyncClient s3AsyncClient) { + MultiThreadedS3FileUploader.overrideS3Client(s3AsyncClient); + } + + public static S3AsyncClient getS3AsyncClientWithCustomApiCallTimeout(long timeoutMs) { + ClientOverrideConfiguration overrideConfiguration = ClientOverrideConfiguration.builder() + .apiCallTimeout(Duration.ofMillis(timeoutMs)) + .build(); + return S3AsyncClient.builder() + .endpointOverride(URI.create(S3_MOCK.getServiceEndpoint())) + .region(Region.US_EAST_1) + .credentialsProvider(AnonymousCredentialsProvider.create()) + .overrideConfiguration(overrideConfiguration) + .build(); + } + public static KafkaEnvironmentProvider createTestEnvironmentProvider(String suppliedZkConnect, String suppliedLogDir) { KafkaEnvironmentProvider 
environmentProvider = new KafkaEnvironmentProvider() { @@ -199,8 +225,8 @@ protected static ResponseInputStream getObjectResponse(String ); } - protected static ListObjectsV2Response getListObjectsV2Response(String bucket, String prefix, S3Client s3Client) { - return s3Client.listObjectsV2(ListObjectsV2Request.builder().bucket(bucket).prefix(prefix).build()); + protected static ListObjectsV2Response getListObjectsV2Response(String bucket, String prefix, S3AsyncClient s3AsyncClient) throws ExecutionException, InterruptedException { + return s3AsyncClient.listObjectsV2(ListObjectsV2Request.builder().bucket(bucket).prefix(prefix).build()).get(); } protected static void reassignPartitions(SharedKafkaTestResource sharedKafkaTestResource, Map>> assignmentMap) throws IOException, InterruptedException, KeeperException { diff --git a/ts-segment-uploader/src/test/java/com/pinterest/kafka/tieredstorage/uploader/TestDirectoryTreeWatcher.java b/ts-segment-uploader/src/test/java/com/pinterest/kafka/tieredstorage/uploader/TestDirectoryTreeWatcher.java index 8ce023e..9d3c7c6 100644 --- a/ts-segment-uploader/src/test/java/com/pinterest/kafka/tieredstorage/uploader/TestDirectoryTreeWatcher.java +++ b/ts-segment-uploader/src/test/java/com/pinterest/kafka/tieredstorage/uploader/TestDirectoryTreeWatcher.java @@ -1,5 +1,6 @@ package com.pinterest.kafka.tieredstorage.uploader; +import com.pinterest.kafka.tieredstorage.common.discovery.StorageServiceEndpointProvider; import com.pinterest.kafka.tieredstorage.common.discovery.s3.MockS3StorageServiceEndpointProvider; import com.pinterest.kafka.tieredstorage.uploader.leadership.ZookeeperLeadershipWatcher; import com.salesforce.kafka.test.junit5.SharedKafkaTestResource; @@ -15,6 +16,7 @@ import java.io.File; import java.io.IOException; +import java.nio.file.NoSuchFileException; import java.nio.file.Path; import java.nio.file.Paths; import java.nio.file.WatchKey; @@ -22,10 +24,12 @@ import java.util.Collections; import java.util.List; import 
java.util.Map; +import java.util.concurrent.CompletionException; import java.util.concurrent.ExecutionException; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertNotNull; import static org.junit.jupiter.api.Assertions.assertTrue; public class TestDirectoryTreeWatcher extends TestBase { @@ -33,10 +37,10 @@ public class TestDirectoryTreeWatcher extends TestBase { private static final SharedKafkaTestResource sharedKafkaTestResource = new SharedKafkaTestResource() .withBrokerProperty("log.segment.bytes", "30000") .withBrokerProperty("log.segment.delete.delay.ms", "5000"); - private static final String TEST_TOPIC_B = "test_topic_b"; private DirectoryTreeWatcher directoryTreeWatcher; private KafkaEnvironmentProvider environmentProvider; private static AdminClient adminClient; + private SegmentUploaderConfiguration config; @BeforeEach @Override @@ -48,14 +52,15 @@ public void setup() throws Exception { environmentProvider.load(); // override s3 client - overrideS3ClientForFileUploaderAndDownloader(s3Client); + overrideS3ClientForFileDownloader(s3Client); + overrideS3AsyncClientForFileUploader(s3AsyncClient); // endpoint provider setup MockS3StorageServiceEndpointProvider endpointProvider = new MockS3StorageServiceEndpointProvider(); endpointProvider.initialize(TEST_CLUSTER); // s3 uploader setup - SegmentUploaderConfiguration config = new SegmentUploaderConfiguration("src/test/resources", TEST_CLUSTER); + config = new SegmentUploaderConfiguration("src/test/resources", TEST_CLUSTER); S3FileUploader s3FileUploader = new MultiThreadedS3FileUploader(endpointProvider, config, environmentProvider); // create topic @@ -174,6 +179,73 @@ void testExponentialBackoffRetries() throws InterruptedException { } } + @Test + void testRetryExhaustion() throws IOException, InterruptedException { + // override s3AsyncClient to have a very short timeout + 
MultiThreadedS3FileUploader.overrideS3Client(getS3AsyncClientWithCustomApiCallTimeout(1L)); + + // write to file + config.setUploadFailureFile(Paths.get("src/test/resources").resolve("upload_failure.txt").toString()); + + StorageServiceEndpointProvider endpointProvider = new MockS3StorageServiceEndpointProvider(); + endpointProvider.initialize(TEST_CLUSTER); + + MultiThreadedS3FileUploader uploader = new MultiThreadedS3FileUploader(endpointProvider, config, environmentProvider); + + // upload two log files + TopicPartition tp = new TopicPartition(TEST_TOPIC_A, 0); + String offset = "00000000000000000000"; + String offset2 = "00000000000000000294"; + String nonExistentOffset = "00000000000000000295"; + + DirectoryTreeWatcher.UploadTask uploadTask = new DirectoryTreeWatcher.UploadTask( + tp, + offset, + String.format("%s.log", offset), + TEST_DATA_LOG_DIRECTORY_PATH.resolve(Paths.get(String.format("%s.log", offset))) + ); + DirectoryTreeWatcher.UploadTask uploadTask2 = new DirectoryTreeWatcher.UploadTask( + tp, + offset2, + String.format("%s.log", offset2), + TEST_DATA_LOG_DIRECTORY_PATH.resolve(Paths.get(String.format("%s.log", offset2))) + ); + DirectoryTreeWatcher.UploadTask nonExistentUploadTask = new DirectoryTreeWatcher.UploadTask( + tp, + nonExistentOffset, + String.format("%s.log", nonExistentOffset), + TEST_DATA_LOG_DIRECTORY_PATH.resolve(Paths.get(String.format("%s.log", nonExistentOffset))) + ); + + S3UploadCallback callback = new S3UploadCallback() { + @Override + public void onCompletion(DirectoryTreeWatcher.UploadTask uploadTask, long totalTimeMs, Throwable throwable, int statusCode) { + assertNotNull(throwable); + if (uploadTask == nonExistentUploadTask) { + assertEquals(MultiThreadedS3FileUploader.UPLOAD_FILE_NOT_FOUND_ERROR_CODE, statusCode); + assertTrue(Utils.isAssignableFromRecursive(throwable, NoSuchFileException.class)); + } else { + assertEquals(MultiThreadedS3FileUploader.UPLOAD_TIMEOUT_ERROR_CODE, statusCode); + 
assertTrue(Utils.isAssignableFromRecursive(throwable, CompletionException.class)); + } + directoryTreeWatcher.handleUploadCallback(uploadTask, totalTimeMs, throwable, statusCode); + } + }; + + uploader.uploadFile(uploadTask, callback); + uploader.uploadFile(uploadTask2, callback); + uploader.uploadFile(nonExistentUploadTask, callback); + + Thread.sleep(15000); // wait for retries to exhaust + + assertEquals(config.getUploadMaxRetries(), uploadTask.getTries()); + + // TODO: validate contents of file and delete it after test + + // override the s3AsyncClient back to original + MultiThreadedS3FileUploader.overrideS3Client(s3AsyncClient); + } + private static void increasePartitionsAndVerify(SharedKafkaTestResource sharedKafkaTestResource, String topic, int newPartitionCount) throws InterruptedException { sharedKafkaTestResource.getKafkaTestUtils().getAdminClient().createPartitions(Collections.singletonMap(topic, NewPartitions.increaseTo(newPartitionCount))); while (sharedKafkaTestResource.getKafkaTestUtils().describeTopic(topic).partitions().size() != newPartitionCount) diff --git a/ts-segment-uploader/src/test/java/com/pinterest/kafka/tieredstorage/uploader/TestDirectoryTreeWatcherMultiBroker.java b/ts-segment-uploader/src/test/java/com/pinterest/kafka/tieredstorage/uploader/TestDirectoryTreeWatcherMultiBroker.java index 5695420..9af1dd1 100644 --- a/ts-segment-uploader/src/test/java/com/pinterest/kafka/tieredstorage/uploader/TestDirectoryTreeWatcherMultiBroker.java +++ b/ts-segment-uploader/src/test/java/com/pinterest/kafka/tieredstorage/uploader/TestDirectoryTreeWatcherMultiBroker.java @@ -28,7 +28,6 @@ public class TestDirectoryTreeWatcherMultiBroker extends TestBase { .withBrokerProperty("log.segment.bytes", "30000") .withBrokerProperty("log.segment.delete.delay.ms", "5000") .withBrokers(2); - private static final String TEST_TOPIC_B = "test_topic_b"; private DirectoryTreeWatcher directoryTreeWatcher; private static AdminClient adminClient; @@ -42,7 +41,8 @@ 
public void setup() throws Exception { environmentProvider.load(); // override s3 client - overrideS3ClientForFileUploaderAndDownloader(s3Client); + overrideS3ClientForFileDownloader(s3Client); + overrideS3AsyncClientForFileUploader(s3AsyncClient); // endpoint provider setup MockS3StorageServiceEndpointProvider endpointProvider = new MockS3StorageServiceEndpointProvider(); diff --git a/ts-segment-uploader/src/test/java/com/pinterest/kafka/tieredstorage/uploader/TestKafkaSegmentUploader.java b/ts-segment-uploader/src/test/java/com/pinterest/kafka/tieredstorage/uploader/TestKafkaSegmentUploader.java index 0dc980a..4976269 100644 --- a/ts-segment-uploader/src/test/java/com/pinterest/kafka/tieredstorage/uploader/TestKafkaSegmentUploader.java +++ b/ts-segment-uploader/src/test/java/com/pinterest/kafka/tieredstorage/uploader/TestKafkaSegmentUploader.java @@ -37,9 +37,6 @@ public class TestKafkaSegmentUploader extends TestBase { .withBrokerProperty("log.index.interval.bytes", "100") .withBrokerProperty("log.segment.delete.delay.ms", "5000") .withBrokerProperty("log.dir", "/tmp/kafka-unit"); - private static final String TEST_TOPIC_A = "test_topic_a"; - private static final String TEST_TOPIC_B = "test_topic_b"; - private static final String TEST_CLUSTER = "test-cluster-2"; private static AdminClient adminClient; private KafkaSegmentUploader uploader; private KafkaEnvironmentProvider environmentProvider; @@ -76,7 +73,8 @@ static void tearDownAll() { private void startSegmentUploaderThread() throws Exception { String configDirectory = "src/test/resources"; - overrideS3ClientForFileUploaderAndDownloader(s3Client); + overrideS3ClientForFileDownloader(s3Client); + overrideS3AsyncClientForFileUploader(s3AsyncClient); uploader = new KafkaSegmentUploader(configDirectory, environmentProvider); uploader.start(); } diff --git a/ts-segment-uploader/src/test/java/com/pinterest/kafka/tieredstorage/uploader/TestMultiThreadedS3FileUploader.java 
b/ts-segment-uploader/src/test/java/com/pinterest/kafka/tieredstorage/uploader/TestMultiThreadedS3FileUploader.java index 4dbf817..960af86 100644 --- a/ts-segment-uploader/src/test/java/com/pinterest/kafka/tieredstorage/uploader/TestMultiThreadedS3FileUploader.java +++ b/ts-segment-uploader/src/test/java/com/pinterest/kafka/tieredstorage/uploader/TestMultiThreadedS3FileUploader.java @@ -3,17 +3,21 @@ import com.pinterest.kafka.tieredstorage.common.discovery.s3.MockS3StorageServiceEndpointProvider; import com.pinterest.kafka.tieredstorage.common.discovery.s3.S3StorageServiceEndpoint; import com.pinterest.kafka.tieredstorage.common.discovery.s3.S3StorageServiceEndpointProvider; -import org.apache.commons.configuration2.ex.ConfigurationException; import org.apache.kafka.common.TopicPartition; -import org.apache.zookeeper.KeeperException; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; +import software.amazon.awssdk.auth.credentials.AnonymousCredentialsProvider; +import software.amazon.awssdk.core.client.config.ClientOverrideConfiguration; +import software.amazon.awssdk.core.exception.ApiCallTimeoutException; +import software.amazon.awssdk.regions.Region; +import software.amazon.awssdk.services.s3.S3AsyncClient; import software.amazon.awssdk.services.s3.model.ListObjectsV2Response; import java.io.IOException; -import java.lang.reflect.InvocationTargetException; +import java.net.URI; import java.nio.file.NoSuchFileException; import java.nio.file.Paths; +import java.time.Duration; import java.util.concurrent.ExecutionException; import java.util.concurrent.atomic.AtomicBoolean; @@ -24,13 +28,14 @@ public class TestMultiThreadedS3FileUploader extends TestBase { protected MultiThreadedS3FileUploader s3FileUploader; private S3StorageServiceEndpointProvider endpointProvider; + private KafkaEnvironmentProvider environmentProvider; private SegmentUploaderConfiguration config; @BeforeEach public void setup() throws Exception { super.setup(); // NO-OP 
- KafkaEnvironmentProvider environmentProvider = new KafkaEnvironmentProvider() { + environmentProvider = new KafkaEnvironmentProvider() { @Override public void load() { } @@ -59,7 +64,7 @@ public String logDir() { }; endpointProvider = new MockS3StorageServiceEndpointProvider(); endpointProvider.initialize(TEST_CLUSTER); - MultiThreadedS3FileUploader.overrideS3Client(s3Client); + MultiThreadedS3FileUploader.overrideS3Client(s3AsyncClient); config = new SegmentUploaderConfiguration("src/test/resources", TEST_CLUSTER); s3FileUploader = new MultiThreadedS3FileUploader(endpointProvider, config, environmentProvider); } @@ -69,7 +74,7 @@ public String logDir() { * @throws InterruptedException */ @Test - void testSimpleUpload() throws InterruptedException { + void testSimpleUpload() throws InterruptedException, ExecutionException { TopicPartition tp = new TopicPartition(TEST_TOPIC_A, 0); String offset = "00000000000000000000"; DirectoryTreeWatcher.UploadTask uploadTask = new DirectoryTreeWatcher.UploadTask( @@ -89,7 +94,7 @@ void testSimpleUpload() throws InterruptedException { ListObjectsV2Response response = getListObjectsV2Response( S3_BUCKET, endpoint.getFullPrefix(), - s3Client + s3AsyncClient ); assertEquals(1, response.contents().size()); @@ -107,7 +112,7 @@ void testSimpleUpload() throws InterruptedException { response = getListObjectsV2Response( S3_BUCKET, endpoint.getFullPrefix(), - s3Client + s3AsyncClient ); assertEquals(2, response.contents().size()); @@ -141,6 +146,33 @@ public void onCompletion(DirectoryTreeWatcher.UploadTask uploadTask, long totalT assertTrue(Utils.isAssignableFromRecursive(throwable, NoSuchFileException.class)); } }); + } + + @Test + void testTimeoutUpload() throws IOException { + // override s3AsyncClient to have a very short timeout + MultiThreadedS3FileUploader.overrideS3Client(getS3AsyncClientWithCustomApiCallTimeout(1L)); + + // upload a log file + TopicPartition tp = new TopicPartition(TEST_TOPIC_B, 0); + String offset = 
"00000000000000000000"; + + DirectoryTreeWatcher.UploadTask uploadTask = new DirectoryTreeWatcher.UploadTask( + tp, + offset, + String.format("%s.log", offset), + TEST_DATA_LOG_DIRECTORY_PATH.resolve(Paths.get(String.format("%s.log", offset))) + ); + + s3FileUploader.uploadFile(uploadTask, new S3UploadCallback() { + @Override + public void onCompletion(DirectoryTreeWatcher.UploadTask uploadTask, long totalTimeMs, Throwable throwable, int statusCode) { + assertTrue(Utils.isAssignableFromRecursive(throwable, ApiCallTimeoutException.class)); + assertEquals(statusCode, MultiThreadedS3FileUploader.UPLOAD_TIMEOUT_ERROR_CODE); + } + }); + // override s3AsyncClient back to original + MultiThreadedS3FileUploader.overrideS3Client(s3AsyncClient); } } diff --git a/ts-segment-uploader/src/test/java/com/pinterest/kafka/tieredstorage/uploader/leadership/TestLeadershipWatcher.java b/ts-segment-uploader/src/test/java/com/pinterest/kafka/tieredstorage/uploader/leadership/TestLeadershipWatcher.java index a55eeba..4273597 100644 --- a/ts-segment-uploader/src/test/java/com/pinterest/kafka/tieredstorage/uploader/leadership/TestLeadershipWatcher.java +++ b/ts-segment-uploader/src/test/java/com/pinterest/kafka/tieredstorage/uploader/leadership/TestLeadershipWatcher.java @@ -35,7 +35,8 @@ public void setup() throws Exception { environmentProvider.load(); // override s3 client - overrideS3ClientForFileUploaderAndDownloader(s3Client); + overrideS3ClientForFileDownloader(s3Client); + overrideS3AsyncClientForFileUploader(s3AsyncClient); // endpoint provider setup MockS3StorageServiceEndpointProvider endpointProvider = new MockS3StorageServiceEndpointProvider(); diff --git a/ts-segment-uploader/src/test/java/com/pinterest/kafka/tieredstorage/uploader/leadership/TestZookeeperLeadershipWatcher.java b/ts-segment-uploader/src/test/java/com/pinterest/kafka/tieredstorage/uploader/leadership/TestZookeeperLeadershipWatcher.java index f45933b..55de89a 100644 --- 
a/ts-segment-uploader/src/test/java/com/pinterest/kafka/tieredstorage/uploader/leadership/TestZookeeperLeadershipWatcher.java +++ b/ts-segment-uploader/src/test/java/com/pinterest/kafka/tieredstorage/uploader/leadership/TestZookeeperLeadershipWatcher.java @@ -48,7 +48,8 @@ public void setup() throws Exception { environmentProvider.load(); // override s3 client - overrideS3ClientForFileUploaderAndDownloader(s3Client); + overrideS3ClientForFileDownloader(s3Client); + overrideS3AsyncClientForFileUploader(s3AsyncClient); // endpoint provider setup MockS3StorageServiceEndpointProvider endpointProvider = new MockS3StorageServiceEndpointProvider(); diff --git a/ts-segment-uploader/src/test/resources/log4j.xml b/ts-segment-uploader/src/test/resources/log4j.xml index 20d4f87..147d5a3 100644 --- a/ts-segment-uploader/src/test/resources/log4j.xml +++ b/ts-segment-uploader/src/test/resources/log4j.xml @@ -9,7 +9,7 @@ - + diff --git a/ts-segment-uploader/src/test/resources/log_segments/test_topic_b-0/00000000000000000000.index b/ts-segment-uploader/src/test/resources/log_segments/test_topic_b-0/00000000000000000000.index new file mode 100644 index 0000000..2453082 Binary files /dev/null and b/ts-segment-uploader/src/test/resources/log_segments/test_topic_b-0/00000000000000000000.index differ diff --git a/ts-segment-uploader/src/test/resources/log_segments/test_topic_b-0/00000000000000000000.log b/ts-segment-uploader/src/test/resources/log_segments/test_topic_b-0/00000000000000000000.log new file mode 100644 index 0000000..654e740 Binary files /dev/null and b/ts-segment-uploader/src/test/resources/log_segments/test_topic_b-0/00000000000000000000.log differ diff --git a/ts-segment-uploader/src/test/resources/log_segments/test_topic_b-0/00000000000000000000.timeindex b/ts-segment-uploader/src/test/resources/log_segments/test_topic_b-0/00000000000000000000.timeindex new file mode 100644 index 0000000..0404753 Binary files /dev/null and 
b/ts-segment-uploader/src/test/resources/log_segments/test_topic_b-0/00000000000000000000.timeindex differ diff --git a/ts-segment-uploader/src/test/resources/log_segments/test_topic_b-0/00000000000000822706.index b/ts-segment-uploader/src/test/resources/log_segments/test_topic_b-0/00000000000000822706.index new file mode 100644 index 0000000..1091cd6 Binary files /dev/null and b/ts-segment-uploader/src/test/resources/log_segments/test_topic_b-0/00000000000000822706.index differ diff --git a/ts-segment-uploader/src/test/resources/log_segments/test_topic_b-0/00000000000000822706.log b/ts-segment-uploader/src/test/resources/log_segments/test_topic_b-0/00000000000000822706.log new file mode 100644 index 0000000..7176e4c Binary files /dev/null and b/ts-segment-uploader/src/test/resources/log_segments/test_topic_b-0/00000000000000822706.log differ diff --git a/ts-segment-uploader/src/test/resources/log_segments/test_topic_b-0/00000000000000822706.snapshot b/ts-segment-uploader/src/test/resources/log_segments/test_topic_b-0/00000000000000822706.snapshot new file mode 100644 index 0000000..6099ecd Binary files /dev/null and b/ts-segment-uploader/src/test/resources/log_segments/test_topic_b-0/00000000000000822706.snapshot differ diff --git a/ts-segment-uploader/src/test/resources/log_segments/test_topic_b-0/00000000000000822706.timeindex b/ts-segment-uploader/src/test/resources/log_segments/test_topic_b-0/00000000000000822706.timeindex new file mode 100644 index 0000000..a1e9148 Binary files /dev/null and b/ts-segment-uploader/src/test/resources/log_segments/test_topic_b-0/00000000000000822706.timeindex differ diff --git a/ts-segment-uploader/src/test/resources/log_segments/test_topic_b-0/00000000000001613280.index b/ts-segment-uploader/src/test/resources/log_segments/test_topic_b-0/00000000000001613280.index new file mode 100644 index 0000000..948ff0e Binary files /dev/null and b/ts-segment-uploader/src/test/resources/log_segments/test_topic_b-0/00000000000001613280.index 
differ diff --git a/ts-segment-uploader/src/test/resources/log_segments/test_topic_b-0/00000000000001613280.log b/ts-segment-uploader/src/test/resources/log_segments/test_topic_b-0/00000000000001613280.log new file mode 100644 index 0000000..ecfaa72 Binary files /dev/null and b/ts-segment-uploader/src/test/resources/log_segments/test_topic_b-0/00000000000001613280.log differ diff --git a/ts-segment-uploader/src/test/resources/log_segments/test_topic_b-0/00000000000001613280.snapshot b/ts-segment-uploader/src/test/resources/log_segments/test_topic_b-0/00000000000001613280.snapshot new file mode 100644 index 0000000..6099ecd Binary files /dev/null and b/ts-segment-uploader/src/test/resources/log_segments/test_topic_b-0/00000000000001613280.snapshot differ diff --git a/ts-segment-uploader/src/test/resources/log_segments/test_topic_b-0/00000000000001613280.timeindex b/ts-segment-uploader/src/test/resources/log_segments/test_topic_b-0/00000000000001613280.timeindex new file mode 100644 index 0000000..829dc1c Binary files /dev/null and b/ts-segment-uploader/src/test/resources/log_segments/test_topic_b-0/00000000000001613280.timeindex differ diff --git a/ts-segment-uploader/src/test/resources/log_segments/test_topic_b-0/leader-epoch-checkpoint b/ts-segment-uploader/src/test/resources/log_segments/test_topic_b-0/leader-epoch-checkpoint new file mode 100644 index 0000000..e416669 --- /dev/null +++ b/ts-segment-uploader/src/test/resources/log_segments/test_topic_b-0/leader-epoch-checkpoint @@ -0,0 +1,3 @@ +0 +1 +0 0 diff --git a/ts-segment-uploader/src/test/resources/test-cluster.properties b/ts-segment-uploader/src/test/resources/test-cluster.properties index 7455c8b..bf72a53 100644 --- a/ts-segment-uploader/src/test/resources/test-cluster.properties +++ b/ts-segment-uploader/src/test/resources/test-cluster.properties @@ -5,4 +5,5 @@ ts.segment.uploader.kafka.topics.exclude=test_topic.*, .*_topic 
storage.service.endpoint.provider.class=com.pinterest.kafka.tieredstorage.common.discovery.s3.MockS3StorageServiceEndpointProvider -ts.segment.uploader.leadership.watcher.class=com.pinterest.kafka.tieredstorage.uploader.leadership.ZookeeperLeadershipWatcher \ No newline at end of file +ts.segment.uploader.leadership.watcher.class=com.pinterest.kafka.tieredstorage.uploader.leadership.ZookeeperLeadershipWatcher +ts.segment.uploader.upload.max.retries=3 \ No newline at end of file diff --git a/ts-segment-uploader/src/test/resources/upload_failure.txt b/ts-segment-uploader/src/test/resources/upload_failure.txt new file mode 100644 index 0000000..0835a9e --- /dev/null +++ b/ts-segment-uploader/src/test/resources/upload_failure.txt @@ -0,0 +1,48 @@ +timestamp: 1734124369987 +human_timestamp: 2024-12-13T16:12:49.987 +task_num_retries: 3 +local_path: src/test/resources/log_segments/test_topic_a-0/00000000000000000000.log +destination_path: s3://test-bucket/retention-3days/tiered_storage_test/01101/test-cluster-2/test_topic_a-0/00000000000000000000.log +exception: java.util.concurrent.CompletionException +message: software.amazon.awssdk.core.exception.ApiCallTimeoutException: Client execution did not complete before the specified timeout configuration: 1 millis +------------------- +timestamp: 1734124369994 +human_timestamp: 2024-12-13T16:12:49.994 +task_num_retries: 3 +local_path: src/test/resources/log_segments/test_topic_a-0/00000000000000000294.log +destination_path: s3://test-bucket/retention-3days/tiered_storage_test/01101/test-cluster-2/test_topic_a-0/00000000000000000294.log +exception: java.util.concurrent.CompletionException +message: software.amazon.awssdk.core.exception.ApiCallTimeoutException: Client execution did not complete before the specified timeout configuration: 1 millis +------------------- +timestamp: 1734124372390 +human_timestamp: 2024-12-13T16:12:52.39 +task_num_retries: 4 +local_path: 
src/test/resources/log_segments/test_topic_a-0/00000000000000000295.log.deleted +destination_path: s3://test-bucket/retention-3days/tiered_storage_test/01101/test-cluster-2/test_topic_a-0/00000000000000000295.log +exception: java.io.UncheckedIOException +message: java.nio.file.NoSuchFileException: src/test/resources/log_segments/test_topic_a-0/00000000000000000295.log.deleted +------------------- +timestamp: 1734127129568 +human_timestamp: 2024-12-13T16:58:49.568 +task_num_retries: 3 +local_path: src/test/resources/log_segments/test_topic_a-0/00000000000000000000.log +destination_path: s3://test-bucket/retention-3days/tiered_storage_test/01101/test-cluster-2/test_topic_a-0/00000000000000000000.log +exception: java.util.concurrent.CompletionException +message: software.amazon.awssdk.core.exception.ApiCallTimeoutException: Client execution did not complete before the specified timeout configuration: 1 millis +------------------- +timestamp: 1734127129574 +human_timestamp: 2024-12-13T16:58:49.574 +task_num_retries: 3 +local_path: src/test/resources/log_segments/test_topic_a-0/00000000000000000294.log +destination_path: s3://test-bucket/retention-3days/tiered_storage_test/01101/test-cluster-2/test_topic_a-0/00000000000000000294.log +exception: java.util.concurrent.CompletionException +message: software.amazon.awssdk.core.exception.ApiCallTimeoutException: Client execution did not complete before the specified timeout configuration: 1 millis +------------------- +timestamp: 1734127131962 +human_timestamp: 2024-12-13T16:58:51.962 +task_num_retries: 4 +local_path: src/test/resources/log_segments/test_topic_a-0/00000000000000000295.log.deleted +destination_path: s3://test-bucket/retention-3days/tiered_storage_test/01101/test-cluster-2/test_topic_a-0/00000000000000000295.log +exception: java.io.UncheckedIOException +message: java.nio.file.NoSuchFileException: src/test/resources/log_segments/test_topic_a-0/00000000000000000295.log.deleted +-------------------