diff --git a/docs/content/docs/connectors/datastream/kinesis.md b/docs/content/docs/connectors/datastream/kinesis.md
index 72bd5d738..b5f021166 100644
--- a/docs/content/docs/connectors/datastream/kinesis.md
+++ b/docs/content/docs/connectors/datastream/kinesis.md
@@ -217,6 +217,38 @@ properties by providing a value for `ConsumerConfigConstants.STREAM_INITIAL_TIME
 If `ConsumerConfigConstants.STREAM_TIMESTAMP_DATE_FORMAT` is not defined then the default pattern will be `yyyy-MM-dd'T'HH:mm:ss.SSSXXX`
 (for example, timestamp value is `2016-04-04` and pattern is `yyyy-MM-dd` given by user or timestamp value is `2016-04-04T19:58:46.480-00:00` without given a pattern).

+### Configuring starting position for new streams
+
+By default, the Flink Kinesis Consumer handles a new stream the same way it handles a new shard of an existing stream: it starts consuming from the earliest record (the same behaviour as `TRIM_HORIZON`).
+
+This behaviour is fine when you must not lose any data from the stream. However, if you are consuming from a stream with a long retention period and it is acceptable to start consuming from "now",
+or more generally from the position defined in `ConsumerConfigConstants.STREAM_INITIAL_POSITION`, this was previously not possible.
+
+This behaviour can now be enabled by setting the `ConsumerConfigConstants.APPLY_STREAM_INITIAL_POSITION_FOR_NEW_STREAMS` flag to `true`, which makes ALL new streams start consuming from the configured initial position
+instead of from the beginning.
+
+If you only want to force particular new streams to start consuming from the configured `ConsumerConfigConstants.STREAM_INITIAL_POSITION`, use the `ConsumerConfigConstants.STREAMS_TO_APPLY_STREAM_INITIAL_POSITION_TO` property (described below) instead.
+
+### Resetting specific streams to the configured initial position
+
+One of the features of the Flink Kinesis Consumer is that it keeps track of the offset the application has reached in each shard, so that if the application is restarted from a snapshot it resumes consuming from that offset.
+
+This is the ideal behaviour most of the time, but what if you want to jump to `LATEST` or go back to `TRIM_HORIZON` for a stream that is already being tracked by the Flink Kinesis Consumer?
+
+You can now do this via the `ConsumerConfigConstants.STREAMS_TO_APPLY_STREAM_INITIAL_POSITION_TO` property, which expects a comma-separated list of the names of the Kinesis streams to reset.
+
+For example, if you configure your application with
+```
+consumerConfig.put(ConsumerConfigConstants.STREAM_INITIAL_POSITION, "LATEST");
+consumerConfig.put(ConsumerConfigConstants.STREAMS_TO_APPLY_STREAM_INITIAL_POSITION_TO, "streamA, streamB");
+```
+then `streamA` and `streamB` will start consuming from `LATEST`, even if they are already being tracked by the application.
+
+{{< hint warning >}}
+Note that you need to remove this property after the streams have been reset and a savepoint has been taken; otherwise, the Flink Kinesis Consumer will keep resetting those streams to the configured initial position on every restart.
+{{< /hint >}} + ### Fault Tolerance for Exactly-Once User-Defined State Update Semantics With Flink's checkpointing enabled, the Flink Kinesis Consumer will consume records from shards in Kinesis streams and diff --git a/flink-connector-aws/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumer.java b/flink-connector-aws/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumer.java index 06d0acc9d..c0cb93b90 100644 --- a/flink-connector-aws/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumer.java +++ b/flink-connector-aws/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumer.java @@ -42,6 +42,7 @@ import org.apache.flink.streaming.connectors.kinesis.model.SequenceNumber; import org.apache.flink.streaming.connectors.kinesis.model.StreamShardHandle; import org.apache.flink.streaming.connectors.kinesis.model.StreamShardMetadata; +import org.apache.flink.streaming.connectors.kinesis.model.StreamShardMetadata.EquivalenceWrapper; import org.apache.flink.streaming.connectors.kinesis.serialization.KinesisDeserializationSchema; import org.apache.flink.streaming.connectors.kinesis.serialization.KinesisDeserializationSchemaWrapper; import org.apache.flink.streaming.connectors.kinesis.table.DefaultShardAssignerFactory; @@ -54,11 +55,16 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.util.Arrays; import java.util.Collections; import java.util.HashMap; +import java.util.HashSet; import java.util.List; import java.util.Map; +import java.util.Optional; import java.util.Properties; +import java.util.Set; +import java.util.stream.Collectors; import static org.apache.flink.util.Preconditions.checkArgument; import static org.apache.flink.util.Preconditions.checkNotNull; @@ -148,6 +154,12 @@ public class FlinkKinesisConsumer extends RichParallelSourceFunction private transient HashMap sequenceNumsToRestore; + /** + * The streams present in the {@link #sequenceNumsToRestore} map, which means they were consumed + * by the application previously, so we know where to consume from. + */ + private transient Set knownStreams; + /** * Flag used to control reading from Kinesis: source will read data while value is true. Changed * to false after {@link #cancel()} has been called. 
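To complement the documentation section above, here is a minimal, illustrative sketch of how a job might opt in to the new behaviour when building the consumer. The property keys come from `ConsumerConfigConstants` as introduced in this change; the stream names, the `us-east-1` region, and the surrounding job skeleton are placeholder assumptions rather than part of this patch.

```java
import java.util.Arrays;
import java.util.Properties;

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer;
import org.apache.flink.streaming.connectors.kinesis.config.AWSConfigConstants;
import org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants;

public class NewStreamInitialPositionExample {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        Properties consumerConfig = new Properties();
        consumerConfig.put(AWSConfigConstants.AWS_REGION, "us-east-1"); // placeholder region
        consumerConfig.put(ConsumerConfigConstants.STREAM_INITIAL_POSITION, "LATEST");
        // With this flag enabled, streams that have no shards in the restored state
        // start from the configured initial position (LATEST here) instead of the
        // earliest record.
        consumerConfig.put(
                ConsumerConfigConstants.APPLY_STREAM_INITIAL_POSITION_FOR_NEW_STREAMS, "true");

        DataStream<String> stream =
                env.addSource(
                        new FlinkKinesisConsumer<>(
                                Arrays.asList("streamA", "streamB"), // placeholder stream names
                                new SimpleStringSchema(),
                                consumerConfig));

        stream.print();
        env.execute("kinesis-new-stream-initial-position-example");
    }
}
```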
@@ -323,70 +335,49 @@ public void run(SourceContext sourceContext) throws Exception { // initial discovery List allShards = fetcher.discoverNewShardsToSubscribe(); + boolean applyStreamInitialPositionForNewStreams = + getApplyStreamInitialPositionForNewStreamsFlag(); + + Set streamsToForceInitialPositionIn = getStreamsToForceInitialPositionIn(); + for (StreamShardHandle shard : allShards) { StreamShardMetadata.EquivalenceWrapper kinesisStreamShard = new StreamShardMetadata.EquivalenceWrapper( KinesisDataFetcher.convertToStreamShardMetadata(shard)); - - if (sequenceNumsToRestore != null) { - + String stream = shard.getStreamName(); + + if (sequenceNumsToRestore == null + || sequenceNumsToRestore.isEmpty() + || streamsToForceInitialPositionIn.contains(stream)) { + // we're starting fresh (either for the whole consumer or for this stream); + // use the configured start position as initial state + registerFromInitialPosition(fetcher, shard, kinesisStreamShard); + } else { if (sequenceNumsToRestore.containsKey(kinesisStreamShard)) { // if the shard was already seen and is contained in the state, // just use the sequence number stored in the state - fetcher.registerNewSubscribedShardState( - new KinesisStreamShardState( - kinesisStreamShard.getShardMetadata(), - shard, - sequenceNumsToRestore.get(kinesisStreamShard))); - - if (LOG.isInfoEnabled()) { - LOG.info( - "Subtask {} is seeding the fetcher with restored shard {}," - + " starting state set to the restored sequence number {}", - getRuntimeContext().getIndexOfThisSubtask(), - shard.toString(), - sequenceNumsToRestore.get(kinesisStreamShard)); - } + registerFromState(fetcher, shard, kinesisStreamShard); } else { - // the shard wasn't discovered in the previous run, therefore should be consumed - // from the beginning - fetcher.registerNewSubscribedShardState( - new KinesisStreamShardState( - kinesisStreamShard.getShardMetadata(), - shard, - SentinelSequenceNumber.SENTINEL_EARLIEST_SEQUENCE_NUM.get())); - - if (LOG.isInfoEnabled()) { - LOG.info( - "Subtask {} is seeding the fetcher with new discovered shard {}," - + " starting state set to the SENTINEL_EARLIEST_SEQUENCE_NUM", - getRuntimeContext().getIndexOfThisSubtask(), - shard.toString()); + // it's either a new shard for a stream that was already seen or a new stream + if (knownStreams.contains(stream)) { + // the stream is already known but this shard wasn't discovered in the + // previous run, therefore it should be consumed from the beginning + registerFromBeginning(fetcher, shard, kinesisStreamShard); + } else { + // it's a new stream + if (applyStreamInitialPositionForNewStreams) { + // the flag is true, so we respect the initial position for the new + // stream + registerFromInitialPosition(fetcher, shard, kinesisStreamShard); + } else { + // the flag is false, so we continue existing behaviour of registering + // from the beginning + registerFromBeginning(fetcher, shard, kinesisStreamShard); + } } } - } else { - // we're starting fresh; use the configured start position as initial state - SentinelSequenceNumber startingSeqNum = - InitialPosition.valueOf( - configProps.getProperty( - ConsumerConfigConstants.STREAM_INITIAL_POSITION, - ConsumerConfigConstants - .DEFAULT_STREAM_INITIAL_POSITION)) - .toSentinelSequenceNumber(); - - fetcher.registerNewSubscribedShardState( - new KinesisStreamShardState( - kinesisStreamShard.getShardMetadata(), - shard, - startingSeqNum.get())); - - if (LOG.isInfoEnabled()) {
- LOG.info( - "Subtask {} will be seeded with initial shard {}, starting state set as sequence number {}", - getRuntimeContext().getIndexOfThisSubtask(), - shard.toString(), - startingSeqNum.get()); - } } } @@ -408,6 +399,94 @@ public void run(SourceContext sourceContext) throws Exception { sourceContext.close(); } + private Set getStreamsToForceInitialPositionIn() { + String streamsToForceInitialPositionInStr = + configProps.getProperty( + ConsumerConfigConstants.STREAMS_TO_APPLY_STREAM_INITIAL_POSITION_TO); + + if (streamsToForceInitialPositionInStr == null) { + return Collections.emptySet(); + } + + return Arrays.stream(streamsToForceInitialPositionInStr.split(",")) + .map(String::trim) + .collect(Collectors.toSet()); + } + + private Boolean getApplyStreamInitialPositionForNewStreamsFlag() { + return Optional.ofNullable( + configProps.getProperty( + ConsumerConfigConstants + .APPLY_STREAM_INITIAL_POSITION_FOR_NEW_STREAMS)) + .map(Boolean::parseBoolean) + .orElse( + ConsumerConfigConstants + .DEFAULT_APPLY_STREAM_INITIAL_POSITION_FOR_NEW_STREAMS); + } + + private void registerFromBeginning( + KinesisDataFetcher fetcher, + StreamShardHandle shard, + EquivalenceWrapper kinesisStreamShard) { + fetcher.registerNewSubscribedShardState( + new KinesisStreamShardState( + kinesisStreamShard.getShardMetadata(), + shard, + SentinelSequenceNumber.SENTINEL_EARLIEST_SEQUENCE_NUM.get())); + + if (LOG.isInfoEnabled()) { + LOG.info( + "Subtask {} is seeding the fetcher with new discovered shard {}," + + " starting state set to the SENTINEL_EARLIEST_SEQUENCE_NUM", + getRuntimeContext().getIndexOfThisSubtask(), + shard.toString()); + } + } + + private void registerFromInitialPosition( + KinesisDataFetcher fetcher, + StreamShardHandle shard, + EquivalenceWrapper kinesisStreamShard) { + SentinelSequenceNumber startingSeqNum = + InitialPosition.valueOf( + configProps.getProperty( + ConsumerConfigConstants.STREAM_INITIAL_POSITION, + ConsumerConfigConstants.DEFAULT_STREAM_INITIAL_POSITION)) + .toSentinelSequenceNumber(); + + fetcher.registerNewSubscribedShardState( + new KinesisStreamShardState( + kinesisStreamShard.getShardMetadata(), shard, startingSeqNum.get())); + + if (LOG.isInfoEnabled()) { + LOG.info( + "Subtask {} will be seeded with initial shard {}, starting state set as sequence number {}", + getRuntimeContext().getIndexOfThisSubtask(), + shard.toString(), + startingSeqNum.get()); + } + } + + private void registerFromState( + KinesisDataFetcher fetcher, + StreamShardHandle shard, + EquivalenceWrapper kinesisStreamShard) { + fetcher.registerNewSubscribedShardState( + new KinesisStreamShardState( + kinesisStreamShard.getShardMetadata(), + shard, + sequenceNumsToRestore.get(kinesisStreamShard))); + + if (LOG.isInfoEnabled()) { + LOG.info( + "Subtask {} is seeding the fetcher with restored shard {}," + + " starting state set to the restored sequence number {}", + getRuntimeContext().getIndexOfThisSubtask(), + shard.toString(), + sequenceNumsToRestore.get(kinesisStreamShard)); + } + } + @Override public void cancel() { running = false; @@ -464,8 +543,10 @@ public void initializeState(FunctionInitializationContext context) throws Except if (context.isRestored()) { if (sequenceNumsToRestore == null) { sequenceNumsToRestore = new HashMap<>(); + knownStreams = new HashSet<>(); for (Tuple2 kinesisSequenceNumber : sequenceNumsStateForCheckpoint.get()) { + StreamShardMetadata streamShardMetadata = kinesisSequenceNumber.f0; sequenceNumsToRestore.put( // we wrap the restored metadata inside an equivalence wrapper 
that // checks only stream name and shard id, @@ -474,8 +555,9 @@ public void initializeState(FunctionInitializationContext context) throws Except // the savepoint and has a different metadata than what we last stored, // we will still be able to match it in sequenceNumsToRestore. Please // see FLINK-8484 for details. - new StreamShardMetadata.EquivalenceWrapper(kinesisSequenceNumber.f0), + new StreamShardMetadata.EquivalenceWrapper(streamShardMetadata), kinesisSequenceNumber.f1); + knownStreams.add(streamShardMetadata.getStreamName()); } LOG.info( diff --git a/flink-connector-aws/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/config/ConsumerConfigConstants.java b/flink-connector-aws/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/config/ConsumerConfigConstants.java index 710682266..e6d0fb84c 100644 --- a/flink-connector-aws/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/config/ConsumerConfigConstants.java +++ b/flink-connector-aws/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/config/ConsumerConfigConstants.java @@ -320,12 +320,39 @@ public enum EFORegistrationType { public static final String EFO_HTTP_CLIENT_READ_TIMEOUT_MILLIS = "flink.stream.efo.http-client.read-timeout"; + /** + * Flag to configure whether {@link #STREAM_INITIAL_POSITION} should be considered for new + * streams, when the app is already consuming from other streams. If set to true, then any + * stream that doesn't have any shard tracked by state yet will use the initial position. If + * false (default), it is assumed that we should consume from the beginning, which is + * appropriate when you want to ensure no data is lost if the stream is already being used by + * the data producers. + */ + public static final String APPLY_STREAM_INITIAL_POSITION_FOR_NEW_STREAMS = + "flink.stream.initpos-for-new-streams"; + + /** + * Property that can be used to ignore the restore state for a particular stream and instead use + * the initial position. This is useful to reset a specific stream to consume from TRIM_HORIZON + * or LATEST if needed. Values must be passed in a comma separated list. If a stream is in this + * list, it will use initial position regardless of the value of the {@link + * #APPLY_STREAM_INITIAL_POSITION_FOR_NEW_STREAMS} property. + */ + public static final String STREAMS_TO_APPLY_STREAM_INITIAL_POSITION_TO = + "flink.stream.initpos-streams"; + // ------------------------------------------------------------------------ // Default values for consumer configuration // ------------------------------------------------------------------------ public static final String DEFAULT_STREAM_INITIAL_POSITION = InitialPosition.LATEST.toString(); + /** + * False for now so that we preserve old behaviour. TODO switch to true in the next major? If so + * update the javadoc. 
+ */ + public static final boolean DEFAULT_APPLY_STREAM_INITIAL_POSITION_FOR_NEW_STREAMS = false; + public static final String DEFAULT_STREAM_TIMESTAMP_DATE_FORMAT = "yyyy-MM-dd'T'HH:mm:ss.SSSXXX"; diff --git a/flink-connector-aws/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumerTest.java b/flink-connector-aws/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumerTest.java index 82836dffe..e86587c5d 100644 --- a/flink-connector-aws/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumerTest.java +++ b/flink-connector-aws/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumerTest.java @@ -43,6 +43,7 @@ import org.apache.flink.streaming.api.watermark.Watermark; import org.apache.flink.streaming.api.windowing.time.Time; import org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants; +import org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants.InitialPosition; import org.apache.flink.streaming.connectors.kinesis.internals.KinesisDataFetcher; import org.apache.flink.streaming.connectors.kinesis.model.KinesisStreamShard; import org.apache.flink.streaming.connectors.kinesis.model.KinesisStreamShardState; @@ -95,7 +96,10 @@ import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicReference; import java.util.function.Supplier; +import java.util.stream.Collectors; +import static org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants.APPLY_STREAM_INITIAL_POSITION_FOR_NEW_STREAMS; +import static org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants.STREAMS_TO_APPLY_STREAM_INITIAL_POSITION_TO; import static org.assertj.core.api.Assertions.assertThat; import static org.mockito.ArgumentMatchers.any; import static org.mockito.Mockito.mock; @@ -1332,4 +1336,445 @@ static void assertGlobalWatermark(long expected) { assertThat(WATERMARK.get()).isEqualTo(expected); } } + + /* =========================================================================== + Tests for FLINK-35299 + The setup for these tests will always be the same: + - stream A with state for shards 0 and 1 + - stream B with state for shard 0 + + Then new shards will be discovered: + - new shard (1) for stream B + - new shard (0) for stream C - since stream C is not in state yet, it qualifies as a "new stream". + ==============================================================================*/ + + /** + * Tests FLINK-35299 with the default config values: - IF there is no state at all, all new + * streams/shards should start from INITIAL POSITION. 
+ */ + @Test + @SuppressWarnings("unchecked") + public void testFLINK35299DefaultsWhenThereIsNoState() throws Exception { + Properties config = TestUtils.getStandardProperties(); + + List> existingState = null; + + List shardsToSubscribe = new ArrayList<>(); + StreamShardHandle streamShardA0 = getStreamShard("stream-A", 0); + StreamShardHandle streamShardA1 = getStreamShard("stream-A", 1); + StreamShardHandle streamShardB0 = getStreamShard("stream-B", 0); + StreamShardHandle streamShardB1 = getStreamShard("stream-B", 1); + StreamShardHandle streamShardC0 = getStreamShard("stream-C", 0); + shardsToSubscribe.add(streamShardA0); + shardsToSubscribe.add(streamShardA1); + shardsToSubscribe.add(streamShardB0); + shardsToSubscribe.add(streamShardB1); // new shard for existing stream + shardsToSubscribe.add(streamShardC0); // new stream + + Map expectedResults = new HashMap<>(); + SequenceNumber defaultInitialPositionSeqNumber = + getSequenceNumber( + InitialPosition.valueOf( + ConsumerConfigConstants.DEFAULT_STREAM_INITIAL_POSITION)); + expectedResults.put(streamShardA0, defaultInitialPositionSeqNumber); + expectedResults.put(streamShardA1, defaultInitialPositionSeqNumber); + expectedResults.put(streamShardB0, defaultInitialPositionSeqNumber); + expectedResults.put(streamShardB1, defaultInitialPositionSeqNumber); + expectedResults.put(streamShardC0, defaultInitialPositionSeqNumber); + + runAndValidate(config, existingState, shardsToSubscribe, expectedResults); + } + + /** + * Tests FLINK-35299 with the {@link + * ConsumerConfigConstants#APPLY_STREAM_INITIAL_POSITION_FOR_NEW_STREAMS} flag is set to true. - + * IF there is no state at all, all new streams/shards should start from INITIAL POSITION. + */ + @Test + @SuppressWarnings("unchecked") + public void testFLINK35299ApplyStreamInitialPositionForNewStreamsWhenThereIsNoState() + throws Exception { + Properties config = TestUtils.getStandardProperties(); + config.setProperty(APPLY_STREAM_INITIAL_POSITION_FOR_NEW_STREAMS, "true"); + + List> existingState = null; + + List shardsToSubscribe = new ArrayList<>(); + StreamShardHandle streamShardA0 = getStreamShard("stream-A", 0); + StreamShardHandle streamShardA1 = getStreamShard("stream-A", 1); + StreamShardHandle streamShardB0 = getStreamShard("stream-B", 0); + StreamShardHandle streamShardB1 = getStreamShard("stream-B", 1); + StreamShardHandle streamShardC0 = getStreamShard("stream-C", 0); + shardsToSubscribe.add(streamShardA0); + shardsToSubscribe.add(streamShardA1); + shardsToSubscribe.add(streamShardB0); + shardsToSubscribe.add(streamShardB1); // new shard for existing stream + shardsToSubscribe.add(streamShardC0); // new stream + + Map expectedResults = new HashMap<>(); + SequenceNumber defaultInitialPositionSeqNumber = + getSequenceNumber( + InitialPosition.valueOf( + ConsumerConfigConstants.DEFAULT_STREAM_INITIAL_POSITION)); + expectedResults.put(streamShardA0, defaultInitialPositionSeqNumber); + expectedResults.put(streamShardA1, defaultInitialPositionSeqNumber); + expectedResults.put(streamShardB0, defaultInitialPositionSeqNumber); + expectedResults.put(streamShardB1, defaultInitialPositionSeqNumber); + expectedResults.put(streamShardC0, defaultInitialPositionSeqNumber); + + runAndValidate(config, existingState, shardsToSubscribe, expectedResults); + } + + /** + * Tests FLINK-35299 with the {@link + * ConsumerConfigConstants#STREAMS_TO_APPLY_STREAM_INITIAL_POSITION_TO} list contains some + * values. - IF there is no state at all, all new streams/shards should start from INITIAL + * POSITION. 
+ */ + @Test + @SuppressWarnings("unchecked") + public void testFLINK35299StreamsToApplyStreamInitialPositionToWhenThereIsNoState() + throws Exception { + Properties config = TestUtils.getStandardProperties(); + config.setProperty(STREAMS_TO_APPLY_STREAM_INITIAL_POSITION_TO, "stream-A"); + + List> existingState = null; + + List shardsToSubscribe = new ArrayList<>(); + StreamShardHandle streamShardA0 = getStreamShard("stream-A", 0); + StreamShardHandle streamShardA1 = getStreamShard("stream-A", 1); + StreamShardHandle streamShardB0 = getStreamShard("stream-B", 0); + StreamShardHandle streamShardB1 = getStreamShard("stream-B", 1); + StreamShardHandle streamShardC0 = getStreamShard("stream-C", 0); + shardsToSubscribe.add(streamShardA0); + shardsToSubscribe.add(streamShardA1); + shardsToSubscribe.add(streamShardB0); + shardsToSubscribe.add(streamShardB1); // new shard for existing stream + shardsToSubscribe.add(streamShardC0); // new stream + + Map expectedResults = new HashMap<>(); + SequenceNumber defaultInitialPositionSeqNumber = + getSequenceNumber( + InitialPosition.valueOf( + ConsumerConfigConstants.DEFAULT_STREAM_INITIAL_POSITION)); + expectedResults.put(streamShardA0, defaultInitialPositionSeqNumber); + expectedResults.put(streamShardA1, defaultInitialPositionSeqNumber); + expectedResults.put(streamShardB0, defaultInitialPositionSeqNumber); + expectedResults.put(streamShardB1, defaultInitialPositionSeqNumber); + expectedResults.put(streamShardC0, defaultInitialPositionSeqNumber); + + runAndValidate(config, existingState, shardsToSubscribe, expectedResults); + } + + /** + * Tests the default values of the properties introduced in FLINK-35299: - IF there is some + * state already - new streams should start from EARLIEST - new shards for existing streams + * start from EARLIEST - existing shards should continue from the state value + */ + @Test + @SuppressWarnings("unchecked") + public void testFLINK35299DefaultsWhenThereIsState() throws Exception { + Properties config = TestUtils.getStandardProperties(); + + List> existingState = new ArrayList<>(); + existingState.add(createShardState("stream-A", 0, "A0")); + existingState.add(createShardState("stream-A", 1, "A1")); + existingState.add(createShardState("stream-B", 0, "B0")); + + List shardsToSubscribe = new ArrayList<>(); + StreamShardHandle streamShardA0 = getStreamShard("stream-A", 0); + StreamShardHandle streamShardA1 = getStreamShard("stream-A", 1); + StreamShardHandle streamShardB0 = getStreamShard("stream-B", 0); + StreamShardHandle streamShardB1 = getStreamShard("stream-B", 1); + StreamShardHandle streamShardC0 = getStreamShard("stream-C", 0); + shardsToSubscribe.add(streamShardA0); + shardsToSubscribe.add(streamShardA1); + shardsToSubscribe.add(streamShardB0); + shardsToSubscribe.add(streamShardB1); // new shard for existing stream + shardsToSubscribe.add(streamShardC0); // new stream + + Map expectedResults = new HashMap<>(); + expectedResults.put(streamShardA0, getSequenceNumber("A0")); + expectedResults.put(streamShardA1, getSequenceNumber("A1")); + expectedResults.put(streamShardB0, getSequenceNumber("B0")); + expectedResults.put(streamShardB1, getSequenceNumber(InitialPosition.TRIM_HORIZON)); + expectedResults.put(streamShardC0, getSequenceNumber(InitialPosition.TRIM_HORIZON)); + + runAndValidate(config, existingState, shardsToSubscribe, expectedResults); + } + + /** + * Tests FLINK-35299 when the {@link + * ConsumerConfigConstants#APPLY_STREAM_INITIAL_POSITION_FOR_NEW_STREAMS} flag is set to true. 
+ * In this case any NEW streams should start from the initial position configured and everything + * else should stay as it was. + */ + @Test + @SuppressWarnings("unchecked") + public void testFLINK35299ApplyStreamInitialPositionForNewStreamsSetToTrue() throws Exception { + Properties config = TestUtils.getStandardProperties(); + config.setProperty(APPLY_STREAM_INITIAL_POSITION_FOR_NEW_STREAMS, "true"); + + List> existingState = new ArrayList<>(); + existingState.add(createShardState("stream-A", 0, "A0")); + existingState.add(createShardState("stream-A", 1, "A1")); + existingState.add(createShardState("stream-B", 0, "B0")); + + List shardsToSubscribe = new ArrayList<>(); + StreamShardHandle streamShardA0 = getStreamShard("stream-A", 0); + StreamShardHandle streamShardA1 = getStreamShard("stream-A", 1); + StreamShardHandle streamShardB0 = getStreamShard("stream-B", 0); + StreamShardHandle streamShardB1 = getStreamShard("stream-B", 1); + StreamShardHandle streamShardC0 = getStreamShard("stream-C", 0); + shardsToSubscribe.add(streamShardA0); + shardsToSubscribe.add(streamShardA1); + shardsToSubscribe.add(streamShardB0); + shardsToSubscribe.add(streamShardB1); // new shard for existing stream + shardsToSubscribe.add(streamShardC0); // new stream + + Map expectedResults = new HashMap<>(); + expectedResults.put(streamShardA0, getSequenceNumber("A0")); + expectedResults.put(streamShardA1, getSequenceNumber("A1")); + expectedResults.put(streamShardB0, getSequenceNumber("B0")); + expectedResults.put(streamShardB1, getSequenceNumber(InitialPosition.TRIM_HORIZON)); + expectedResults.put( + streamShardC0, + getSequenceNumber( + InitialPosition.valueOf( + ConsumerConfigConstants.DEFAULT_STREAM_INITIAL_POSITION))); + + runAndValidate(config, existingState, shardsToSubscribe, expectedResults); + } + + /** + * Tests FLINK-35299 when the {@link + * ConsumerConfigConstants#STREAMS_TO_APPLY_STREAM_INITIAL_POSITION_TO} flag is set to a + * non-null list. In this case the stream used in that list should use the initial position from + * the config instead of using the state value. Everything else should behave as before. 
+ */ + @Test + @SuppressWarnings("unchecked") + public void testFLINK35299StreamsToApplyStreamInitialPositionTo() throws Exception { + Properties config = TestUtils.getStandardProperties(); + config.setProperty(STREAMS_TO_APPLY_STREAM_INITIAL_POSITION_TO, "stream-A"); + + List> existingState = new ArrayList<>(); + existingState.add(createShardState("stream-A", 0, "A0")); + existingState.add(createShardState("stream-A", 1, "A1")); + existingState.add(createShardState("stream-B", 0, "B0")); + + List shardsToSubscribe = new ArrayList<>(); + StreamShardHandle streamShardA0 = getStreamShard("stream-A", 0); + StreamShardHandle streamShardA1 = getStreamShard("stream-A", 1); + StreamShardHandle streamShardB0 = getStreamShard("stream-B", 0); + StreamShardHandle streamShardB1 = getStreamShard("stream-B", 1); + StreamShardHandle streamShardC0 = getStreamShard("stream-C", 0); + shardsToSubscribe.add(streamShardA0); + shardsToSubscribe.add(streamShardA1); + shardsToSubscribe.add(streamShardB0); + shardsToSubscribe.add(streamShardB1); // new shard for existing stream + shardsToSubscribe.add(streamShardC0); // new stream + + Map expectedResults = new HashMap<>(); + expectedResults.put( + streamShardA0, + getSequenceNumber( + InitialPosition.valueOf( + ConsumerConfigConstants.DEFAULT_STREAM_INITIAL_POSITION))); + expectedResults.put( + streamShardA1, + getSequenceNumber( + InitialPosition.valueOf( + ConsumerConfigConstants.DEFAULT_STREAM_INITIAL_POSITION))); + expectedResults.put(streamShardB0, getSequenceNumber("B0")); + expectedResults.put(streamShardB1, getSequenceNumber(InitialPosition.TRIM_HORIZON)); + expectedResults.put(streamShardC0, getSequenceNumber(InitialPosition.TRIM_HORIZON)); + + runAndValidate(config, existingState, shardsToSubscribe, expectedResults); + } + + /** + * Tests FLINK-35299 when the {@link + * ConsumerConfigConstants#STREAMS_TO_APPLY_STREAM_INITIAL_POSITION_TO} flag contains streams + * that are not tracked yet. This is an edge case of {@link + * #testFLINK35299StreamsToApplyStreamInitialPositionTo()}. 
+ */ + @Test + @SuppressWarnings("unchecked") + public void testFLINK35299StreamsToApplyStreamInitialPositionToForANewStream() + throws Exception { + Properties config = TestUtils.getStandardProperties(); + config.setProperty(STREAMS_TO_APPLY_STREAM_INITIAL_POSITION_TO, "stream-C"); + + List> existingState = new ArrayList<>(); + existingState.add(createShardState("stream-A", 0, "A0")); + existingState.add(createShardState("stream-A", 1, "A1")); + existingState.add(createShardState("stream-B", 0, "B0")); + + List shardsToSubscribe = new ArrayList<>(); + StreamShardHandle streamShardA0 = getStreamShard("stream-A", 0); + StreamShardHandle streamShardA1 = getStreamShard("stream-A", 1); + StreamShardHandle streamShardB0 = getStreamShard("stream-B", 0); + StreamShardHandle streamShardB1 = getStreamShard("stream-B", 1); + StreamShardHandle streamShardC0 = getStreamShard("stream-C", 0); + shardsToSubscribe.add(streamShardA0); + shardsToSubscribe.add(streamShardA1); + shardsToSubscribe.add(streamShardB0); + shardsToSubscribe.add(streamShardB1); // new shard for existing stream + shardsToSubscribe.add(streamShardC0); // new stream + + Map expectedResults = new HashMap<>(); + expectedResults.put(streamShardA0, getSequenceNumber("A0")); + expectedResults.put(streamShardA1, getSequenceNumber("A1")); + expectedResults.put(streamShardB0, getSequenceNumber("B0")); + expectedResults.put(streamShardB1, getSequenceNumber(InitialPosition.TRIM_HORIZON)); + expectedResults.put( + streamShardC0, + getSequenceNumber( + InitialPosition.valueOf( + ConsumerConfigConstants.DEFAULT_STREAM_INITIAL_POSITION))); + + runAndValidate(config, existingState, shardsToSubscribe, expectedResults); + } + + /** + * Tests FLINK-35299 when the {@link + * ConsumerConfigConstants#APPLY_STREAM_INITIAL_POSITION_FOR_NEW_STREAMS} flag is set to true + * and the {@link ConsumerConfigConstants#STREAMS_TO_APPLY_STREAM_INITIAL_POSITION_TO} list is + * set to a non-null value. 
*/ + @Test + @SuppressWarnings("unchecked") + public void testFLINK35299BothNewPropertiesBeingUsed() throws Exception { + Properties config = TestUtils.getStandardProperties(); + config.setProperty(APPLY_STREAM_INITIAL_POSITION_FOR_NEW_STREAMS, "true"); + config.setProperty(STREAMS_TO_APPLY_STREAM_INITIAL_POSITION_TO, "stream-B"); + + List> existingState = new ArrayList<>(); + existingState.add(createShardState("stream-A", 0, "A0")); + existingState.add(createShardState("stream-A", 1, "A1")); + existingState.add(createShardState("stream-B", 0, "B0")); + + List shardsToSubscribe = new ArrayList<>(); + StreamShardHandle streamShardA0 = getStreamShard("stream-A", 0); + StreamShardHandle streamShardA1 = getStreamShard("stream-A", 1); + StreamShardHandle streamShardB0 = getStreamShard("stream-B", 0); + StreamShardHandle streamShardB1 = getStreamShard("stream-B", 1); + StreamShardHandle streamShardC0 = getStreamShard("stream-C", 0); + shardsToSubscribe.add(streamShardA0); + shardsToSubscribe.add(streamShardA1); + shardsToSubscribe.add(streamShardB0); + shardsToSubscribe.add(streamShardB1); // new shard for existing stream + shardsToSubscribe.add(streamShardC0); // new stream + + Map expectedResults = new HashMap<>(); + expectedResults.put(streamShardA0, getSequenceNumber("A0")); + expectedResults.put(streamShardA1, getSequenceNumber("A1")); + expectedResults.put( + streamShardB0, + getSequenceNumber( + InitialPosition.valueOf( + ConsumerConfigConstants.DEFAULT_STREAM_INITIAL_POSITION))); + expectedResults.put( + streamShardB1, + getSequenceNumber( + InitialPosition.valueOf( + ConsumerConfigConstants.DEFAULT_STREAM_INITIAL_POSITION))); + expectedResults.put( + streamShardC0, + getSequenceNumber( + InitialPosition.valueOf( + ConsumerConfigConstants.DEFAULT_STREAM_INITIAL_POSITION))); + + runAndValidate(config, existingState, shardsToSubscribe, expectedResults); + } + + private void runAndValidate( + Properties config, + List> existingState, + List shardsToSubscribe, + Map expectedResults) + throws Exception { + KinesisDataFetcher mockedFetcher = mockKinesisDataFetcher(); + + TestingListState> listState = + new TestingListState<>(); + if (existingState != null) { + listState.addAll(existingState); + } + when(mockedFetcher.discoverNewShardsToSubscribe()).thenReturn(shardsToSubscribe); + + List streamsToConsume = + shardsToSubscribe.stream() + .map(StreamShardHandle::getStreamName) + .distinct() + .collect(Collectors.toList()); + FlinkKinesisConsumer consumer = + new FlinkKinesisConsumer<>( + streamsToConsume, + new KinesisDeserializationSchemaWrapper<>(new SimpleStringSchema()), + config); + RuntimeContext context = new MockStreamingRuntimeContext(true, 2, 0); + consumer.setRuntimeContext(context); + + OperatorStateStore operatorStateStore = mock(OperatorStateStore.class); + when(operatorStateStore.getUnionListState(any(ListStateDescriptor.class))) + .thenReturn(listState); + + StateInitializationContext initializationContext = mock(StateInitializationContext.class); + when(initializationContext.getOperatorStateStore()).thenReturn(operatorStateStore); + when(initializationContext.isRestored()).thenReturn(true); + + consumer.initializeState(initializationContext); + + consumer.open(new Configuration()); + consumer.run(Mockito.mock(SourceFunction.SourceContext.class)); + + // check interactions with the fetcher + expectedResults.forEach( + (streamShardHandle, sequenceNumber) -> + verifyRegisterNewSubscribedShard( + mockedFetcher, streamShardHandle, sequenceNumber)); + + // arbitrary checkpoint to validate new state + consumer.snapshotState(new
StateSnapshotContextSynchronousImpl(123, 123)); + assertThat(listState.isClearCalled()).isTrue(); + List> list = listState.getList(); + for (Tuple2 entry : list) { + StreamShardMetadata streamShardMetadata = entry.f0; + SequenceNumber sequenceNumber = entry.f1; + + SequenceNumber expectedSequenceNumber = + expectedResults.get(getStreamShard(streamShardMetadata)); + assertThat(sequenceNumber).isEqualTo(expectedSequenceNumber); + } + } + + private static SequenceNumber getSequenceNumber(String seqNumber) { + return new SequenceNumber(seqNumber); + } + + private static SequenceNumber getSequenceNumber(InitialPosition initialPosition) { + return initialPosition.toSentinelSequenceNumber().get(); + } + + private static void verifyRegisterNewSubscribedShard( + KinesisDataFetcher mockedFetcher, + StreamShardHandle streamShardHandle, + SequenceNumber sequenceNumber) { + Mockito.verify(mockedFetcher) + .registerNewSubscribedShardState( + new KinesisStreamShardState( + KinesisDataFetcher.convertToStreamShardMetadata(streamShardHandle), + streamShardHandle, + sequenceNumber)); + } + + private StreamShardHandle getStreamShard(StreamShardMetadata streamShardMetadata) { + return getStreamShard( + streamShardMetadata.getStreamName(), streamShardMetadata.getShardId()); + } + + private static StreamShardHandle getStreamShard(String streamName, int shardId) { + return getStreamShard(streamName, KinesisShardIdGenerator.generateFromShardOrder(shardId)); + } + + private static StreamShardHandle getStreamShard(String streamName, String shardId) { + return new StreamShardHandle(streamName, new Shard().withShardId(shardId)); + } }
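To summarise how the two new properties interact (the scenario exercised by `testFLINK35299BothNewPropertiesBeingUsed` above), here is an illustrative sketch of a combined configuration. The behavioural comments restate what the implementation and javadoc establish; the stream names mirror the test fixtures, and the `us-east-1` region and the helper class itself are assumptions made only to keep the example self-contained.

```java
import java.util.Properties;

import org.apache.flink.streaming.connectors.kinesis.config.AWSConfigConstants;
import org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants;

/** Hypothetical helper illustrating the precedence of the two new properties. */
public class CombinedInitialPositionConfig {

    public static Properties build() {
        Properties config = new Properties();
        config.setProperty(AWSConfigConstants.AWS_REGION, "us-east-1"); // assumed region

        // The position that "reset" and brand-new streams will start from.
        config.setProperty(ConsumerConfigConstants.STREAM_INITIAL_POSITION, "LATEST");

        // Streams that are new to the application (no shard of theirs in restored state)
        // start from the configured initial position instead of the earliest record.
        config.setProperty(
                ConsumerConfigConstants.APPLY_STREAM_INITIAL_POSITION_FOR_NEW_STREAMS, "true");

        // "stream-B" is forced back to the configured initial position even though the
        // restored state already tracks shards for it; listing a stream here takes
        // precedence over both the restored sequence numbers and the flag above.
        config.setProperty(
                ConsumerConfigConstants.STREAMS_TO_APPLY_STREAM_INITIAL_POSITION_TO, "stream-B");

        // Streams not listed above that are already in state (e.g. "stream-A") keep
        // resuming from their restored sequence numbers, and newly discovered shards of
        // those streams still start from the earliest record.
        return config;
    }
}
```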