diff --git a/flink-connector-aws/flink-connector-dynamodb/pom.xml b/flink-connector-aws/flink-connector-dynamodb/pom.xml
index 06ae5cd4d..e4cab321e 100644
--- a/flink-connector-aws/flink-connector-dynamodb/pom.xml
+++ b/flink-connector-aws/flink-connector-dynamodb/pom.xml
@@ -71,10 +71,12 @@ under the License.
<groupId>software.amazon.awssdk</groupId>
<artifactId>dynamodb</artifactId>
+ <version>2.32.0</version>
</dependency>
<dependency>
<groupId>software.amazon.awssdk</groupId>
<artifactId>dynamodb-enhanced</artifactId>
+ <version>2.32.0</version>
diff --git a/flink-connector-aws/flink-connector-dynamodb/src/main/java/org/apache/flink/connector/dynamodb/sink/DynamoDbTypeInformedElementConverter.java b/flink-connector-aws/flink-connector-dynamodb/src/main/java/org/apache/flink/connector/dynamodb/sink/DynamoDbTypeInformedElementConverter.java
index 4978f4664..e2307fcf1 100644
--- a/flink-connector-aws/flink-connector-dynamodb/src/main/java/org/apache/flink/connector/dynamodb/sink/DynamoDbTypeInformedElementConverter.java
+++ b/flink-connector-aws/flink-connector-dynamodb/src/main/java/org/apache/flink/connector/dynamodb/sink/DynamoDbTypeInformedElementConverter.java
@@ -176,7 +176,7 @@ private TableSchema createTableSchemaFromPojo(
tableSchemaBuilder,
propertyDescriptor.getName(),
BeanAttributeGetter.create(
- typeInfo.getTypeClass(), propertyDescriptor.getReadMethod()),
+ typeInfo.getTypeClass(), propertyDescriptor.getReadMethod(), null),
fieldInfo);
}
diff --git a/flink-connector-aws/flink-connector-dynamodb/src/main/java/org/apache/flink/connector/dynamodb/source/DynamoDbStreamsSource.java b/flink-connector-aws/flink-connector-dynamodb/src/main/java/org/apache/flink/connector/dynamodb/source/DynamoDbStreamsSource.java
index 5b20bc13c..2998b30a1 100644
--- a/flink-connector-aws/flink-connector-dynamodb/src/main/java/org/apache/flink/connector/dynamodb/source/DynamoDbStreamsSource.java
+++ b/flink-connector-aws/flink-connector-dynamodb/src/main/java/org/apache/flink/connector/dynamodb/source/DynamoDbStreamsSource.java
@@ -55,10 +55,13 @@
import software.amazon.awssdk.http.apache.ApacheHttpClient;
import software.amazon.awssdk.retries.AdaptiveRetryStrategy;
import software.amazon.awssdk.retries.api.BackoffStrategy;
+import software.amazon.awssdk.retries.api.RetryStrategy;
+import software.amazon.awssdk.services.dynamodb.model.Shard;
import software.amazon.awssdk.services.dynamodb.streams.DynamoDbStreamsClient;
import software.amazon.awssdk.utils.AttributeMap;
import java.time.Duration;
+import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.ConcurrentHashMap;
@@ -145,14 +148,18 @@ public SourceReader<T, DynamoDbStreamsShardSplit> createReader(
final Duration getRecordsIdlePollingTimeBetweenEmptyPolls =
sourceConfig.get(DYNAMODB_STREAMS_GET_RECORDS_IDLE_TIME_BETWEEN_EMPTY_POLLS);
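+ // Finished split id -> child shards discovered for it; populated by the split
+ // readers during child shard discovery and drained by the source reader.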
+ Map<String, List<Shard>> childShardMap = new ConcurrentHashMap<>();
// We create a new stream proxy for each split reader since they have their own independent
// lifecycle.
Supplier<PollingDynamoDbStreamsShardSplitReader> splitReaderSupplier =
() ->
new PollingDynamoDbStreamsShardSplitReader(
- createDynamoDbStreamsProxy(sourceConfig),
+ createDynamoDbStreamsProxy(
+ sourceConfig,
+ SdkDefaultRetryStrategy.defaultRetryStrategy()),
getRecordsIdlePollingTimeBetweenNonEmptyPolls,
getRecordsIdlePollingTimeBetweenEmptyPolls,
+ childShardMap,
shardMetricGroupMap);
DynamoDbStreamsRecordEmitter<T> recordEmitter =
new DynamoDbStreamsRecordEmitter<>(deserializationSchema);
@@ -162,6 +169,7 @@ public SourceReader<T, DynamoDbStreamsShardSplit> createReader(
recordEmitter,
sourceConfig,
readerContext,
+ childShardMap,
shardMetricGroupMap);
}
@@ -178,11 +186,25 @@ public SourceReader<T, DynamoDbStreamsShardSplit> createReader(
SplitEnumeratorContext<DynamoDbStreamsShardSplit> enumContext,
DynamoDbStreamsSourceEnumeratorState checkpoint)
throws Exception {
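+ // Adaptive retry with bounded exponential backoff for the enumerator's
+ // DescribeStream calls; the split readers use the SDK default retry strategy.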
+ int maxApiCallAttempts = sourceConfig.get(DYNAMODB_STREAMS_RETRY_COUNT);
+ Duration minDescribeStreamDelay =
+ sourceConfig.get(DYNAMODB_STREAMS_EXPONENTIAL_BACKOFF_MIN_DELAY);
+ Duration maxDescribeStreamDelay =
+ sourceConfig.get(DYNAMODB_STREAMS_EXPONENTIAL_BACKOFF_MAX_DELAY);
+ BackoffStrategy backoffStrategy =
+ BackoffStrategy.exponentialDelay(minDescribeStreamDelay, maxDescribeStreamDelay);
+ AdaptiveRetryStrategy adaptiveRetryStrategy =
+ SdkDefaultRetryStrategy.adaptiveRetryStrategy()
+ .toBuilder()
+ .maxAttempts(maxApiCallAttempts)
+ .backoffStrategy(backoffStrategy)
+ .throttlingBackoffStrategy(backoffStrategy)
+ .build();
return new DynamoDbStreamsSourceEnumerator(
enumContext,
streamArn,
sourceConfig,
- createDynamoDbStreamsProxy(sourceConfig),
+ createDynamoDbStreamsProxy(sourceConfig, adaptiveRetryStrategy),
dynamoDbStreamsShardAssigner,
checkpoint);
}
@@ -199,7 +221,8 @@ public SimpleVersionedSerializer<DynamoDbStreamsShardSplit> getSplitSerializer()
new DynamoDbStreamsShardSplitSerializer());
}
- private DynamoDbStreamsProxy createDynamoDbStreamsProxy(Configuration consumerConfig) {
+ private DynamoDbStreamsProxy createDynamoDbStreamsProxy(
+ Configuration consumerConfig, RetryStrategy retryStrategy) {
SdkHttpClient httpClient =
AWSGeneralUtil.createSyncHttpClient(
AttributeMap.builder().build(), ApacheHttpClient.builder());
@@ -215,26 +238,12 @@ private DynamoDbStreamsProxy createDynamoDbStreamsProxy(Configuration consumerCo
consumerConfig.addAllToProperties(dynamoDbStreamsClientProperties);
AWSGeneralUtil.validateAwsCredentials(dynamoDbStreamsClientProperties);
- int maxApiCallAttempts = sourceConfig.get(DYNAMODB_STREAMS_RETRY_COUNT);
- Duration minDescribeStreamDelay =
- sourceConfig.get(DYNAMODB_STREAMS_EXPONENTIAL_BACKOFF_MIN_DELAY);
- Duration maxDescribeStreamDelay =
- sourceConfig.get(DYNAMODB_STREAMS_EXPONENTIAL_BACKOFF_MAX_DELAY);
- BackoffStrategy backoffStrategy =
- BackoffStrategy.exponentialDelay(minDescribeStreamDelay, maxDescribeStreamDelay);
- AdaptiveRetryStrategy adaptiveRetryStrategy =
- SdkDefaultRetryStrategy.adaptiveRetryStrategy()
- .toBuilder()
- .maxAttempts(maxApiCallAttempts)
- .backoffStrategy(backoffStrategy)
- .throttlingBackoffStrategy(backoffStrategy)
- .build();
DynamoDbStreamsClient dynamoDbStreamsClient =
AWSClientUtil.createAwsSyncClient(
dynamoDbStreamsClientProperties,
httpClient,
DynamoDbStreamsClient.builder(),
- ClientOverrideConfiguration.builder().retryStrategy(adaptiveRetryStrategy),
+ ClientOverrideConfiguration.builder().retryStrategy(retryStrategy),
DynamodbStreamsSourceConfigConstants
.BASE_DDB_STREAMS_USER_AGENT_PREFIX_FORMAT,
DynamodbStreamsSourceConfigConstants.DDB_STREAMS_CLIENT_USER_AGENT_PREFIX);
diff --git a/flink-connector-aws/flink-connector-dynamodb/src/main/java/org/apache/flink/connector/dynamodb/source/config/DynamodbStreamsSourceConfigConstants.java b/flink-connector-aws/flink-connector-dynamodb/src/main/java/org/apache/flink/connector/dynamodb/source/config/DynamodbStreamsSourceConfigConstants.java
index b16833e51..7d031cf9a 100644
--- a/flink-connector-aws/flink-connector-dynamodb/src/main/java/org/apache/flink/connector/dynamodb/source/config/DynamodbStreamsSourceConfigConstants.java
+++ b/flink-connector-aws/flink-connector-dynamodb/src/main/java/org/apache/flink/connector/dynamodb/source/config/DynamodbStreamsSourceConfigConstants.java
@@ -76,6 +76,9 @@ public enum InitialPosition {
public static final String BASE_DDB_STREAMS_USER_AGENT_PREFIX_FORMAT =
"Apache Flink %s (%s) DynamoDb Streams Connector";
+ public static final String DYNAMODB_STREAMS_THROTTLING_EXCEPTION_ERROR_CODE =
+ "ThrottlingException";
+
public static final ConfigOption<Duration>
DYNAMODB_STREAMS_GET_RECORDS_IDLE_TIME_BETWEEN_EMPTY_POLLS =
ConfigOptions.key("flink.dynamodbstreams.getrecords.empty.mindelay")
@@ -91,6 +94,10 @@ public enum InitialPosition {
.withDescription(
"The default idle time between non-empty polls for DynamoDB Streams GetRecords API");
+ public static final int MAX_RETRY_ATTEMPTS_FOR_CHILD_SHARDS = 5;
+ public static final Duration CHILD_SHARD_DISCOVERY_MIN_DELAY = Duration.ofMillis(100);
+ public static final Duration CHILD_SHARD_DISCOVERY_MAX_DELAY = Duration.ofMillis(1000);
+
/** DynamoDb Streams identifier for user agent prefix. */
public static final String DDB_STREAMS_CLIENT_USER_AGENT_PREFIX =
"aws.dynamodbstreams.client.user-agent-prefix";
diff --git a/flink-connector-aws/flink-connector-dynamodb/src/main/java/org/apache/flink/connector/dynamodb/source/enumerator/DynamoDbStreamsSourceEnumerator.java b/flink-connector-aws/flink-connector-dynamodb/src/main/java/org/apache/flink/connector/dynamodb/source/enumerator/DynamoDbStreamsSourceEnumerator.java
index 0c2f00b11..fd4d56558 100644
--- a/flink-connector-aws/flink-connector-dynamodb/src/main/java/org/apache/flink/connector/dynamodb/source/enumerator/DynamoDbStreamsSourceEnumerator.java
+++ b/flink-connector-aws/flink-connector-dynamodb/src/main/java/org/apache/flink/connector/dynamodb/source/enumerator/DynamoDbStreamsSourceEnumerator.java
@@ -27,6 +27,7 @@
import org.apache.flink.configuration.Configuration;
import org.apache.flink.connector.dynamodb.source.config.DynamodbStreamsSourceConfigConstants.InitialPosition;
import org.apache.flink.connector.dynamodb.source.enumerator.event.SplitsFinishedEvent;
+import org.apache.flink.connector.dynamodb.source.enumerator.event.SplitsFinishedEventContext;
import org.apache.flink.connector.dynamodb.source.enumerator.tracker.SplitGraphInconsistencyTracker;
import org.apache.flink.connector.dynamodb.source.enumerator.tracker.SplitTracker;
import org.apache.flink.connector.dynamodb.source.exception.DynamoDbStreamsSourceException;
@@ -138,7 +139,20 @@ public void handleSourceEvent(int subtaskId, SourceEvent sourceEvent) {
/** When we mark a split as finished, we will only assign its child splits to the subtasks. */
private void handleFinishedSplits(int subtaskId, SplitsFinishedEvent splitsFinishedEvent) {
- splitTracker.markAsFinished(splitsFinishedEvent.getFinishedSplitIds());
+ Set<String> finishedSplitIds =
+ splitsFinishedEvent.getFinishedSplits().stream()
+ .map(SplitsFinishedEventContext::getSplitId)
+ .collect(Collectors.toSet());
+ splitTracker.markAsFinished(finishedSplitIds);
+ List<Shard> childrenOfFinishedSplits = new ArrayList<>();
+ splitsFinishedEvent
+ .getFinishedSplits()
+ .forEach(
+ finishedSplitEvent ->
+ childrenOfFinishedSplits.addAll(
+ finishedSplitEvent.getChildSplits()));
+ LOG.info("Adding Children of finishedSplits to splitTracker: {}", childrenOfFinishedSplits);
+ splitTracker.addChildSplits(childrenOfFinishedSplits);
Set<DynamoDbStreamsShardSplit> splitsAssignment = splitAssignment.get(subtaskId);
// during recovery, splitAssignment may return null since there might be no split assigned
@@ -152,13 +166,12 @@ private void handleFinishedSplits(int subtaskId, SplitsFinishedEvent splitsFinis
+ "Child shard discovery might be delayed until we have enough readers."
+ "Finished split ids: {}",
subtaskId,
- splitsFinishedEvent.getFinishedSplitIds());
+ finishedSplitIds);
return;
}
- splitsAssignment.removeIf(
- split -> splitsFinishedEvent.getFinishedSplitIds().contains(split.splitId()));
- assignChildSplits(splitsFinishedEvent.getFinishedSplitIds());
+ splitsAssignment.removeIf(split -> finishedSplitIds.contains(split.splitId()));
+ assignChildSplits(finishedSplitIds);
}
private void processDiscoveredSplits(ListShardsResult discoveredSplits, Throwable throwable) {
@@ -230,6 +243,7 @@ private void assignAllAvailableSplits() {
private void assignChildSplits(Set<String> finishedSplitIds) {
List<DynamoDbStreamsShardSplit> splitsAvailableForAssignment =
splitTracker.getUnassignedChildSplits(finishedSplitIds);
+ LOG.info("Unassigned child splits: {}", splitsAvailableForAssignment);
assignSplits(splitsAvailableForAssignment);
}
diff --git a/flink-connector-aws/flink-connector-dynamodb/src/main/java/org/apache/flink/connector/dynamodb/source/enumerator/event/SplitsFinishedEvent.java b/flink-connector-aws/flink-connector-dynamodb/src/main/java/org/apache/flink/connector/dynamodb/source/enumerator/event/SplitsFinishedEvent.java
index 0da5f01a5..22659c46b 100644
--- a/flink-connector-aws/flink-connector-dynamodb/src/main/java/org/apache/flink/connector/dynamodb/source/enumerator/event/SplitsFinishedEvent.java
+++ b/flink-connector-aws/flink-connector-dynamodb/src/main/java/org/apache/flink/connector/dynamodb/source/enumerator/event/SplitsFinishedEvent.java
@@ -22,26 +22,27 @@
import org.apache.flink.api.connector.source.SourceEvent;
import java.util.Set;
+import java.util.stream.Collectors;
/** Source event used by source reader to communicate that splits are finished to enumerator. */
@Internal
public class SplitsFinishedEvent implements SourceEvent {
private static final long serialVersionUID = 1;
- private final Set<String> finishedSplitIds;
+ private final Set<SplitsFinishedEventContext> finishedSplits;
- public SplitsFinishedEvent(Set<String> finishedSplitIds) {
- this.finishedSplitIds = finishedSplitIds;
+ public SplitsFinishedEvent(Set<SplitsFinishedEventContext> finishedSplits) {
+ this.finishedSplits = finishedSplits;
}
- public Set<String> getFinishedSplitIds() {
- return finishedSplitIds;
+ public Set<SplitsFinishedEventContext> getFinishedSplits() {
+ return finishedSplits;
}
@Override
public String toString() {
return "SplitsFinishedEvent{"
- + "finishedSplitIds=["
- + String.join(",", finishedSplitIds)
+ + "finishedSplit=["
+ + finishedSplits.stream().map(Object::toString).collect(Collectors.joining(","))
+ "]}";
}
}
diff --git a/flink-connector-aws/flink-connector-dynamodb/src/main/java/org/apache/flink/connector/dynamodb/source/enumerator/event/SplitsFinishedEventContext.java b/flink-connector-aws/flink-connector-dynamodb/src/main/java/org/apache/flink/connector/dynamodb/source/enumerator/event/SplitsFinishedEventContext.java
new file mode 100644
index 000000000..1f2535877
--- /dev/null
+++ b/flink-connector-aws/flink-connector-dynamodb/src/main/java/org/apache/flink/connector/dynamodb/source/enumerator/event/SplitsFinishedEventContext.java
@@ -0,0 +1,28 @@
+package org.apache.flink.connector.dynamodb.source.enumerator.event;
+
+import org.apache.flink.annotation.Internal;
+
+import software.amazon.awssdk.services.dynamodb.model.Shard;
+
+import java.io.Serializable;
+import java.util.List;
+
+/** Context carrying the split id and the child splits of a finished split event. */
+@Internal
+public class SplitsFinishedEventContext implements Serializable {
+ private final String splitId;
+ private final List<Shard> childSplits;
+
+ public SplitsFinishedEventContext(String splitId, List<Shard> childSplits) {
+ this.splitId = splitId;
+ this.childSplits = childSplits;
+ }
+
+ public String getSplitId() {
+ return splitId;
+ }
+
+ public List<Shard> getChildSplits() {
+ return childSplits;
+ }
+}
diff --git a/flink-connector-aws/flink-connector-dynamodb/src/main/java/org/apache/flink/connector/dynamodb/source/enumerator/tracker/SplitTracker.java b/flink-connector-aws/flink-connector-dynamodb/src/main/java/org/apache/flink/connector/dynamodb/source/enumerator/tracker/SplitTracker.java
index e655d4cde..042a216f7 100644
--- a/flink-connector-aws/flink-connector-dynamodb/src/main/java/org/apache/flink/connector/dynamodb/source/enumerator/tracker/SplitTracker.java
+++ b/flink-connector-aws/flink-connector-dynamodb/src/main/java/org/apache/flink/connector/dynamodb/source/enumerator/tracker/SplitTracker.java
@@ -102,6 +102,10 @@ public void addSplits(Collection<Shard> shardsToAdd) {
addSplitsForLatest(shardsToAdd);
}
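+ /** Child shards are registered with TRIM_HORIZON so records at the parent/child boundary are not skipped. */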
+ public void addChildSplits(Collection<Shard> childShardsToAdd) {
+ addSplitsForTrimHorizon(childShardsToAdd);
+ }
+
private void addSplitsForLatest(Collection<Shard> shardsToAdd) {
List<Shard> openShards =
shardsToAdd.stream()
diff --git a/flink-connector-aws/flink-connector-dynamodb/src/main/java/org/apache/flink/connector/dynamodb/source/proxy/DynamoDbStreamsProxy.java b/flink-connector-aws/flink-connector-dynamodb/src/main/java/org/apache/flink/connector/dynamodb/source/proxy/DynamoDbStreamsProxy.java
index 537b1bf59..74e7390f5 100644
--- a/flink-connector-aws/flink-connector-dynamodb/src/main/java/org/apache/flink/connector/dynamodb/source/proxy/DynamoDbStreamsProxy.java
+++ b/flink-connector-aws/flink-connector-dynamodb/src/main/java/org/apache/flink/connector/dynamodb/source/proxy/DynamoDbStreamsProxy.java
@@ -32,6 +32,7 @@
import software.amazon.awssdk.services.dynamodb.model.GetRecordsResponse;
import software.amazon.awssdk.services.dynamodb.model.GetShardIteratorRequest;
import software.amazon.awssdk.services.dynamodb.model.ResourceNotFoundException;
+import software.amazon.awssdk.services.dynamodb.model.ShardFilter;
import software.amazon.awssdk.services.dynamodb.model.StreamStatus;
import software.amazon.awssdk.services.dynamodb.model.TrimmedDataAccessException;
import software.amazon.awssdk.services.dynamodb.streams.DynamoDbStreamsClient;
@@ -88,6 +89,24 @@ public ListShardsResult listShards(String streamArn, @Nullable String lastSeenSh
return listShardsResult;
}
+ @Override
+ public ListShardsResult listShardsWithFilter(String streamArn, ShardFilter shardFilter) {
+ LOG.info("Child shards with filter called, for shardId: {}", shardFilter.shardId());
+ ListShardsResult listShardsResult = new ListShardsResult();
+
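+ // Failures are logged and an empty result is returned; the split reader
+ // retries child shard discovery with backoff.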
+ try {
+ DescribeStreamResponse describeStreamResponse =
+ this.describeStream(streamArn, shardFilter);
+ listShardsResult.addShards(describeStreamResponse.streamDescription().shards());
+ listShardsResult.setStreamStatus(
+ describeStreamResponse.streamDescription().streamStatus());
+ } catch (Exception e) {
+ LOG.error("DescribeStream with Filter API threw an exception", e);
+ }
+ LOG.info("Child shards returned for shardId: {}", listShardsResult);
+ return listShardsResult;
+ }
+
@Override
public GetRecordsResponse getRecords(
String streamArn, String shardId, StartingPosition startingPosition) {
@@ -170,6 +189,30 @@ private DescribeStreamResponse describeStream(String streamArn, @Nullable String
return describeStreamResponse;
}
+ private DescribeStreamResponse describeStream(String streamArn, ShardFilter shardFilter) {
+ final DescribeStreamRequest describeStreamRequest =
+ DescribeStreamRequest.builder()
+ .streamArn(streamArn)
+ .shardFilter(shardFilter)
+ .build();
+
+ DescribeStreamResponse describeStreamResponse =
+ dynamoDbStreamsClient.describeStream(describeStreamRequest);
+
+ StreamStatus streamStatus = describeStreamResponse.streamDescription().streamStatus();
+ if (streamStatus.equals(StreamStatus.ENABLING)) {
+ if (LOG.isWarnEnabled()) {
+ LOG.warn(
+ String.format(
+ "The status of stream %s is %s ; result of the current "
+ + "describeStream operation will not contain any shard information.",
+ streamArn, streamStatus));
+ }
+ }
+
+ return describeStreamResponse;
+ }
+
private String getShardIterator(
String streamArn, String shardId, StartingPosition startingPosition) {
GetShardIteratorRequest.Builder requestBuilder =
diff --git a/flink-connector-aws/flink-connector-dynamodb/src/main/java/org/apache/flink/connector/dynamodb/source/proxy/StreamProxy.java b/flink-connector-aws/flink-connector-dynamodb/src/main/java/org/apache/flink/connector/dynamodb/source/proxy/StreamProxy.java
index fa9bf7c9f..9de83de99 100644
--- a/flink-connector-aws/flink-connector-dynamodb/src/main/java/org/apache/flink/connector/dynamodb/source/proxy/StreamProxy.java
+++ b/flink-connector-aws/flink-connector-dynamodb/src/main/java/org/apache/flink/connector/dynamodb/source/proxy/StreamProxy.java
@@ -23,6 +23,7 @@
import org.apache.flink.connector.dynamodb.source.util.ListShardsResult;
import software.amazon.awssdk.services.dynamodb.model.GetRecordsResponse;
+import software.amazon.awssdk.services.dynamodb.model.ShardFilter;
import javax.annotation.Nullable;
@@ -41,6 +42,8 @@ public interface StreamProxy extends Closeable {
*/
ListShardsResult listShards(String streamArn, @Nullable String lastSeenShardId);
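+
+ /**
+ * Lists the shards of the stream matching the given filter.
+ *
+ * @param streamArn the ARN of the DynamoDB stream
+ * @param shardFilter the filter to apply, e.g. CHILD_SHARDS of a given parent shard id
+ * @return the matching shards together with the current stream status
+ */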
+ ListShardsResult listShardsWithFilter(String streamArn, ShardFilter shardFilter);
+
/**
* Retrieves records from the stream.
*
diff --git a/flink-connector-aws/flink-connector-dynamodb/src/main/java/org/apache/flink/connector/dynamodb/source/reader/DynamoDbStreamsSourceReader.java b/flink-connector-aws/flink-connector-dynamodb/src/main/java/org/apache/flink/connector/dynamodb/source/reader/DynamoDbStreamsSourceReader.java
index 90a105dd5..b60e3d03e 100644
--- a/flink-connector-aws/flink-connector-dynamodb/src/main/java/org/apache/flink/connector/dynamodb/source/reader/DynamoDbStreamsSourceReader.java
+++ b/flink-connector-aws/flink-connector-dynamodb/src/main/java/org/apache/flink/connector/dynamodb/source/reader/DynamoDbStreamsSourceReader.java
@@ -25,6 +25,7 @@
import org.apache.flink.connector.base.source.reader.SingleThreadMultiplexSourceReaderBase;
import org.apache.flink.connector.base.source.reader.fetcher.SingleThreadFetcherManager;
import org.apache.flink.connector.dynamodb.source.enumerator.event.SplitsFinishedEvent;
+import org.apache.flink.connector.dynamodb.source.enumerator.event.SplitsFinishedEventContext;
import org.apache.flink.connector.dynamodb.source.metrics.DynamoDbStreamsShardMetrics;
import org.apache.flink.connector.dynamodb.source.split.DynamoDbStreamsShardSplit;
import org.apache.flink.connector.dynamodb.source.split.DynamoDbStreamsShardSplitState;
@@ -32,6 +33,7 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import software.amazon.awssdk.services.dynamodb.model.Record;
+import software.amazon.awssdk.services.dynamodb.model.Shard;
import java.util.ArrayList;
import java.util.Collections;
@@ -41,6 +43,7 @@
import java.util.NavigableMap;
import java.util.Set;
import java.util.TreeMap;
+import java.util.stream.Collectors;
/**
* Coordinates the reading from assigned splits. Runs on the TaskManager.
@@ -55,6 +58,7 @@ public class DynamoDbStreamsSourceReader<T>
private static final Logger LOG = LoggerFactory.getLogger(DynamoDbStreamsSourceReader.class);
private final Map<String, DynamoDbStreamsShardMetrics> shardMetricGroupMap;
private final NavigableMap<Long, Set<DynamoDbStreamsShardSplit>> splitFinishedEvents;
+ private final Map<String, List<Shard>> childShardIdMap;
private long currentCheckpointId;
public DynamoDbStreamsSourceReader(
@@ -62,10 +66,12 @@ public DynamoDbStreamsSourceReader(
RecordEmitter<Record, T, DynamoDbStreamsShardSplitState> recordEmitter,
Configuration config,
SourceReaderContext context,
+ Map<String, List<Shard>> childShardIdMap,
Map<String, DynamoDbStreamsShardMetrics> shardMetricGroupMap) {
super(splitFetcherManager, recordEmitter, config, context);
this.shardMetricGroupMap = shardMetricGroupMap;
this.splitFinishedEvents = new TreeMap<>();
+ this.childShardIdMap = childShardIdMap;
this.currentCheckpointId = Long.MIN_VALUE;
}
@@ -83,19 +89,40 @@ protected void onSplitFinished(Map<String, DynamoDbStreamsShardSplitState> finishedSplitIds) {
splitFinishedEvents.computeIfAbsent(currentCheckpointId, k -> new HashSet<>());
finishedSplitIds.values().stream()
.map(
- finishedSplit ->
- new DynamoDbStreamsShardSplit(
- finishedSplit.getStreamArn(),
- finishedSplit.getShardId(),
- finishedSplit.getNextStartingPosition(),
- finishedSplit
- .getDynamoDbStreamsShardSplit()
- .getParentShardId(),
- true))
+ finishedSplit -> {
+ List<Shard> childSplits = new ArrayList<>();
+ String finishedSplitId = finishedSplit.getSplitId();
+ if (childShardIdMap.containsKey(finishedSplitId)) {
+ List<Shard> childShardsOfFinishedSplit =
+ childShardIdMap.get(finishedSplitId);
+ childSplits.addAll(childShardsOfFinishedSplit);
+ }
+ return new DynamoDbStreamsShardSplit(
+ finishedSplit.getStreamArn(),
+ finishedSplit.getShardId(),
+ finishedSplit.getNextStartingPosition(),
+ finishedSplit.getDynamoDbStreamsShardSplit().getParentShardId(),
+ true,
+ childSplits);
+ })
.forEach(split -> splitFinishedEvents.get(currentCheckpointId).add(split));
+ Set<SplitsFinishedEventContext> splitsFinishedEventContexts =
+ finishedSplitIds.values().stream()
+ .map(
+ finishedSplit -> {
+ List<Shard> childSplits = new ArrayList<>();
+ String finishedSplitId = finishedSplit.getSplitId();
+ if (childShardIdMap.containsKey(finishedSplitId)) {
+ childSplits.addAll(childShardIdMap.remove(finishedSplitId));
+ }
+ return new SplitsFinishedEventContext(
+ finishedSplitId, childSplits);
+ })
+ .collect(Collectors.toSet());
+ LOG.info("Sending splits finished event to coordinator: {}", splitsFinishedEventContexts);
context.sendSourceEventToCoordinator(
- new SplitsFinishedEvent(new HashSet<>(finishedSplitIds.keySet())));
+ new SplitsFinishedEvent(splitsFinishedEventContexts));
finishedSplitIds.keySet().forEach(this::unregisterShardMetricGroup);
}
@@ -121,8 +148,10 @@ public void addSplits(List<DynamoDbStreamsShardSplit> splits) {
// buffer. If the next checkpoint doesn't complete,
// we would go back to the previous checkpointed
// state which will again replay these split finished events.
+ SplitsFinishedEventContext splitsFinishedEventContext =
+ new SplitsFinishedEventContext(split.splitId(), split.getChildSplits());
context.sendSourceEventToCoordinator(
- new SplitsFinishedEvent(Collections.singleton(split.splitId())));
+ new SplitsFinishedEvent(Collections.singleton(splitsFinishedEventContext)));
} else {
dynamoDbStreamsShardSplits.add(split);
}
diff --git a/flink-connector-aws/flink-connector-dynamodb/src/main/java/org/apache/flink/connector/dynamodb/source/reader/PollingDynamoDbStreamsShardSplitReader.java b/flink-connector-aws/flink-connector-dynamodb/src/main/java/org/apache/flink/connector/dynamodb/source/reader/PollingDynamoDbStreamsShardSplitReader.java
index 8a516bed2..3da012d58 100644
--- a/flink-connector-aws/flink-connector-dynamodb/src/main/java/org/apache/flink/connector/dynamodb/source/reader/PollingDynamoDbStreamsShardSplitReader.java
+++ b/flink-connector-aws/flink-connector-dynamodb/src/main/java/org/apache/flink/connector/dynamodb/source/reader/PollingDynamoDbStreamsShardSplitReader.java
@@ -27,11 +27,16 @@
import org.apache.flink.connector.dynamodb.source.split.DynamoDbStreamsShardSplit;
import org.apache.flink.connector.dynamodb.source.split.DynamoDbStreamsShardSplitState;
import org.apache.flink.connector.dynamodb.source.split.StartingPosition;
+import org.apache.flink.connector.dynamodb.source.util.ListShardsResult;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import software.amazon.awssdk.services.dynamodb.model.GetRecordsResponse;
import software.amazon.awssdk.services.dynamodb.model.Record;
+import software.amazon.awssdk.services.dynamodb.model.Shard;
+import software.amazon.awssdk.services.dynamodb.model.ShardFilter;
+import software.amazon.awssdk.services.dynamodb.model.ShardFilterType;
+import software.amazon.awssdk.services.dynamodb.model.StreamStatus;
import javax.annotation.Nullable;
@@ -43,10 +48,14 @@
import java.util.Deque;
import java.util.HashSet;
import java.util.Iterator;
+import java.util.List;
import java.util.Map;
import java.util.Set;
import static java.util.Collections.singleton;
+import static org.apache.flink.connector.dynamodb.source.config.DynamodbStreamsSourceConfigConstants.CHILD_SHARD_DISCOVERY_MAX_DELAY;
+import static org.apache.flink.connector.dynamodb.source.config.DynamodbStreamsSourceConfigConstants.CHILD_SHARD_DISCOVERY_MIN_DELAY;
+import static org.apache.flink.connector.dynamodb.source.config.DynamodbStreamsSourceConfigConstants.MAX_RETRY_ATTEMPTS_FOR_CHILD_SHARDS;
/**
* An implementation of the SplitReader that periodically polls the DynamoDb stream to retrieve
@@ -64,6 +73,7 @@ public class PollingDynamoDbStreamsShardSplitReader
private final Duration getRecordsIdlePollingTimeBetweenEmptyPolls;
private final Deque<DynamoDbStreamsShardSplitWithContext> assignedSplits;
+ private final Map<String, List<Shard>> childShardMap;
private final Map<String, DynamoDbStreamsShardMetrics> shardMetricGroupMap;
private final Set<String> pausedSplitIds;
private static final Logger LOG =
@@ -73,6 +83,7 @@ public PollingDynamoDbStreamsShardSplitReader(
StreamProxy dynamodbStreamsProxy,
Duration getRecordsIdlePollingTimeBetweenNonEmptyPolls,
Duration getRecordsIdlePollingTimeBetweenEmptyPolls,
+ Map<String, List<Shard>> childShardMap,
Map<String, DynamoDbStreamsShardMetrics> shardMetricGroupMap) {
this.dynamodbStreams = dynamodbStreamsProxy;
this.getRecordsIdlePollingTimeBetweenNonEmptyPolls =
@@ -80,6 +91,7 @@ public PollingDynamoDbStreamsShardSplitReader(
this.getRecordsIdlePollingTimeBetweenEmptyPolls =
getRecordsIdlePollingTimeBetweenEmptyPolls;
this.shardMetricGroupMap = shardMetricGroupMap;
+ this.childShardMap = childShardMap;
this.assignedSplits = new ArrayDeque<>();
this.pausedSplitIds = new HashSet<>();
}
@@ -106,6 +118,49 @@ public RecordsWithSplitIds<Record> fetch() throws IOException {
}
long currentTime = System.currentTimeMillis();
+
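+ // Shard end reached: run child shard discovery (with bounded exponential
+ // backoff) before reporting the split as finished.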
+ if (splitContext.splitState.isHasShardEndReached()) {
+ if (!splitContext.hasAttemptedChildShardDiscovery) {
+ splitContext.hasAttemptedChildShardDiscovery = true;
+ splitContext.childShardDiscoveryAttempts = 0;
+ }
+
+ if (splitContext.childShardDiscoveryAttempts < MAX_RETRY_ATTEMPTS_FOR_CHILD_SHARDS) {
+ long nextChildShardDiscoveryEligibleTime =
+ getNextEligibleTimeForChildDiscovery(splitContext);
+ if (currentTime >= nextChildShardDiscoveryEligibleTime) {
+ ListShardsResult listShardsResult =
+ dynamodbStreams.listShardsWithFilter(
+ splitContext.splitState.getStreamArn(),
+ ShardFilter.builder()
+ .shardId(splitContext.splitState.getShardId())
+ .type(ShardFilterType.CHILD_SHARDS)
+ .build());
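+ // A stream that is no longer ENABLED will not yield new child shards,
+ // so finish the split immediately.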
+ if (!StreamStatus.ENABLED.equals(listShardsResult.getStreamStatus())) {
+ return new DynamoDbStreamRecordsWithSplitIds(
+ Collections.emptyIterator(),
+ splitContext.splitState.getSplitId(),
+ true);
+ }
+ List<Shard> childShards = listShardsResult.getShards();
+ if (!childShards.isEmpty()) {
+ this.childShardMap.put(splitContext.splitState.getSplitId(), childShards);
+ return new DynamoDbStreamRecordsWithSplitIds(
+ Collections.emptyIterator(),
+ splitContext.splitState.getSplitId(),
+ true);
+ }
+ splitContext.childShardDiscoveryAttempts++;
+ splitContext.lastChildShardDiscoveryAttemptTime = currentTime;
+ }
+ assignedSplits.add(splitContext);
+ return INCOMPLETE_SHARD_EMPTY_RECORDS;
+ } else {
+ return new DynamoDbStreamRecordsWithSplitIds(
+ Collections.emptyIterator(), splitContext.splitState.getSplitId(), true);
+ }
+ }
+
long nextEligibleTime = getNextEligibleTime(splitContext);
LOG.debug(
@@ -132,15 +187,11 @@ public RecordsWithSplitIds<Record> fetch() throws IOException {
splitContext.lastPollTimeMillis = currentTime;
splitContext.wasLastPollEmpty = isEmptyPoll;
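+ // Keep the split assigned even at shard end; the next fetch performs child
+ // shard discovery before the split is reported finished.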
+ splitContext.splitState.setHasShardEndReached(isComplete);
+ assignedSplits.add(splitContext);
if (isEmptyPoll) {
- if (isComplete) {
- return new DynamoDbStreamRecordsWithSplitIds(
- Collections.emptyIterator(), splitContext.splitState.getSplitId(), true);
- } else {
- assignedSplits.add(splitContext);
- return INCOMPLETE_SHARD_EMPTY_RECORDS;
- }
+ return INCOMPLETE_SHARD_EMPTY_RECORDS;
} else {
DynamoDbStreamsShardMetrics shardMetrics =
shardMetricGroupMap.get(splitContext.splitState.getShardId());
@@ -164,13 +215,20 @@ public RecordsWithSplitIds<Record> fetch() throws IOException {
.dynamodb()
.sequenceNumber()));
- if (!isComplete) {
- assignedSplits.add(splitContext);
- }
return new DynamoDbStreamRecordsWithSplitIds(
getRecordsResponse.records().iterator(),
splitContext.splitState.getSplitId(),
- isComplete);
+ false);
+ }
+
+ private long getNextEligibleTimeForChildDiscovery(
+ DynamoDbStreamsShardSplitWithContext splitContext) {
+ long baseDelay = CHILD_SHARD_DISCOVERY_MIN_DELAY.toMillis();
+ long maxDelay = CHILD_SHARD_DISCOVERY_MAX_DELAY.toMillis();
+
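+ // The delay doubles with each attempt: minDelay * 2^attempts, capped at maxDelay.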
+ long exponentialDelay =
+ Math.min(baseDelay * (1L << splitContext.childShardDiscoveryAttempts), maxDelay);
+ return splitContext.lastChildShardDiscoveryAttemptTime + exponentialDelay;
}
private void sleep(long milliseconds) throws IOException {
@@ -254,11 +312,15 @@ private static class DynamoDbStreamsShardSplitWithContext {
final DynamoDbStreamsShardSplitState splitState;
long lastPollTimeMillis;
boolean wasLastPollEmpty;
+ boolean hasAttemptedChildShardDiscovery;
+ int childShardDiscoveryAttempts;
+ long lastChildShardDiscoveryAttemptTime;
DynamoDbStreamsShardSplitWithContext(DynamoDbStreamsShardSplitState splitState) {
this.splitState = splitState;
this.lastPollTimeMillis = System.currentTimeMillis();
this.wasLastPollEmpty = false;
+ this.hasAttemptedChildShardDiscovery = false;
}
}
}
diff --git a/flink-connector-aws/flink-connector-dynamodb/src/main/java/org/apache/flink/connector/dynamodb/source/split/DynamoDbStreamsShardSplit.java b/flink-connector-aws/flink-connector-dynamodb/src/main/java/org/apache/flink/connector/dynamodb/source/split/DynamoDbStreamsShardSplit.java
index b79f1e829..967c309d9 100644
--- a/flink-connector-aws/flink-connector-dynamodb/src/main/java/org/apache/flink/connector/dynamodb/source/split/DynamoDbStreamsShardSplit.java
+++ b/flink-connector-aws/flink-connector-dynamodb/src/main/java/org/apache/flink/connector/dynamodb/source/split/DynamoDbStreamsShardSplit.java
@@ -23,7 +23,12 @@
import org.apache.flink.connector.base.source.reader.splitreader.SplitReader;
import org.apache.flink.connector.dynamodb.source.enumerator.DynamoDbStreamsSourceEnumerator;
+import software.amazon.awssdk.services.dynamodb.model.Shard;
+
+import java.util.ArrayList;
+import java.util.List;
import java.util.Objects;
+import java.util.stream.Collectors;
import static org.apache.flink.util.Preconditions.checkNotNull;
@@ -41,6 +46,7 @@ public final class DynamoDbStreamsShardSplit implements SourceSplit {
private final StartingPosition startingPosition;
private final String parentShardId;
private final boolean isFinished;
+ private final List<Shard> childSplits;
public DynamoDbStreamsShardSplit(
String streamArn,
@@ -50,12 +56,31 @@ public DynamoDbStreamsShardSplit(
this(streamArn, shardId, startingPosition, parentShardId, false);
}
+ public DynamoDbStreamsShardSplit(
+ String streamArn,
+ String shardId,
+ StartingPosition startingPosition,
+ String parentShardId,
+ List<Shard> childSplits) {
+ this(streamArn, shardId, startingPosition, parentShardId, false, childSplits);
+ }
+
public DynamoDbStreamsShardSplit(
String streamArn,
String shardId,
StartingPosition startingPosition,
String parentShardId,
boolean isFinished) {
+ this(streamArn, shardId, startingPosition, parentShardId, isFinished, new ArrayList<>());
+ }
+
+ public DynamoDbStreamsShardSplit(
+ String streamArn,
+ String shardId,
+ StartingPosition startingPosition,
+ String parentShardId,
+ boolean isFinished,
+ List<Shard> childSplits) {
checkNotNull(streamArn, "streamArn cannot be null");
checkNotNull(shardId, "shardId cannot be null");
checkNotNull(startingPosition, "startingPosition cannot be null");
@@ -65,6 +90,7 @@ public DynamoDbStreamsShardSplit(
this.startingPosition = startingPosition;
this.parentShardId = parentShardId;
this.isFinished = isFinished;
+ this.childSplits = childSplits;
}
@Override
@@ -92,6 +118,10 @@ public boolean isFinished() {
return isFinished;
}
+ public List<Shard> getChildSplits() {
+ return childSplits;
+ }
+
@Override
public String toString() {
return "DynamoDbStreamsShardSplit{"
@@ -108,7 +138,9 @@ public String toString() {
+ "]"
+ ", isFinished="
+ isFinished
- + "}";
+ + ", childSplitIds=["
+ + childSplits.stream().map(Shard::toString).collect(Collectors.joining(","))
+ + "]}";
}
@Override
@@ -124,11 +156,13 @@ public boolean equals(Object o) {
&& Objects.equals(shardId, that.shardId)
&& Objects.equals(startingPosition, that.startingPosition)
&& Objects.equals(parentShardId, that.parentShardId)
- && Objects.equals(isFinished, that.isFinished);
+ && Objects.equals(isFinished, that.isFinished)
+ && Objects.equals(childSplits, that.childSplits);
}
@Override
public int hashCode() {
- return Objects.hash(streamArn, shardId, startingPosition, parentShardId, isFinished);
+ return Objects.hash(
+ streamArn, shardId, startingPosition, parentShardId, isFinished, childSplits);
}
}
diff --git a/flink-connector-aws/flink-connector-dynamodb/src/main/java/org/apache/flink/connector/dynamodb/source/split/DynamoDbStreamsShardSplitSerializer.java b/flink-connector-aws/flink-connector-dynamodb/src/main/java/org/apache/flink/connector/dynamodb/source/split/DynamoDbStreamsShardSplitSerializer.java
index b10bfce3a..3e7989046 100644
--- a/flink-connector-aws/flink-connector-dynamodb/src/main/java/org/apache/flink/connector/dynamodb/source/split/DynamoDbStreamsShardSplitSerializer.java
+++ b/flink-connector-aws/flink-connector-dynamodb/src/main/java/org/apache/flink/connector/dynamodb/source/split/DynamoDbStreamsShardSplitSerializer.java
@@ -22,6 +22,8 @@
import org.apache.flink.core.io.SimpleVersionedSerializer;
import org.apache.flink.core.io.VersionMismatchException;
+import software.amazon.awssdk.services.dynamodb.model.SequenceNumberRange;
+import software.amazon.awssdk.services.dynamodb.model.Shard;
import software.amazon.awssdk.services.dynamodb.model.ShardIteratorType;
import java.io.ByteArrayInputStream;
@@ -29,8 +31,10 @@
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
+import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
+import java.util.List;
import java.util.Set;
/**
@@ -41,8 +45,8 @@
public class DynamoDbStreamsShardSplitSerializer
implements SimpleVersionedSerializer<DynamoDbStreamsShardSplit> {
- private static final Set<Integer> COMPATIBLE_VERSIONS = new HashSet<>(Arrays.asList(0, 1));
- private static final int CURRENT_VERSION = 1;
+ private static final Set<Integer> COMPATIBLE_VERSIONS = new HashSet<>(Arrays.asList(0, 1, 2));
+ private static final int CURRENT_VERSION = 2;
@Override
public int getVersion() {
@@ -74,6 +78,18 @@ public byte[] serialize(DynamoDbStreamsShardSplit split) throws IOException {
out.writeUTF(split.getParentShardId());
}
out.writeBoolean(split.isFinished());
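+ // Version 2 additions: the child shard count, then each child's shard id,
+ // parent shard id and sequence number range.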
+ out.writeInt(split.getChildSplits().size());
+ for (Shard childSplit : split.getChildSplits()) {
+ out.writeUTF(childSplit.shardId());
+ out.writeUTF(childSplit.parentShardId());
+ out.writeUTF(childSplit.sequenceNumberRange().startingSequenceNumber());
+ if (childSplit.sequenceNumberRange().endingSequenceNumber() == null) {
+ out.writeBoolean(false);
+ } else {
+ out.writeBoolean(true);
+ out.writeUTF(childSplit.sequenceNumberRange().endingSequenceNumber());
+ }
+ }
out.flush();
return baos.toByteArray();
@@ -116,12 +132,41 @@ public DynamoDbStreamsShardSplit deserialize(int version, byte[] serialized)
isFinished = in.readBoolean();
}
+ int childSplitSize = 0;
+ List<Shard> childSplits = new ArrayList<>();
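+ // Splits serialized before version 2 carry no child shard section.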
+ if (version > 1) {
+ childSplitSize = in.readInt();
+ if (childSplitSize > 0) {
+ for (int i = 0; i < childSplitSize; i++) {
+ String splitId = in.readUTF();
+ String parentSplitId = in.readUTF();
+ String startingSequenceNumber = in.readUTF();
+ String endingSequenceNumber = null;
+ if (in.readBoolean()) {
+ endingSequenceNumber = in.readUTF();
+ }
+ childSplits.add(
+ Shard.builder()
+ .shardId(splitId)
+ .parentShardId(parentSplitId)
+ .sequenceNumberRange(
+ SequenceNumberRange.builder()
+ .startingSequenceNumber(
+ startingSequenceNumber)
+ .endingSequenceNumber(endingSequenceNumber)
+ .build())
+ .build());
+ }
+ }
+ }
+
return new DynamoDbStreamsShardSplit(
streamArn,
shardId,
new StartingPosition(shardIteratorType, startingMarker),
parentShardId,
- isFinished);
+ isFinished,
+ childSplits);
}
}
}
diff --git a/flink-connector-aws/flink-connector-dynamodb/src/main/java/org/apache/flink/connector/dynamodb/source/split/DynamoDbStreamsShardSplitState.java b/flink-connector-aws/flink-connector-dynamodb/src/main/java/org/apache/flink/connector/dynamodb/source/split/DynamoDbStreamsShardSplitState.java
index 47e20a132..08226106d 100644
--- a/flink-connector-aws/flink-connector-dynamodb/src/main/java/org/apache/flink/connector/dynamodb/source/split/DynamoDbStreamsShardSplitState.java
+++ b/flink-connector-aws/flink-connector-dynamodb/src/main/java/org/apache/flink/connector/dynamodb/source/split/DynamoDbStreamsShardSplitState.java
@@ -29,10 +29,12 @@ public class DynamoDbStreamsShardSplitState {
private final DynamoDbStreamsShardSplit dynamoDbStreamsShardSplit;
private StartingPosition nextStartingPosition;
private String nextShardIterator;
+ private boolean hasShardEndReached;
public DynamoDbStreamsShardSplitState(DynamoDbStreamsShardSplit dynamoDbStreamsShardSplit) {
this.dynamoDbStreamsShardSplit = dynamoDbStreamsShardSplit;
this.nextStartingPosition = dynamoDbStreamsShardSplit.getStartingPosition();
+ this.hasShardEndReached = false;
}
public DynamoDbStreamsShardSplit getDynamoDbStreamsShardSplit() {
@@ -70,4 +72,12 @@ public String getNextShardIterator() {
public void setNextShardIterator(String nextShardIterator) {
this.nextShardIterator = nextShardIterator;
}
+
+ public boolean isHasShardEndReached() {
+ return hasShardEndReached;
+ }
+
+ public void setHasShardEndReached(boolean hasShardEndReached) {
+ this.hasShardEndReached = hasShardEndReached;
+ }
}
diff --git a/flink-connector-aws/flink-connector-dynamodb/src/test/java/org/apache/flink/connector/dynamodb/source/enumerator/DynamoDbStreamsSourceEnumeratorTest.java b/flink-connector-aws/flink-connector-dynamodb/src/test/java/org/apache/flink/connector/dynamodb/source/enumerator/DynamoDbStreamsSourceEnumeratorTest.java
index 031d6de6d..daaca2237 100644
--- a/flink-connector-aws/flink-connector-dynamodb/src/test/java/org/apache/flink/connector/dynamodb/source/enumerator/DynamoDbStreamsSourceEnumeratorTest.java
+++ b/flink-connector-aws/flink-connector-dynamodb/src/test/java/org/apache/flink/connector/dynamodb/source/enumerator/DynamoDbStreamsSourceEnumeratorTest.java
@@ -25,6 +25,7 @@
import org.apache.flink.connector.dynamodb.source.config.DynamodbStreamsSourceConfigConstants;
import org.apache.flink.connector.dynamodb.source.enumerator.assigner.ShardAssignerFactory;
import org.apache.flink.connector.dynamodb.source.enumerator.event.SplitsFinishedEvent;
+import org.apache.flink.connector.dynamodb.source.enumerator.event.SplitsFinishedEventContext;
import org.apache.flink.connector.dynamodb.source.proxy.StreamProxy;
import org.apache.flink.connector.dynamodb.source.split.DynamoDbStreamsShardSplit;
import org.apache.flink.connector.dynamodb.source.split.StartingPosition;
@@ -304,21 +305,29 @@ void testLatestAssignsChildShardsWithTrimHorizonDuringPeriodicDiscovery() throws
};
streamProxy.addShards(childShards);
enumerator.handleSourceEvent(
- subtaskId, new SplitsFinishedEvent(Collections.singleton(shards[2].shardId())));
- // Given no resharding occurs (list of shards remains the same)
- // When first periodic discovery runs
- context.runPeriodicCallable(0);
- // Then no additional splits are assigned
- SplitsAssignment<DynamoDbStreamsShardSplit> periodicDiscoverySplitAssignment =
- context.getSplitsAssignmentSequence().get(2);
+ subtaskId,
+ new SplitsFinishedEvent(
+ Collections.singleton(
+ new SplitsFinishedEventContext(
+ shards[2].shardId(),
+ Collections.singletonList(childShards[0])))));
+
DynamoDbStreamsShardSplit childSplit =
new DynamoDbStreamsShardSplit(
STREAM_ARN,
childShards[0].shardId(),
StartingPosition.fromStart(),
shards[2].shardId());
- assertThat(periodicDiscoverySplitAssignment.assignment().get(subtaskId))
+ assertThat(context.getSplitsAssignmentSequence().get(1).assignment().get(subtaskId))
.containsExactly(childSplit);
+ // Given no resharding occurs (list of shards remains the same)
+ // When first periodic discovery runs
+ context.runPeriodicCallable(0);
+ // Then no additional splits are assigned
+ SplitsAssignment<DynamoDbStreamsShardSplit> periodicDiscoverySplitAssignment =
+ context.getSplitsAssignmentSequence().get(2);
+ assertThat(periodicDiscoverySplitAssignment.assignment().get(subtaskId))
+ .isNullOrEmpty();
}
}
@@ -765,7 +774,12 @@ void testHandleSplitFinishedEventTest() throws Throwable {
context.runNextOneTimeCallable();
enumerator.handleSourceEvent(
- 1, new SplitsFinishedEvent(Collections.singleton(completedShard.shardId())));
+ 1,
+ new SplitsFinishedEvent(
+ Collections.singleton(
+ new SplitsFinishedEventContext(
+ completedShard.shardId(),
+ Collections.singletonList(shards[1])))));
// When restored from state
DynamoDbStreamsSourceEnumeratorState snapshotState = enumerator.snapshotState(1);
diff --git a/flink-connector-aws/flink-connector-dynamodb/src/test/java/org/apache/flink/connector/dynamodb/source/proxy/DynamoDbStreamsProxyTest.java b/flink-connector-aws/flink-connector-dynamodb/src/test/java/org/apache/flink/connector/dynamodb/source/proxy/DynamoDbStreamsProxyTest.java
index 8c5b20420..95e98db64 100644
--- a/flink-connector-aws/flink-connector-dynamodb/src/test/java/org/apache/flink/connector/dynamodb/source/proxy/DynamoDbStreamsProxyTest.java
+++ b/flink-connector-aws/flink-connector-dynamodb/src/test/java/org/apache/flink/connector/dynamodb/source/proxy/DynamoDbStreamsProxyTest.java
@@ -38,19 +38,22 @@
import software.amazon.awssdk.services.dynamodb.model.Record;
import software.amazon.awssdk.services.dynamodb.model.ResourceNotFoundException;
import software.amazon.awssdk.services.dynamodb.model.Shard;
+import software.amazon.awssdk.services.dynamodb.model.ShardFilter;
+import software.amazon.awssdk.services.dynamodb.model.ShardFilterType;
import software.amazon.awssdk.services.dynamodb.model.ShardIteratorType;
import software.amazon.awssdk.services.dynamodb.model.StreamDescription;
import software.amazon.awssdk.services.dynamodb.model.StreamStatus;
import software.amazon.awssdk.services.dynamodb.model.TrimmedDataAccessException;
import java.util.ArrayList;
+import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;
import static org.apache.flink.connector.dynamodb.source.util.TestUtil.generateShardId;
-import static org.assertj.core.api.AssertionsForClassTypes.assertThat;
+import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.AssertionsForClassTypes.assertThatNoException;
/** Tests to validate {@link DynamoDbStreamsProxy}. */
@@ -88,6 +91,86 @@ void testListShards(String lastSeenShardId) {
.isEqualTo(expectedListShardsResult);
}
+ @Test
+ void testListShardsWithFilterForChildShards() {
+ final String streamArn =
+ "arn:aws:dynamodb:us-east-1:1231231230:table/test/stream/2024-01-01T00:00:00.826";
+ final String parentShardId = "shardId-000000000001";
+
+ // Create child shards that we expect to be returned
+ final List<Shard> childShards =
+ Arrays.asList(
+ Shard.builder()
+ .shardId("shardId-000000000002")
+ .parentShardId(parentShardId)
+ .build(),
+ Shard.builder()
+ .shardId("shardId-000000000003")
+ .parentShardId(parentShardId)
+ .build());
+
+ // Create some other shards that should not be returned
+ final List otherShards =
+ Arrays.asList(
+ Shard.builder()
+ .shardId("shardId-000000000004")
+ .parentShardId("different-parent")
+ .build(),
+ Shard.builder().shardId("shardId-000000000005").build());
+
+ // Set up the expected response
+ final ListShardsResult expectedResult = new ListShardsResult();
+ expectedResult.addShards(childShards);
+ expectedResult.setStreamStatus(StreamStatus.ENABLED);
+
+ // Create describe stream response with all shards
+ List<Shard> allShards = new ArrayList<>();
+ allShards.addAll(childShards);
+ allShards.addAll(otherShards);
+
+ DescribeStreamResponse describeStreamResponse =
+ DescribeStreamResponse.builder()
+ .streamDescription(
+ StreamDescription.builder()
+ .shards(allShards)
+ .streamStatus(StreamStatus.ENABLED)
+ .lastEvaluatedShardId(null)
+ .build())
+ .build();
+
+ TestingDynamoDbStreamsClient testingDynamoDbStreamsClient =
+ new TestingDynamoDbStreamsClient();
+
+ // Verify the correct request is made
+ testingDynamoDbStreamsClient.setDescribeStreamValidation(
+ request -> {
+ assertThat(request.streamArn()).isEqualTo(streamArn);
+ assertThat(request.shardFilter()).isNotNull();
+ assertThat(request.shardFilter().type())
+ .isEqualTo(ShardFilterType.CHILD_SHARDS);
+ assertThat(request.shardFilter().shardId()).isEqualTo(parentShardId);
+ });
+
+ testingDynamoDbStreamsClient.setDescribeStreamResponse(describeStreamResponse);
+
+ DynamoDbStreamsProxy dynamoDbStreamsProxy =
+ new DynamoDbStreamsProxy(testingDynamoDbStreamsClient, HTTP_CLIENT);
+
+ // Create the filter for child shards
+ ShardFilter childShardFilter =
+ ShardFilter.builder()
+ .type(ShardFilterType.CHILD_SHARDS)
+ .shardId(parentShardId)
+ .build();
+
+ // Execute the method and verify results
+ ListShardsResult result =
+ dynamoDbStreamsProxy.listShardsWithFilter(streamArn, childShardFilter);
+
+ assertThat(result.getShards()).hasSize(2).containsExactlyInAnyOrderElementsOf(childShards);
+ assertThat(result.getStreamStatus()).isEqualTo(StreamStatus.ENABLED);
+ }
+
@Test
void testGetRecordsInitialReadFromTrimHorizon() {
final String streamArn =
diff --git a/flink-connector-aws/flink-connector-dynamodb/src/test/java/org/apache/flink/connector/dynamodb/source/reader/DynamoDbStreamsSourceReaderTest.java b/flink-connector-aws/flink-connector-dynamodb/src/test/java/org/apache/flink/connector/dynamodb/source/reader/DynamoDbStreamsSourceReaderTest.java
index d66c0bbfd..12e8ac4e9 100644
--- a/flink-connector-aws/flink-connector-dynamodb/src/test/java/org/apache/flink/connector/dynamodb/source/reader/DynamoDbStreamsSourceReaderTest.java
+++ b/flink-connector-aws/flink-connector-dynamodb/src/test/java/org/apache/flink/connector/dynamodb/source/reader/DynamoDbStreamsSourceReaderTest.java
@@ -22,6 +22,7 @@
import org.apache.flink.configuration.Configuration;
import org.apache.flink.connector.base.source.reader.fetcher.SingleThreadFetcherManager;
import org.apache.flink.connector.dynamodb.source.enumerator.event.SplitsFinishedEvent;
+import org.apache.flink.connector.dynamodb.source.enumerator.event.SplitsFinishedEventContext;
import org.apache.flink.connector.dynamodb.source.metrics.DynamoDbStreamsShardMetrics;
import org.apache.flink.connector.dynamodb.source.proxy.StreamProxy;
import org.apache.flink.connector.dynamodb.source.split.DynamoDbStreamsShardSplit;
@@ -37,6 +38,7 @@
import org.junit.jupiter.api.Test;
import java.time.Duration;
+import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
@@ -73,6 +75,7 @@ public void init() {
testStreamProxy,
NON_EMPTY_POLLING_DELAY_MILLIS,
EMPTY_POLLING_DELAY_MILLIS,
+ new ConcurrentHashMap<>(),
shardMetricGroupMap);
testingReaderContext =
@@ -84,6 +87,7 @@ public void init() {
new DynamoDbStreamsRecordEmitter<>(null),
new Configuration(),
testingReaderContext,
+ new ConcurrentHashMap<>(),
shardMetricGroupMap);
}
@@ -122,12 +126,14 @@ void testOnSplitFinishedEventSent() {
List<SourceEvent> events = testingReaderContext.getSentEvents();
- Set<String> expectedSplitIds = Collections.singleton(split.splitId());
+ Set<SplitsFinishedEventContext> expectedFinishedSplits =
+ Collections.singleton(
+ new SplitsFinishedEventContext(split.splitId(), new ArrayList<>()));
assertThat(events)
.singleElement()
.isInstanceOf(SplitsFinishedEvent.class)
.usingRecursiveComparison()
- .isEqualTo(new SplitsFinishedEvent(expectedSplitIds));
+ .isEqualTo(new SplitsFinishedEvent(expectedFinishedSplits));
}
@Test
@@ -225,8 +231,10 @@ void testAddSplitsWithStateRestoration() throws Exception {
.allSatisfy(
e -> {
SplitsFinishedEvent event = (SplitsFinishedEvent) e;
- assertThat(event.getFinishedSplitIds()).hasSize(1);
- assertThat(event.getFinishedSplitIds())
+ assertThat(event.getFinishedSplits()).hasSize(1);
+ assertThat(
+ event.getFinishedSplits().stream()
+ .map(SplitsFinishedEventContext::getSplitId))
.containsAnyOf("finished-split-1", "finished-split-2");
});
diff --git a/flink-connector-aws/flink-connector-dynamodb/src/test/java/org/apache/flink/connector/dynamodb/source/reader/PollingDynamoDbStreamsShardSplitReaderTest.java b/flink-connector-aws/flink-connector-dynamodb/src/test/java/org/apache/flink/connector/dynamodb/source/reader/PollingDynamoDbStreamsShardSplitReaderTest.java
index f3cc1692a..bcd57cf85 100644
--- a/flink-connector-aws/flink-connector-dynamodb/src/test/java/org/apache/flink/connector/dynamodb/source/reader/PollingDynamoDbStreamsShardSplitReaderTest.java
+++ b/flink-connector-aws/flink-connector-dynamodb/src/test/java/org/apache/flink/connector/dynamodb/source/reader/PollingDynamoDbStreamsShardSplitReaderTest.java
@@ -73,6 +73,7 @@ public void init() {
testStreamProxy,
NON_EMPTY_POLLING_DELAY_MILLIS,
EMPTY_POLLING_DELAY_MILLIS,
+ new ConcurrentHashMap<>(),
shardMetricGroupMap);
}
@@ -235,12 +236,20 @@ void testFinishedSplitsReturned() throws Exception {
assertThat(retrievedRecords.finishedSplits()).isEmpty();
fetchedRecords.add(retrievedRecords.nextRecordFromSplit());
}
-
- assertThat(retrievedRecords.nextSplit()).isNull();
- assertThat(retrievedRecords.finishedSplits()).contains(split.splitId());
assertThat(fetchedRecords)
.containsExactlyInAnyOrderElementsOf(expectedRecords);
});
+
+ // Now wait for the split to be marked as finished after child shard discovery attempts
+ await().pollDelay(NON_EMPTY_POLLING_DELAY_MILLIS)
+ .atMost(Duration.ofSeconds(30)) // Allow enough time for all retry attempts
+ .untilAsserted(
+ () -> {
+ RecordsWithSplitIds<Record> retrievedRecords = splitReader.fetch();
+ // No more records should be returned
+ assertThat(readAllRecords(retrievedRecords)).isEmpty();
+ assertThat(retrievedRecords.finishedSplits()).contains(split.splitId());
+ });
}
@Test
@@ -400,6 +409,7 @@ void testPollingDelayForEmptyRecords() throws Exception {
testStreamProxy,
NON_EMPTY_POLLING_DELAY_MILLIS,
testEmptyPollDelay,
+ new ConcurrentHashMap<>(),
shardMetricGroupMap);
// Immediate second poll - should return empty due to polling delay
diff --git a/flink-connector-aws/flink-connector-dynamodb/src/test/java/org/apache/flink/connector/dynamodb/source/split/DynamoDbStreamsShardSplitSerializerTest.java b/flink-connector-aws/flink-connector-dynamodb/src/test/java/org/apache/flink/connector/dynamodb/source/split/DynamoDbStreamsShardSplitSerializerTest.java
index ebf117829..410f93bcc 100644
--- a/flink-connector-aws/flink-connector-dynamodb/src/test/java/org/apache/flink/connector/dynamodb/source/split/DynamoDbStreamsShardSplitSerializerTest.java
+++ b/flink-connector-aws/flink-connector-dynamodb/src/test/java/org/apache/flink/connector/dynamodb/source/split/DynamoDbStreamsShardSplitSerializerTest.java
@@ -29,6 +29,7 @@
import static org.apache.flink.connector.dynamodb.source.util.TestUtil.SHARD_ID;
import static org.apache.flink.connector.dynamodb.source.util.TestUtil.STREAM_ARN;
import static org.apache.flink.connector.dynamodb.source.util.TestUtil.getTestSplit;
+import static org.apache.flink.connector.dynamodb.source.util.TestUtil.getTestSplitWithChildShards;
import static org.assertj.core.api.AssertionsForClassTypes.assertThat;
import static org.assertj.core.api.AssertionsForClassTypes.assertThatExceptionOfType;
@@ -47,6 +48,19 @@ void testSerializeAndDeserializeEverythingSpecified() throws Exception {
assertThat(deserializedSplit).usingRecursiveComparison().isEqualTo(initialSplit);
}
+ @Test
+ void testSerializeAndDeserializeWithChildSplits() throws Exception {
+ final DynamoDbStreamsShardSplit initialSplit = getTestSplitWithChildShards();
+
+ DynamoDbStreamsShardSplitSerializer serializer = new DynamoDbStreamsShardSplitSerializer();
+
+ byte[] serialized = serializer.serialize(initialSplit);
+ DynamoDbStreamsShardSplit deserializedSplit =
+ serializer.deserialize(serializer.getVersion(), serialized);
+
+ assertThat(deserializedSplit).usingRecursiveComparison().isEqualTo(initialSplit);
+ }
+
@ParameterizedTest
@MethodSource("provideStartingPositions")
void testSerializeAndDeserializeWithStartingPosition(StartingPosition startingPosition)
diff --git a/flink-connector-aws/flink-connector-dynamodb/src/test/java/org/apache/flink/connector/dynamodb/source/util/DynamoDbStreamsClientProvider.java b/flink-connector-aws/flink-connector-dynamodb/src/test/java/org/apache/flink/connector/dynamodb/source/util/DynamoDbStreamsClientProvider.java
index c4c3285ad..8e6c0a5cb 100644
--- a/flink-connector-aws/flink-connector-dynamodb/src/test/java/org/apache/flink/connector/dynamodb/source/util/DynamoDbStreamsClientProvider.java
+++ b/flink-connector-aws/flink-connector-dynamodb/src/test/java/org/apache/flink/connector/dynamodb/source/util/DynamoDbStreamsClientProvider.java
@@ -26,12 +26,18 @@
import software.amazon.awssdk.services.dynamodb.model.GetRecordsResponse;
import software.amazon.awssdk.services.dynamodb.model.GetShardIteratorRequest;
import software.amazon.awssdk.services.dynamodb.model.GetShardIteratorResponse;
+import software.amazon.awssdk.services.dynamodb.model.Shard;
+import software.amazon.awssdk.services.dynamodb.model.ShardFilter;
+import software.amazon.awssdk.services.dynamodb.model.ShardFilterType;
+import software.amazon.awssdk.services.dynamodb.model.StreamDescription;
import software.amazon.awssdk.services.dynamodb.streams.DynamoDbStreamsClient;
import software.amazon.awssdk.services.dynamodb.streams.DynamoDbStreamsServiceClientConfiguration;
import java.util.ArrayDeque;
import java.util.Deque;
+import java.util.List;
import java.util.function.Consumer;
+import java.util.stream.Collectors;
/** Provides {@link DynamoDbStreamsClient} with mocked DynamoDbStreams behavior. */
public class DynamoDbStreamsClientProvider {
@@ -84,6 +90,29 @@ public DescribeStreamResponse describeStream(DescribeStreamRequest describeStrea
throws AwsServiceException, SdkClientException {
describeStreamValidation.accept(describeStreamRequest);
+ ShardFilter shardFilter = describeStreamRequest.shardFilter();
+ if (shardFilter != null && ShardFilterType.CHILD_SHARDS.equals(shardFilter.type())) {
+ List<Shard> shards = describeStreamResponse.streamDescription().shards();
+ List<Shard> childShards =
+ shards.stream()
+ .filter(
+ shard ->
+ shard.parentShardId() != null
+ && shard.parentShardId()
+ .equals(shardFilter.shardId()))
+ .collect(Collectors.toList());
+ return DescribeStreamResponse.builder()
+ .streamDescription(
+ StreamDescription.builder()
+ .shards(childShards)
+ .streamArn(describeStreamRequest.streamArn())
+ .streamStatus(
+ describeStreamResponse
+ .streamDescription()
+ .streamStatus())
+ .build())
+ .build();
+ }
return describeStreamResponse;
}
diff --git a/flink-connector-aws/flink-connector-dynamodb/src/test/java/org/apache/flink/connector/dynamodb/source/util/DynamoDbStreamsProxyProvider.java b/flink-connector-aws/flink-connector-dynamodb/src/test/java/org/apache/flink/connector/dynamodb/source/util/DynamoDbStreamsProxyProvider.java
index c9f45c196..c4a81c771 100644
--- a/flink-connector-aws/flink-connector-dynamodb/src/test/java/org/apache/flink/connector/dynamodb/source/util/DynamoDbStreamsProxyProvider.java
+++ b/flink-connector-aws/flink-connector-dynamodb/src/test/java/org/apache/flink/connector/dynamodb/source/util/DynamoDbStreamsProxyProvider.java
@@ -26,6 +26,8 @@
import software.amazon.awssdk.services.dynamodb.model.GetRecordsResponse;
import software.amazon.awssdk.services.dynamodb.model.Record;
import software.amazon.awssdk.services.dynamodb.model.Shard;
+import software.amazon.awssdk.services.dynamodb.model.ShardFilter;
+import software.amazon.awssdk.services.dynamodb.model.ShardFilterType;
import javax.annotation.Nullable;
@@ -84,6 +86,25 @@ public ListShardsResult listShards(String streamArn, @Nullable String lastSeenSh
return listShardsResult;
}
+ @Override
+ public ListShardsResult listShardsWithFilter(String streamArn, ShardFilter shardFilter) {
+ if (ShardFilterType.CHILD_SHARDS.equals(shardFilter.type())) {
+ ListShardsResult listShardsResult = new ListShardsResult();
+ List<Shard> childShards =
+ shards.stream()
+ .filter(
+ shard ->
+ shard.parentShardId() != null
+ && shard.parentShardId()
+ .equals(shardFilter.shardId()))
+ .collect(Collectors.toList());
+ listShardsResult.addShards(childShards);
+ return listShardsResult;
+ }
+ throw new UnsupportedOperationException(
+ String.format("ShardFilterType %s not supported", shardFilter.type().name()));
+ }
+
@Override
public GetRecordsResponse getRecords(
String streamArn, String shardId, StartingPosition startingPosition) {
diff --git a/flink-connector-aws/flink-connector-dynamodb/src/test/java/org/apache/flink/connector/dynamodb/source/util/TestUtil.java b/flink-connector-aws/flink-connector-dynamodb/src/test/java/org/apache/flink/connector/dynamodb/source/util/TestUtil.java
index 9c3fcb9f5..4c159593f 100644
--- a/flink-connector-aws/flink-connector-dynamodb/src/test/java/org/apache/flink/connector/dynamodb/source/util/TestUtil.java
+++ b/flink-connector-aws/flink-connector-dynamodb/src/test/java/org/apache/flink/connector/dynamodb/source/util/TestUtil.java
@@ -35,7 +35,10 @@
import java.time.Duration;
import java.time.Instant;
+import java.util.Arrays;
+import java.util.List;
import java.util.Optional;
+import java.util.stream.Collectors;
import static org.assertj.core.api.Assertions.assertThat;
@@ -45,6 +48,8 @@ public class TestUtil {
public static final String STREAM_ARN =
"arn:aws:dynamodb:us-east-1:123456789012:stream/2024-01-01T00:00:00Z";
public static final String SHARD_ID = "shardId-000000000002";
+ public static final String CHILD_SHARD_ID_1 = "shardId-000000000003";
+ public static final String CHILD_SHARD_ID_2 = "shardId-000000000004";
public static final SimpleStringSchema STRING_SCHEMA = new SimpleStringSchema();
public static final long MILLIS_BEHIND_LATEST_TEST_VALUE = -1L;
@@ -95,6 +100,15 @@ public static DynamoDbStreamsShardSplit getTestSplit() {
return getTestSplit(SHARD_ID);
}
+ public static DynamoDbStreamsShardSplit getTestSplitWithChildShards() {
+ return getTestSplitWithChildShards(SHARD_ID);
+ }
+
+ public static DynamoDbStreamsShardSplit getTestSplitWithChildShards(String shardId) {
+ return getTestSplit(
+ STREAM_ARN, shardId, Arrays.asList(CHILD_SHARD_ID_1, CHILD_SHARD_ID_2));
+ }
+
public static DynamoDbStreamsShardSplit getTestSplit(String shardId) {
return getTestSplit(STREAM_ARN, shardId);
}
@@ -104,6 +118,27 @@ public static DynamoDbStreamsShardSplit getTestSplit(String streamArn, String sh
streamArn, shardId, StartingPosition.fromStart(), null);
}
+ public static DynamoDbStreamsShardSplit getTestSplit(
+ String streamArn, String shardId, List<String> childShardIds) {
+ return new DynamoDbStreamsShardSplit(
+ streamArn,
+ shardId,
+ StartingPosition.fromStart(),
+ null,
+ childShardIds.stream()
+ .map(
+ childShardId ->
+ Shard.builder()
+ .parentShardId(shardId)
+ .shardId(childShardId)
+ .sequenceNumberRange(
+ SequenceNumberRange.builder()
+ .startingSequenceNumber("1234")
+ .build())
+ .build())
+ .collect(Collectors.toList()));
+ }
+
public static DynamoDbStreamsShardSplit getTestSplit(StartingPosition startingPosition) {
return new DynamoDbStreamsShardSplit(STREAM_ARN, SHARD_ID, startingPosition, null);
}