From cb4ed2a8439628533ffc8e9b502610607901d7c8 Mon Sep 17 00:00:00 2001 From: Nikita Bahliuk Date: Sun, 16 Jun 2024 14:40:28 +0300 Subject: [PATCH 1/2] Fix `sendMessageBatch` of async client. Same approach that used in sync client, but with different code due to async nature of this client --- .../AmazonSQSExtendedAsyncClient.java | 168 ++++++++++++++---- .../AmazonSQSExtendedAsyncClientTest.java | 40 ++++- .../AmazonSQSExtendedClientTest.java | 2 - 3 files changed, 176 insertions(+), 34 deletions(-) diff --git a/src/main/java/com/amazon/sqs/javamessaging/AmazonSQSExtendedAsyncClient.java b/src/main/java/com/amazon/sqs/javamessaging/AmazonSQSExtendedAsyncClient.java index 02404a2..c053894 100644 --- a/src/main/java/com/amazon/sqs/javamessaging/AmazonSQSExtendedAsyncClient.java +++ b/src/main/java/com/amazon/sqs/javamessaging/AmazonSQSExtendedAsyncClient.java @@ -7,6 +7,7 @@ import static com.amazon.sqs.javamessaging.AmazonSQSExtendedClientUtil.getReservedAttributeNameIfPresent; import static com.amazon.sqs.javamessaging.AmazonSQSExtendedClientUtil.isLarge; import static com.amazon.sqs.javamessaging.AmazonSQSExtendedClientUtil.isS3ReceiptHandle; +import static com.amazon.sqs.javamessaging.AmazonSQSExtendedClientUtil.sizeOf; import static com.amazon.sqs.javamessaging.AmazonSQSExtendedClientUtil.updateMessageAttributePayloadSize; import java.util.ArrayList; @@ -17,6 +18,8 @@ import java.util.UUID; import java.util.concurrent.CompletableFuture; import java.util.stream.Collectors; +import java.util.stream.IntStream; + import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import software.amazon.awssdk.awscore.AwsRequest; @@ -44,6 +47,7 @@ import software.amazon.awssdk.services.sqs.model.SendMessageBatchResponse; import software.amazon.awssdk.services.sqs.model.SendMessageRequest; import software.amazon.awssdk.services.sqs.model.SendMessageResponse; +import software.amazon.awssdk.utils.Pair; import software.amazon.awssdk.utils.StringUtils; import software.amazon.payloadoffloading.PayloadStoreAsync; import software.amazon.payloadoffloading.S3AsyncDao; @@ -341,37 +345,15 @@ public CompletableFuture sendMessageBatch( return super.sendMessageBatch(sendMessageBatchRequest); } - List> batchEntryFutures = new ArrayList<>( - sendMessageBatchRequest.entries().size()); - boolean hasS3Entries = false; - for (SendMessageBatchRequestEntry entry : sendMessageBatchRequest.entries()) { - //Check message attributes for ExtendedClient related constraints - checkMessageAttributes(clientConfiguration.getPayloadSizeThreshold(), entry.messageAttributes()); - - if (clientConfiguration.isAlwaysThroughS3() - || isLarge(clientConfiguration.getPayloadSizeThreshold(), entry)) { - batchEntryFutures.add(storeMessageInS3(entry)); - hasS3Entries = true; - } else { - batchEntryFutures.add(CompletableFuture.completedFuture(entry)); - } - } - - if (!hasS3Entries) { - return super.sendMessageBatch(sendMessageBatchRequest); - } - - // Convert list of entry futures to a future list of entries. - return CompletableFuture.allOf( - batchEntryFutures.toArray(new CompletableFuture[batchEntryFutures.size()])) - .thenApply(v -> batchEntryFutures.stream() - .map(CompletableFuture::join) - .collect(Collectors.toList())) - .thenCompose(batchEntries -> { - SendMessageBatchRequest modifiedBatchRequest = - sendMessageBatchRequest.toBuilder().entries(batchEntries).build(); - return super.sendMessageBatch(modifiedBatchRequest); - }); + return CompletableFuture.supplyAsync(() -> new SendBatchHelper(clientConfiguration, sendMessageBatchRequest.entries())) + .thenApply(SendBatchHelper::storeInS3IfNeeded) + .thenApply(CompletableFuture::join) + .thenApply(SendBatchHelper::getEntries) + .thenCompose(batchEntries -> { + SendMessageBatchRequest modifiedBatchRequest = + sendMessageBatchRequest.toBuilder().entries(batchEntries).build(); + return super.sendMessageBatch(modifiedBatchRequest); + }); } /** @@ -508,4 +490,128 @@ private CompletableFuture storeOriginalPayload(String messageContentStr) private static T appendUserAgent(final T builder) { return AmazonSQSExtendedClientUtil.appendUserAgent(builder, USER_AGENT_NAME, USER_AGENT_VERSION); } + + + + private class SendBatchHelper { + private final ExtendedAsyncClientConfiguration clientConfiguration; + + private final List entries; + private final List> entrySizes; + private final List alreadyMovedEntries; + private long totalSize; + + public SendBatchHelper(ExtendedAsyncClientConfiguration clientConfiguration, List entries) { + this.clientConfiguration = clientConfiguration; + this.entries = new ArrayList<>(entries); + this.entrySizes = IntStream.range(0, entries.size()) + .boxed() + .map(i -> Pair.of(i, sizeOf(entries.get(i)))) + .collect(Collectors.toList()); + this.totalSize = entrySizes.stream().map(Pair::right).mapToLong(Long::longValue).sum(); + this.alreadyMovedEntries = new ArrayList<>(entries.size()); + } + + public List getEntries() { + return entries; + } + + public long getTotalSize() { + return totalSize; + } + + /** + * Get the largest entry that has not been moved to S3 yet + * @return metadata of largest not yet moved entry or empty if all entries have been moved + */ + private Optional getLargestNotYetMovedEntry() { + long largestEntrySize = -1; + int largestEntryIdx = -1; + for (Pair entryPair : entrySizes) { + Integer currentEntryIdx = entryPair.left(); + if (alreadyMovedEntries.contains(currentEntryIdx)) { + continue; + } + Long currentEntrySize = entryPair.right(); + if(largestEntrySize < currentEntrySize) { + largestEntrySize = currentEntrySize; + largestEntryIdx = currentEntryIdx; + } + } + if (largestEntryIdx == -1) { + // we moved all entries + return Optional.empty(); + } + return Optional.of( + new LargestEntryWithMetadata(entries.get(largestEntryIdx), largestEntryIdx, largestEntrySize) + ); + } + + /** + * Move the largest entry to S3 + * @return a future that completes with true if the entry was moved to S3, false otherwise + */ + public CompletableFuture moveLargestEntryToS3() { + Optional largestNotYetMovedEntry = getLargestNotYetMovedEntry(); + if (!largestNotYetMovedEntry.isPresent()) { + return CompletableFuture.completedFuture(false); + } + LargestEntryWithMetadata entryWithMeta = largestNotYetMovedEntry.get(); + SendMessageBatchRequestEntry largestEntry = entryWithMeta.getEntry(); + checkMessageAttributes(this.clientConfiguration.getPayloadSizeThreshold(), largestEntry.messageAttributes()); + return storeMessageInS3(largestEntry) + .thenApply(modifiedEntry -> { + alreadyMovedEntries.add(entryWithMeta.getIndex()); + entries.set(entryWithMeta.getIndex(), modifiedEntry); + totalSize = totalSize - entryWithMeta.getSize() + sizeOf(modifiedEntry); + return true; + }); + + } + + + public CompletableFuture storeInS3IfNeeded() { + // Verify that total size of batch request is within limits + if (this.getTotalSize() <= clientConfiguration.getPayloadSizeThreshold() && !clientConfiguration.isAlwaysThroughS3()) { + return CompletableFuture.completedFuture(this); + } + // move the largest entry to S3 + return this.moveLargestEntryToS3() + // call recursively to check if there are more entries to move to S3 + .thenCompose(actuallyMoved -> { + if (actuallyMoved) { + return storeInS3IfNeeded(); + } else { + // no more entries to move to S3 + return CompletableFuture.completedFuture(this); + } + }); + } + + } + + private static class LargestEntryWithMetadata { + private final SendMessageBatchRequestEntry entry; + private final int index; + private final long size; + + public LargestEntryWithMetadata(SendMessageBatchRequestEntry entry, int index, long size) { + this.entry = entry; + this.index = index; + this.size = size; + } + + public SendMessageBatchRequestEntry getEntry() { + return entry; + } + + public int getIndex() { + return index; + } + + public long getSize() { + return size; + } + } + } diff --git a/src/test/java/com/amazon/sqs/javamessaging/AmazonSQSExtendedAsyncClientTest.java b/src/test/java/com/amazon/sqs/javamessaging/AmazonSQSExtendedAsyncClientTest.java index eb7de08..5500195 100644 --- a/src/test/java/com/amazon/sqs/javamessaging/AmazonSQSExtendedAsyncClientTest.java +++ b/src/test/java/com/amazon/sqs/javamessaging/AmazonSQSExtendedAsyncClientTest.java @@ -35,6 +35,7 @@ import software.amazon.awssdk.core.ResponseBytes; import software.amazon.awssdk.core.async.AsyncRequestBody; import software.amazon.awssdk.core.async.AsyncResponseTransformer; +import software.amazon.awssdk.core.sync.RequestBody; import software.amazon.awssdk.services.s3.S3AsyncClient; import software.amazon.awssdk.services.s3.model.DeleteObjectRequest; import software.amazon.awssdk.services.s3.model.DeleteObjectResponse; @@ -404,7 +405,7 @@ public void testWhenMessageBatchIsSentThenOnlyMessagesLargerThanThresholdAreStor 700_000, 800_000, 900_000, - 200_000, + 150_000, 1000_000 }; @@ -427,6 +428,43 @@ public void testWhenMessageBatchIsSentThenOnlyMessagesLargerThanThresholdAreStor verify(mockS3, times(8)).putObject(isA(PutObjectRequest.class), isA(AsyncRequestBody.class)); } + + @Test + public void testWhenMessageBatchWithTotalSizeOverTheLimitIsSentThenLargestEntriesAreStoredInS3() { + // This creates 10 messages, out of which only two are below the threshold (100K and 150K), + // and the other 8 are above the threshold + + int[] messageLengthForCounter = new int[] { + 10_000, + 10_000, + 10_000, + 150_000, + 160_000, + 170_000, + 180_000, + 10_000, + 10_000, + 10_000 + }; + + List batchEntries = new ArrayList<>(); + for (int i = 0; i < 10; i++) { + int messageLength = messageLengthForCounter[i]; + String messageBody = generateStringWithLength(messageLength); + SendMessageBatchRequestEntry entry = SendMessageBatchRequestEntry.builder() + .id("entry_" + i) + .messageBody(messageBody) + .build(); + batchEntries.add(entry); + } + + SendMessageBatchRequest batchRequest = SendMessageBatchRequest.builder().queueUrl(SQS_QUEUE_URL).entries(batchEntries).build(); + extendedSqsWithDefaultConfig.sendMessageBatch(batchRequest).join(); + + // There should be 3 puts for the 3 largest messages as sum of sizes of others should be within limit + verify(mockS3, times(3)).putObject(isA(PutObjectRequest.class), isA(AsyncRequestBody.class)); + } + @Test public void testWhenMessageBatchIsLargeS3PointerIsCorrectlySentToSQSAndNotOriginalMessage() { String messageBody = generateStringWithLength(LESS_THAN_SQS_SIZE_LIMIT); diff --git a/src/test/java/com/amazon/sqs/javamessaging/AmazonSQSExtendedClientTest.java b/src/test/java/com/amazon/sqs/javamessaging/AmazonSQSExtendedClientTest.java index 20649f5..7c8c4bb 100644 --- a/src/test/java/com/amazon/sqs/javamessaging/AmazonSQSExtendedClientTest.java +++ b/src/test/java/com/amazon/sqs/javamessaging/AmazonSQSExtendedClientTest.java @@ -462,8 +462,6 @@ public void testWhenMessageBatchIsSentThenOnlyMessagesLargerThanThresholdAreStor @Test public void testWhenMessageBatchWithTotalSizeOverTheLimitIsSentThenLargestEntriesAreStoredInS3() { - // This creates 10 messages, out of which only two are below the threshold (100K and 150K), - // and the other 8 are above the threshold int[] messageLengthForCounter = new int[] { 10_000, From f0ab64b40966901b7e921d0b6ff432389f294eb0 Mon Sep 17 00:00:00 2001 From: Nikita Bahliuk Date: Sun, 16 Jun 2024 14:41:21 +0300 Subject: [PATCH 2/2] Cosmetics --- .../sqs/javamessaging/AmazonSQSExtendedAsyncClientTest.java | 3 --- 1 file changed, 3 deletions(-) diff --git a/src/test/java/com/amazon/sqs/javamessaging/AmazonSQSExtendedAsyncClientTest.java b/src/test/java/com/amazon/sqs/javamessaging/AmazonSQSExtendedAsyncClientTest.java index 5500195..7a8dcb9 100644 --- a/src/test/java/com/amazon/sqs/javamessaging/AmazonSQSExtendedAsyncClientTest.java +++ b/src/test/java/com/amazon/sqs/javamessaging/AmazonSQSExtendedAsyncClientTest.java @@ -431,9 +431,6 @@ public void testWhenMessageBatchIsSentThenOnlyMessagesLargerThanThresholdAreStor @Test public void testWhenMessageBatchWithTotalSizeOverTheLimitIsSentThenLargestEntriesAreStoredInS3() { - // This creates 10 messages, out of which only two are below the threshold (100K and 150K), - // and the other 8 are above the threshold - int[] messageLengthForCounter = new int[] { 10_000, 10_000,