Skip to content

Conversation

poorbarcode
Copy link

@poorbarcode poorbarcode commented Mar 7, 2025

Purpose of the change

Background 1: dynamically create a Pulsar Topic by Flink connector-pulsar

Flink connector-pulsar provided a way to dynamically create a Pulsar Topic when DynamicTopicRouter returns a non-existing one. see also: flink-connector-pulsar/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/sink/writer/topic/ProducerRegister.java at main · apache/flink-connector-pulsar

  • pulsarClient.getPartitionsForTopic(topic) will create a topic automatically if it does not exist.

Background 2:  how dynamically created topics in Pulsar Server

  • There is a config named allowAutoTopicCreationType, which can be set to partitioned or non-partitioned
  • If it was set partitioned, Pulsar will create a partitioned topic with {defaultNumPartitions} partitions. For example, Pulsar will create topics named {tenant}/{namespace}/{topic name}-partition-0 and {tenant}/{namespace}/{topic name}-partition-1, and create a relationship between them, which indicates they are in a same partitioned topic.
  • If it was set non-partitioned, Pulsar will create a non-partitioned topic. Pulsar will create topics named {tenant}/{namespace}/{topic name}, which does not include a suffix partition-{num}.

Issue:

  • if pulsarClient.getPartitionsForTopic(topic) get a param {tenant}/{namespace}/{topic name}-partition-0, which includes the suffix partition-0, Pulsar will create a non-partitioned topic named {tenant}/{namespace}/{topic name}-partition-0
  • After you call pulsarClient.getPartitionsForTopic(topic) with a param {tenant}/{namespace}/{topic name}-partition-1, you will get two partitions named {tenant}/{namespace}/{topic name}-partition-0 and {tenant}/{namespace}/{topic name}-partition-1, but there is no relationship record between them.

Relates to https://issues.apache.org/jira/projects/FLINK/issues/FLINK-37436

Brief change log

Fix the incorrect API calling.

Verifying this change

This change is a minor change and don't have any tests.

Significant changes

(Please check any boxes [x] if the answer is "yes". You can first publish the PR and check them afterwards, for
convenience.)

  • Dependencies have been added or upgraded
  • Public API has been changed (Public API is any class annotated with @Public(Evolving))
  • Serializers have been changed
  • New feature has been introduced
    • If yes, how is this documented? (not applicable / docs / JavaDocs / not documented)

…en dynamically creating topics by DynamicTopicRouter
@boring-cyborg
Copy link

boring-cyborg bot commented Mar 7, 2025

Thanks for opening this pull request! Please check out our contributing guidelines. (https://flink.apache.org/contributing/how-to-contribute.html)

Comment on lines +225 to +227
pulsarClient
.getPartitionsForTopic(TopicName.get(topic).getPartitionedTopicName())
.get();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It seems to me this could be simplified to:

pulsarClient.getPartitionsForTopic(topicName.getPartitionedTopicName()).get();

.getPartitionsForTopic(TopicName.get(topic).getPartitionedTopicName())
.get();
}
// Step-2: create partition.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Shouldn't step 2 inside an else branch? I don't know anything about the pulsar client but duplicating this call does not seem right.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants