Skip to content

Commit 3b4b184

Browse files
thomasg19930417ferenc-csaky
authored andcommitted
[FLINK-35477] Make initial cursor position configurable
Closes #105
1 parent 0f7b744 commit 3b4b184

File tree

5 files changed

+35
-3
lines changed

5 files changed

+35
-3
lines changed

flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/PulsarSourceBuilder.java

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,7 @@
4646
import org.apache.pulsar.client.api.ConsumerCryptoFailureAction;
4747
import org.apache.pulsar.client.api.RegexSubscriptionMode;
4848
import org.apache.pulsar.client.api.Schema;
49+
import org.apache.pulsar.client.api.SubscriptionInitialPosition;
4950
import org.apache.pulsar.client.api.schema.GenericRecord;
5051
import org.apache.pulsar.common.schema.KeyValue;
5152
import org.apache.pulsar.common.schema.SchemaInfo;
@@ -56,6 +57,7 @@
5657
import java.util.Arrays;
5758
import java.util.List;
5859
import java.util.Map;
60+
import java.util.Objects;
5961
import java.util.Properties;
6062
import java.util.regex.Pattern;
6163

@@ -65,6 +67,7 @@
6567
import static org.apache.flink.connector.pulsar.common.config.PulsarOptions.PULSAR_SERVICE_URL;
6668
import static org.apache.flink.connector.pulsar.source.PulsarSourceOptions.PULSAR_CONSUMER_NAME;
6769
import static org.apache.flink.connector.pulsar.source.PulsarSourceOptions.PULSAR_CRYPTO_FAILURE_ACTION;
70+
import static org.apache.flink.connector.pulsar.source.PulsarSourceOptions.PULSAR_INITIAL_CURSOR;
6871
import static org.apache.flink.connector.pulsar.source.PulsarSourceOptions.PULSAR_PARTITION_DISCOVERY_INTERVAL_MS;
6972
import static org.apache.flink.connector.pulsar.source.PulsarSourceOptions.PULSAR_READ_SCHEMA_EVOLUTION;
7073
import static org.apache.flink.connector.pulsar.source.PulsarSourceOptions.PULSAR_SUBSCRIPTION_NAME;
@@ -590,6 +593,12 @@ public PulsarSource<OUT> build() {
590593
}
591594
}
592595

596+
if (Objects.equals(startCursor, StartCursor.latest())) {
597+
configBuilder.set(PULSAR_INITIAL_CURSOR, SubscriptionInitialPosition.Latest);
598+
} else {
599+
configBuilder.set(PULSAR_INITIAL_CURSOR, SubscriptionInitialPosition.Earliest);
600+
}
601+
593602
// Make sure they are serializable.
594603
checkState(
595604
isSerializable(deserializationSchema),

flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/PulsarSourceOptions.java

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@
2828
import org.apache.flink.connector.pulsar.source.config.CursorVerification;
2929

3030
import org.apache.pulsar.client.api.ConsumerCryptoFailureAction;
31+
import org.apache.pulsar.client.api.SubscriptionInitialPosition;
3132
import org.apache.pulsar.client.api.SubscriptionMode;
3233

3334
import java.time.Duration;
@@ -232,6 +233,13 @@ private PulsarSourceOptions() {
232233
code("StartCursor"))
233234
.build());
234235

236+
public static final ConfigOption<SubscriptionInitialPosition> PULSAR_INITIAL_CURSOR =
237+
ConfigOptions.key(SOURCE_CONFIG_PREFIX + "initialCursor")
238+
.enumType(SubscriptionInitialPosition.class)
239+
.defaultValue(SubscriptionInitialPosition.Latest)
240+
.withDescription(
241+
Description.builder().text("Consumer initial position.").build());
242+
235243
///////////////////////////////////////////////////////////////////////////////
236244
//
237245
// The configuration for ConsumerConfigurationData part.

flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/config/SourceConfiguration.java

Lines changed: 13 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@
2727

2828
import org.apache.pulsar.client.api.ConsumerBuilder;
2929
import org.apache.pulsar.client.api.Schema;
30+
import org.apache.pulsar.client.api.SubscriptionInitialPosition;
3031
import org.apache.pulsar.client.api.SubscriptionMode;
3132
import org.apache.pulsar.client.api.SubscriptionType;
3233

