Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
import static com.amazon.sqs.javamessaging.AmazonSQSExtendedClientUtil.getReservedAttributeNameIfPresent;
import static com.amazon.sqs.javamessaging.AmazonSQSExtendedClientUtil.isLarge;
import static com.amazon.sqs.javamessaging.AmazonSQSExtendedClientUtil.isS3ReceiptHandle;
import static com.amazon.sqs.javamessaging.AmazonSQSExtendedClientUtil.sizeOf;
import static com.amazon.sqs.javamessaging.AmazonSQSExtendedClientUtil.updateMessageAttributePayloadSize;

import java.util.ArrayList;
Expand All @@ -17,6 +18,8 @@
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import software.amazon.awssdk.awscore.AwsRequest;
Expand Down Expand Up @@ -44,6 +47,7 @@
import software.amazon.awssdk.services.sqs.model.SendMessageBatchResponse;
import software.amazon.awssdk.services.sqs.model.SendMessageRequest;
import software.amazon.awssdk.services.sqs.model.SendMessageResponse;
import software.amazon.awssdk.utils.Pair;
import software.amazon.awssdk.utils.StringUtils;
import software.amazon.payloadoffloading.PayloadStoreAsync;
import software.amazon.payloadoffloading.S3AsyncDao;
Expand Down Expand Up @@ -341,37 +345,15 @@ public CompletableFuture<SendMessageBatchResponse> sendMessageBatch(
return super.sendMessageBatch(sendMessageBatchRequest);
}

List<CompletableFuture<SendMessageBatchRequestEntry>> batchEntryFutures = new ArrayList<>(
sendMessageBatchRequest.entries().size());
boolean hasS3Entries = false;
for (SendMessageBatchRequestEntry entry : sendMessageBatchRequest.entries()) {
//Check message attributes for ExtendedClient related constraints
checkMessageAttributes(clientConfiguration.getPayloadSizeThreshold(), entry.messageAttributes());

if (clientConfiguration.isAlwaysThroughS3()
|| isLarge(clientConfiguration.getPayloadSizeThreshold(), entry)) {
batchEntryFutures.add(storeMessageInS3(entry));
hasS3Entries = true;
} else {
batchEntryFutures.add(CompletableFuture.completedFuture(entry));
}
}

if (!hasS3Entries) {
return super.sendMessageBatch(sendMessageBatchRequest);
}

// Convert list of entry futures to a future list of entries.
return CompletableFuture.allOf(
batchEntryFutures.toArray(new CompletableFuture[batchEntryFutures.size()]))
.thenApply(v -> batchEntryFutures.stream()
.map(CompletableFuture::join)
.collect(Collectors.toList()))
.thenCompose(batchEntries -> {
SendMessageBatchRequest modifiedBatchRequest =
sendMessageBatchRequest.toBuilder().entries(batchEntries).build();
return super.sendMessageBatch(modifiedBatchRequest);
});
return CompletableFuture.supplyAsync(() -> new SendBatchHelper(clientConfiguration, sendMessageBatchRequest.entries()))
.thenApply(SendBatchHelper::storeInS3IfNeeded)
.thenApply(CompletableFuture::join)
.thenApply(SendBatchHelper::getEntries)
.thenCompose(batchEntries -> {
SendMessageBatchRequest modifiedBatchRequest =
sendMessageBatchRequest.toBuilder().entries(batchEntries).build();
return super.sendMessageBatch(modifiedBatchRequest);
});
}

/**
Expand Down Expand Up @@ -508,4 +490,128 @@ private CompletableFuture<String> storeOriginalPayload(String messageContentStr)
private static <T extends AwsRequest.Builder> T appendUserAgent(final T builder) {
return AmazonSQSExtendedClientUtil.appendUserAgent(builder, USER_AGENT_NAME, USER_AGENT_VERSION);
}



private class SendBatchHelper {
private final ExtendedAsyncClientConfiguration clientConfiguration;

private final List<SendMessageBatchRequestEntry> entries;
private final List<Pair<Integer, Long>> entrySizes;
private final List<Integer> alreadyMovedEntries;
private long totalSize;

public SendBatchHelper(ExtendedAsyncClientConfiguration clientConfiguration, List<SendMessageBatchRequestEntry> entries) {
this.clientConfiguration = clientConfiguration;
this.entries = new ArrayList<>(entries);
this.entrySizes = IntStream.range(0, entries.size())
.boxed()
.map(i -> Pair.of(i, sizeOf(entries.get(i))))
.collect(Collectors.toList());
this.totalSize = entrySizes.stream().map(Pair::right).mapToLong(Long::longValue).sum();
this.alreadyMovedEntries = new ArrayList<>(entries.size());
}

public List<SendMessageBatchRequestEntry> getEntries() {
return entries;
}

public long getTotalSize() {
return totalSize;
}

/**
* Get the largest entry that has not been moved to S3 yet
* @return metadata of largest not yet moved entry or empty if all entries have been moved
*/
private Optional<LargestEntryWithMetadata> getLargestNotYetMovedEntry() {
long largestEntrySize = -1;
int largestEntryIdx = -1;
for (Pair<Integer, Long> entryPair : entrySizes) {
Integer currentEntryIdx = entryPair.left();
if (alreadyMovedEntries.contains(currentEntryIdx)) {
continue;
}
Long currentEntrySize = entryPair.right();
if(largestEntrySize < currentEntrySize) {
largestEntrySize = currentEntrySize;
largestEntryIdx = currentEntryIdx;
}
}
if (largestEntryIdx == -1) {
// we moved all entries
return Optional.empty();
}
return Optional.of(
new LargestEntryWithMetadata(entries.get(largestEntryIdx), largestEntryIdx, largestEntrySize)
);
}

