From 3dfc2a8f31cd781d3fc1b3f97b75c33f23aed912 Mon Sep 17 00:00:00 2001 From: "xu.guo" Date: Mon, 14 Apr 2025 14:24:39 +0800 Subject: [PATCH] Resolve FLINK-35477,FLINK-37299 --- .../pulsar/source/PulsarSourceBuilder.java | 9 +++++++++ .../pulsar/source/PulsarSourceOptions.java | 8 ++++++++ .../pulsar/source/config/SourceConfiguration.java | 15 +++++++++++++-- .../source/enumerator/PulsarSourceEnumerator.java | 4 +++- .../source/reader/PulsarPartitionSplitReader.java | 3 +++ 5 files changed, 36 insertions(+), 3 deletions(-) diff --git a/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/PulsarSourceBuilder.java b/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/PulsarSourceBuilder.java index 687e9b8f..ac9695ad 100644 --- a/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/PulsarSourceBuilder.java +++ b/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/PulsarSourceBuilder.java @@ -46,6 +46,7 @@ import org.apache.pulsar.client.api.ConsumerCryptoFailureAction; import org.apache.pulsar.client.api.RegexSubscriptionMode; import org.apache.pulsar.client.api.Schema; +import org.apache.pulsar.client.api.SubscriptionInitialPosition; import org.apache.pulsar.client.api.schema.GenericRecord; import org.apache.pulsar.common.schema.KeyValue; import org.apache.pulsar.common.schema.SchemaInfo; @@ -56,6 +57,7 @@ import java.util.Arrays; import java.util.List; import java.util.Map; +import java.util.Objects; import java.util.Properties; import java.util.regex.Pattern; @@ -65,6 +67,7 @@ import static org.apache.flink.connector.pulsar.common.config.PulsarOptions.PULSAR_SERVICE_URL; import static org.apache.flink.connector.pulsar.source.PulsarSourceOptions.PULSAR_CONSUMER_NAME; import static org.apache.flink.connector.pulsar.source.PulsarSourceOptions.PULSAR_CRYPTO_FAILURE_ACTION; +import static org.apache.flink.connector.pulsar.source.PulsarSourceOptions.PULSAR_INITIAL_CURSOR; import static org.apache.flink.connector.pulsar.source.PulsarSourceOptions.PULSAR_PARTITION_DISCOVERY_INTERVAL_MS; import static org.apache.flink.connector.pulsar.source.PulsarSourceOptions.PULSAR_READ_SCHEMA_EVOLUTION; import static org.apache.flink.connector.pulsar.source.PulsarSourceOptions.PULSAR_SUBSCRIPTION_NAME; @@ -590,6 +593,12 @@ public PulsarSource build() { } } + if (Objects.equals(startCursor, StartCursor.latest())) { + configBuilder.set(PULSAR_INITIAL_CURSOR, SubscriptionInitialPosition.Latest); + } else { + configBuilder.set(PULSAR_INITIAL_CURSOR, SubscriptionInitialPosition.Earliest); + } + // Make sure they are serializable. checkState( isSerializable(deserializationSchema), diff --git a/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/PulsarSourceOptions.java b/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/PulsarSourceOptions.java index de1595a5..3346fed8 100644 --- a/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/PulsarSourceOptions.java +++ b/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/PulsarSourceOptions.java @@ -28,6 +28,7 @@ import org.apache.flink.connector.pulsar.source.config.CursorVerification; import org.apache.pulsar.client.api.ConsumerCryptoFailureAction; +import org.apache.pulsar.client.api.SubscriptionInitialPosition; import org.apache.pulsar.client.api.SubscriptionMode; import java.time.Duration; @@ -232,6 +233,13 @@ private PulsarSourceOptions() { code("StartCursor")) .build()); + public static final ConfigOption PULSAR_INITIAL_CURSOR = + ConfigOptions.key(SOURCE_CONFIG_PREFIX + "initialCursor") + .enumType(SubscriptionInitialPosition.class) + .defaultValue(SubscriptionInitialPosition.Latest) + .withDescription( + Description.builder().text("Consumer initial position.").build()); + /////////////////////////////////////////////////////////////////////////////// // // The configuration for ConsumerConfigurationData part. diff --git a/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/config/SourceConfiguration.java b/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/config/SourceConfiguration.java index 2b472975..765db30e 100644 --- a/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/config/SourceConfiguration.java +++ b/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/config/SourceConfiguration.java @@ -27,6 +27,7 @@ import org.apache.pulsar.client.api.ConsumerBuilder; import org.apache.pulsar.client.api.Schema; +import org.apache.pulsar.client.api.SubscriptionInitialPosition; import org.apache.pulsar.client.api.SubscriptionMode; import org.apache.pulsar.client.api.SubscriptionType; @@ -40,6 +41,7 @@ import static org.apache.flink.connector.pulsar.source.PulsarSourceOptions.PULSAR_ENABLE_AUTO_ACKNOWLEDGE_MESSAGE; import static org.apache.flink.connector.pulsar.source.PulsarSourceOptions.PULSAR_ENABLE_SOURCE_METRICS; import static org.apache.flink.connector.pulsar.source.PulsarSourceOptions.PULSAR_FETCH_ONE_MESSAGE_TIME; +import static org.apache.flink.connector.pulsar.source.PulsarSourceOptions.PULSAR_INITIAL_CURSOR; import static org.apache.flink.connector.pulsar.source.PulsarSourceOptions.PULSAR_MAX_FETCH_RECORDS; import static org.apache.flink.connector.pulsar.source.PulsarSourceOptions.PULSAR_MAX_FETCH_TIME; import static org.apache.flink.connector.pulsar.source.PulsarSourceOptions.PULSAR_PARTITION_DISCOVERY_INTERVAL_MS; @@ -68,6 +70,7 @@ public class SourceConfiguration extends PulsarConfiguration { private final boolean enableSchemaEvolution; private final boolean enableMetrics; private final boolean resetSubscriptionCursor; + private final SubscriptionInitialPosition subscriptionInitialPosition; public SourceConfiguration(Configuration configuration) { super(configuration); @@ -87,6 +90,7 @@ public SourceConfiguration(Configuration configuration) { this.enableMetrics = get(PULSAR_ENABLE_SOURCE_METRICS) && get(PULSAR_STATS_INTERVAL_SECONDS) > 0; this.resetSubscriptionCursor = get(PULSAR_RESET_SUBSCRIPTION_CURSOR); + this.subscriptionInitialPosition = get(PULSAR_INITIAL_CURSOR); } /** The capacity of the element queue in the source reader. */ @@ -209,6 +213,11 @@ public String getSubscriptionDesc() { return getSubscriptionName() + "(Exclusive," + getSubscriptionMode() + ")"; } + /** The initial position for the subscription. */ + public SubscriptionInitialPosition getInitialPosition() { + return subscriptionInitialPosition; + } + @Override public boolean equals(Object o) { if (this == o) { @@ -234,7 +243,8 @@ public boolean equals(Object o) { && allowKeySharedOutOfOrderDelivery == that.allowKeySharedOutOfOrderDelivery && enableSchemaEvolution == that.enableSchemaEvolution && enableMetrics == that.enableMetrics - && resetSubscriptionCursor == that.resetSubscriptionCursor; + && resetSubscriptionCursor == that.resetSubscriptionCursor + && subscriptionInitialPosition == that.subscriptionInitialPosition; } @Override @@ -254,6 +264,7 @@ public int hashCode() { allowKeySharedOutOfOrderDelivery, enableSchemaEvolution, enableMetrics, - resetSubscriptionCursor); + resetSubscriptionCursor, + subscriptionInitialPosition); } } diff --git a/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/PulsarSourceEnumerator.java b/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/PulsarSourceEnumerator.java index 57bb72b7..238521e6 100644 --- a/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/PulsarSourceEnumerator.java +++ b/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/PulsarSourceEnumerator.java @@ -220,7 +220,9 @@ private void checkPartitionChanges(Set fetchedPartitions, Throwa startCursor.position(partition.getTopic(), partition.getPartitionId()); try { - position.setupSubPosition(pulsarClient, topic, subscriptionName); + if (sourceConfiguration.isResetSubscriptionCursor()) { + position.setupSubPosition(pulsarClient, topic, subscriptionName); + } } catch (PulsarClientException e) { throw new FlinkRuntimeException(e); } diff --git a/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/reader/PulsarPartitionSplitReader.java b/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/reader/PulsarPartitionSplitReader.java index 2b196e30..6e76e98b 100644 --- a/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/reader/PulsarPartitionSplitReader.java +++ b/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/reader/PulsarPartitionSplitReader.java @@ -313,6 +313,9 @@ private Consumer createPulsarConsumer(TopicPartition partition) consumerBuilder.keySharedPolicy(policy); } + // set initial position + consumerBuilder.subscriptionInitialPosition(sourceConfiguration.getInitialPosition()); + // Create the consumer configuration by using common utils. Consumer consumer = consumerBuilder.subscribe();