Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@
import org.apache.pulsar.client.api.ConsumerCryptoFailureAction;
import org.apache.pulsar.client.api.RegexSubscriptionMode;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.api.SubscriptionInitialPosition;
import org.apache.pulsar.client.api.schema.GenericRecord;
import org.apache.pulsar.common.schema.KeyValue;
import org.apache.pulsar.common.schema.SchemaInfo;
Expand All @@ -56,6 +57,7 @@
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Properties;
import java.util.regex.Pattern;

Expand All @@ -65,6 +67,7 @@
import static org.apache.flink.connector.pulsar.common.config.PulsarOptions.PULSAR_SERVICE_URL;
import static org.apache.flink.connector.pulsar.source.PulsarSourceOptions.PULSAR_CONSUMER_NAME;
import static org.apache.flink.connector.pulsar.source.PulsarSourceOptions.PULSAR_CRYPTO_FAILURE_ACTION;
import static org.apache.flink.connector.pulsar.source.PulsarSourceOptions.PULSAR_INITIAL_CURSOR;
import static org.apache.flink.connector.pulsar.source.PulsarSourceOptions.PULSAR_PARTITION_DISCOVERY_INTERVAL_MS;
import static org.apache.flink.connector.pulsar.source.PulsarSourceOptions.PULSAR_READ_SCHEMA_EVOLUTION;
import static org.apache.flink.connector.pulsar.source.PulsarSourceOptions.PULSAR_SUBSCRIPTION_NAME;
Expand Down Expand Up @@ -590,6 +593,12 @@ public PulsarSource<OUT> build() {
}
}

if (Objects.equals(startCursor, StartCursor.latest())) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: StartCursor.latest().equals(startCursor) is also null-safe, shorter and does not require a util class

configBuilder.set(PULSAR_INITIAL_CURSOR, SubscriptionInitialPosition.Latest);
} else {
configBuilder.set(PULSAR_INITIAL_CURSOR, SubscriptionInitialPosition.Earliest);
}

// Make sure they are serializable.
checkState(
isSerializable(deserializationSchema),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import org.apache.flink.connector.pulsar.source.config.CursorVerification;

import org.apache.pulsar.client.api.ConsumerCryptoFailureAction;
import org.apache.pulsar.client.api.SubscriptionInitialPosition;
import org.apache.pulsar.client.api.SubscriptionMode;

import java.time.Duration;
Expand Down Expand Up @@ -232,6 +233,13 @@ private PulsarSourceOptions() {
code("StartCursor"))
.build());

public static final ConfigOption<SubscriptionInitialPosition> PULSAR_INITIAL_CURSOR =
ConfigOptions.key(SOURCE_CONFIG_PREFIX + "initialCursor")
.enumType(SubscriptionInitialPosition.class)
.defaultValue(SubscriptionInitialPosition.Latest)
.withDescription(
Description.builder().text("Consumer initial position.").build());
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I would add a bit more descriptive text here, e.g.: Initial cursor position of the consumer.


///////////////////////////////////////////////////////////////////////////////
//
// The configuration for ConsumerConfigurationData part.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@

import org.apache.pulsar.client.api.ConsumerBuilder;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.api.SubscriptionInitialPosition;
import org.apache.pulsar.client.api.SubscriptionMode;
import org.apache.pulsar.client.api.SubscriptionType;

Expand All @@ -40,6 +41,7 @@
import static org.apache.flink.connector.pulsar.source.PulsarSourceOptions.PULSAR_ENABLE_AUTO_ACKNOWLEDGE_MESSAGE;
import static org.apache.flink.connector.pulsar.source.PulsarSourceOptions.PULSAR_ENABLE_SOURCE_METRICS;
import static org.apache.flink.connector.pulsar.source.PulsarSourceOptions.PULSAR_FETCH_ONE_MESSAGE_TIME;
import static org.apache.flink.connector.pulsar.source.PulsarSourceOptions.PULSAR_INITIAL_CURSOR;
import static org.apache.flink.connector.pulsar.source.PulsarSourceOptions.PULSAR_MAX_FETCH_RECORDS;
import static org.apache.flink.connector.pulsar.source.PulsarSourceOptions.PULSAR_MAX_FETCH_TIME;
import static org.apache.flink.connector.pulsar.source.PulsarSourceOptions.PULSAR_PARTITION_DISCOVERY_INTERVAL_MS;
Expand Down Expand Up @@ -68,6 +70,7 @@ public class SourceConfiguration extends PulsarConfiguration {
private final boolean enableSchemaEvolution;
private final boolean enableMetrics;
private final boolean resetSubscriptionCursor;
private final SubscriptionInitialPosition subscriptionInitialPosition;

public SourceConfiguration(Configuration configuration) {
super(configuration);
Expand All @@ -87,6 +90,7 @@ public SourceConfiguration(Configuration configuration) {
this.enableMetrics =
get(PULSAR_ENABLE_SOURCE_METRICS) && get(PULSAR_STATS_INTERVAL_SECONDS) > 0;
this.resetSubscriptionCursor = get(PULSAR_RESET_SUBSCRIPTION_CURSOR);
this.subscriptionInitialPosition = get(PULSAR_INITIAL_CURSOR);
}

/** The capacity of the element queue in the source reader. */
Expand Down Expand Up @@ -209,6 +213,11 @@ public String getSubscriptionDesc() {
return getSubscriptionName() + "(Exclusive," + getSubscriptionMode() + ")";
}

/** The initial position for the subscription. */
public SubscriptionInitialPosition getInitialPosition() {
return subscriptionInitialPosition;
}

@Override
public boolean equals(Object o) {
if (this == o) {
Expand All @@ -234,7 +243,8 @@ public boolean equals(Object o) {
&& allowKeySharedOutOfOrderDelivery == that.allowKeySharedOutOfOrderDelivery
&& enableSchemaEvolution == that.enableSchemaEvolution
&& enableMetrics == that.enableMetrics
&& resetSubscriptionCursor == that.resetSubscriptionCursor;
&& resetSubscriptionCursor == that.resetSubscriptionCursor
&& subscriptionInitialPosition == that.subscriptionInitialPosition;
}

@Override
Expand All @@ -254,6 +264,7 @@ public int hashCode() {
allowKeySharedOutOfOrderDelivery,
enableSchemaEvolution,
enableMetrics,
resetSubscriptionCursor);
resetSubscriptionCursor,
subscriptionInitialPosition);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -220,7 +220,9 @@ private void checkPartitionChanges(Set<TopicPartition> fetchedPartitions, Throwa
startCursor.position(partition.getTopic(), partition.getPartitionId());

try {
position.setupSubPosition(pulsarClient, topic, subscriptionName);
if (sourceConfiguration.isResetSubscriptionCursor()) {
position.setupSubPosition(pulsarClient, topic, subscriptionName);
}
Comment on lines +223 to +225
Copy link

@vbabenkoru vbabenkoru Oct 29, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this the actual fix for FLINK-35477? And the rest of the changes are unrelated to the fix?

} catch (PulsarClientException e) {
throw new FlinkRuntimeException(e);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -313,6 +313,9 @@ private Consumer<byte[]> createPulsarConsumer(TopicPartition partition)
consumerBuilder.keySharedPolicy(policy);
}

// set initial position
consumerBuilder.subscriptionInitialPosition(sourceConfiguration.getInitialPosition());

// Create the consumer configuration by using common utils.
Consumer<byte[]> consumer = consumerBuilder.subscribe();

Expand Down