/**
* Move the largest entry to S3
* @return a future that completes with true if the entry was moved to S3, false otherwise
*/
public CompletableFuture<Boolean> moveLargestEntryToS3() {
Optional<LargestEntryWithMetadata> largestNotYetMovedEntry = getLargestNotYetMovedEntry();
if (!largestNotYetMovedEntry.isPresent()) {
return CompletableFuture.completedFuture(false);
}
LargestEntryWithMetadata entryWithMeta = largestNotYetMovedEntry.get();
SendMessageBatchRequestEntry largestEntry = entryWithMeta.getEntry();
checkMessageAttributes(this.clientConfiguration.getPayloadSizeThreshold(), largestEntry.messageAttributes());
return storeMessageInS3(largestEntry)
.thenApply(modifiedEntry -> {
alreadyMovedEntries.add(entryWithMeta.getIndex());
entries.set(entryWithMeta.getIndex(), modifiedEntry);
totalSize = totalSize - entryWithMeta.getSize() + sizeOf(modifiedEntry);
return true;
});

}


public CompletableFuture<SendBatchHelper> storeInS3IfNeeded() {
// Verify that total size of batch request is within limits
if (this.getTotalSize() <= clientConfiguration.getPayloadSizeThreshold() && !clientConfiguration.isAlwaysThroughS3()) {
return CompletableFuture.completedFuture(this);
}
// move the largest entry to S3
return this.moveLargestEntryToS3()
// call recursively to check if there are more entries to move to S3
.thenCompose(actuallyMoved -> {
if (actuallyMoved) {
return storeInS3IfNeeded();
} else {
// no more entries to move to S3
return CompletableFuture.completedFuture(this);
}
});
}

}

private static class LargestEntryWithMetadata {
private final SendMessageBatchRequestEntry entry;
private final int index;
private final long size;

public LargestEntryWithMetadata(SendMessageBatchRequestEntry entry, int index, long size) {
this.entry = entry;
this.index = index;
this.size = size;
}

public SendMessageBatchRequestEntry getEntry() {
return entry;
}

public int getIndex() {
return index;
}

public long getSize() {
return size;
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
import software.amazon.awssdk.core.ResponseBytes;
import software.amazon.awssdk.core.async.AsyncRequestBody;
import software.amazon.awssdk.core.async.AsyncResponseTransformer;
import software.amazon.awssdk.core.sync.RequestBody;
import software.amazon.awssdk.services.s3.S3AsyncClient;
import software.amazon.awssdk.services.s3.model.DeleteObjectRequest;
import software.amazon.awssdk.services.s3.model.DeleteObjectResponse;
Expand Down Expand Up @@ -404,7 +405,7 @@ public void testWhenMessageBatchIsSentThenOnlyMessagesLargerThanThresholdAreStor
700_000,
800_000,
900_000,
200_000,
150_000,
1000_000
};

Expand All @@ -427,6 +428,40 @@ public void testWhenMessageBatchIsSentThenOnlyMessagesLargerThanThresholdAreStor
verify(mockS3, times(8)).putObject(isA(PutObjectRequest.class), isA(AsyncRequestBody.class));
}


@Test
public void testWhenMessageBatchWithTotalSizeOverTheLimitIsSentThenLargestEntriesAreStoredInS3() {
int[] messageLengthForCounter = new int[] {
10_000,
10_000,
10_000,
150_000,
160_000,
170_000,
180_000,
10_000,
10_000,
10_000
};

List<SendMessageBatchRequestEntry> batchEntries = new ArrayList<>();
for (int i = 0; i < 10; i++) {
int messageLength = messageLengthForCounter[i];
String messageBody = generateStringWithLength(messageLength);
SendMessageBatchRequestEntry entry = SendMessageBatchRequestEntry.builder()
.id("entry_" + i)
.messageBody(messageBody)
.build();
batchEntries.add(entry);
}

SendMessageBatchRequest batchRequest = SendMessageBatchRequest.builder().queueUrl(SQS_QUEUE_URL).entries(batchEntries).build();
extendedSqsWithDefaultConfig.sendMessageBatch(batchRequest).join();

// There should be 3 puts for the 3 largest messages as sum of sizes of others should be within limit
verify(mockS3, times(3)).putObject(isA(PutObjectRequest.class), isA(AsyncRequestBody.class));
}

@Test
public void testWhenMessageBatchIsLargeS3PointerIsCorrectlySentToSQSAndNotOriginalMessage() {
String messageBody = generateStringWithLength(LESS_THAN_SQS_SIZE_LIMIT);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -462,8 +462,6 @@ public void testWhenMessageBatchIsSentThenOnlyMessagesLargerThanThresholdAreStor

@Test
public void testWhenMessageBatchWithTotalSizeOverTheLimitIsSentThenLargestEntriesAreStoredInS3() {
// This creates 10 messages, out of which only two are below the threshold (100K and 150K),
// and the other 8 are above the threshold

int[] messageLengthForCounter = new int[] {
10_000,
Expand Down