@@ -40,6 +41,7 @@
4041
import static org.apache.flink.connector.pulsar.source.PulsarSourceOptions.PULSAR_ENABLE_AUTO_ACKNOWLEDGE_MESSAGE;
4142
import static org.apache.flink.connector.pulsar.source.PulsarSourceOptions.PULSAR_ENABLE_SOURCE_METRICS;
4243
import static org.apache.flink.connector.pulsar.source.PulsarSourceOptions.PULSAR_FETCH_ONE_MESSAGE_TIME;
44+
import static org.apache.flink.connector.pulsar.source.PulsarSourceOptions.PULSAR_INITIAL_CURSOR;
4345
import static org.apache.flink.connector.pulsar.source.PulsarSourceOptions.PULSAR_MAX_FETCH_RECORDS;
4446
import static org.apache.flink.connector.pulsar.source.PulsarSourceOptions.PULSAR_MAX_FETCH_TIME;
4547
import static org.apache.flink.connector.pulsar.source.PulsarSourceOptions.PULSAR_PARTITION_DISCOVERY_INTERVAL_MS;
@@ -68,6 +70,7 @@ public class SourceConfiguration extends PulsarConfiguration {
6870
private final boolean enableSchemaEvolution;
6971
private final boolean enableMetrics;
7072
private final boolean resetSubscriptionCursor;
73+
private final SubscriptionInitialPosition subscriptionInitialPosition;
7174

7275
public SourceConfiguration(Configuration configuration) {
7376
super(configuration);
@@ -87,6 +90,7 @@ public SourceConfiguration(Configuration configuration) {
8790
this.enableMetrics =
8891
get(PULSAR_ENABLE_SOURCE_METRICS) && get(PULSAR_STATS_INTERVAL_SECONDS) > 0;
8992
this.resetSubscriptionCursor = get(PULSAR_RESET_SUBSCRIPTION_CURSOR);
93+
this.subscriptionInitialPosition = get(PULSAR_INITIAL_CURSOR);
9094
}
9195

9296
/** The capacity of the element queue in the source reader. */
@@ -209,6 +213,11 @@ public String getSubscriptionDesc() {
209213
return getSubscriptionName() + "(Exclusive," + getSubscriptionMode() + ")";
210214
}
211215

216+
/** The initial position for the subscription. */
217+
public SubscriptionInitialPosition getInitialPosition() {
218+
return subscriptionInitialPosition;
219+
}
220+
212221
@Override
213222
public boolean equals(Object o) {
214223
if (this == o) {
@@ -234,7 +243,8 @@ public boolean equals(Object o) {
234243
&& allowKeySharedOutOfOrderDelivery == that.allowKeySharedOutOfOrderDelivery
235244
&& enableSchemaEvolution == that.enableSchemaEvolution
236245
&& enableMetrics == that.enableMetrics
237-
&& resetSubscriptionCursor == that.resetSubscriptionCursor;
246+
&& resetSubscriptionCursor == that.resetSubscriptionCursor
247+
&& subscriptionInitialPosition == that.subscriptionInitialPosition;
238248
}
239249

240250
@Override
@@ -254,6 +264,7 @@ public int hashCode() {
254264
allowKeySharedOutOfOrderDelivery,
255265
enableSchemaEvolution,
256266
enableMetrics,
257-
resetSubscriptionCursor);
267+
resetSubscriptionCursor,
268+
subscriptionInitialPosition);
258269
}
259270
}

flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/PulsarSourceEnumerator.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -220,7 +220,8 @@ private void checkPartitionChanges(Set<TopicPartition> fetchedPartitions, Throwa
220220
startCursor.position(partition.getTopic(), partition.getPartitionId());
221221

222222
try {
223-
//If resetSubscriptionCursor is set to true, the position is reset to the position specified by StartCursor each time
223+
// If resetSubscriptionCursor is set to true, the position is reset to the position
224+
// specified by StartCursor each time
224225
if (sourceConfiguration.isResetSubscriptionCursor()) {
225226
position.setupSubPosition(pulsarClient, topic, subscriptionName);
226227
}

flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/reader/PulsarPartitionSplitReader.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -313,6 +313,9 @@ private Consumer<byte[]> createPulsarConsumer(TopicPartition partition)
313313
consumerBuilder.keySharedPolicy(policy);
314314
}
315315

316+
// set initial position
317+
consumerBuilder.subscriptionInitialPosition(sourceConfiguration.getInitialPosition());
318+
316319
// Create the consumer configuration by using common utils.
317320
Consumer<byte[]> consumer = consumerBuilder.subscribe();
318321

0 commit comments

Comments
 (0)