Skip to content

Conversation

@thomasg19930417
Copy link
Contributor

Purpose of the change

For example: Add dynamic sink topic support for Pulsar connector.

Brief change log

  • Change the internal design of ProducerRegister.
  • Expose topic metadata query in PulsarSinkContext.
  • Change the internal metadata cache in MetadataListener.

Verifying this change

Please make sure both new and modified tests in this PR follows the conventions defined in our code quality
guide: https://flink.apache.org/contributing/code-style-and-quality-common.html#testing

(Please pick either of the following options)

This change is a trivial rework / code cleanup without any test coverage.

(or)

This change is already covered by existing tests, such as (please describe tests).

(or)

This change added tests and can be verified as follows:

(example:)

  • Added unit tests
  • Added integration tests for end-to-end deployment
  • Manually verified by running the Pulsar connector on a local Flink cluster.

Significant changes

(Please check any boxes [x] if the answer is "yes". You can first publish the PR and check them afterwards, for
convenience.)

  • Dependencies have been added or upgraded
  • Public API has been changed (Public API is any class annotated with @Public(Evolving))
  • Serializers have been changed
  • New feature has been introduced
    • If yes, how is this documented? (not applicable / docs / JavaDocs / not documented)

@boring-cyborg
Copy link

boring-cyborg bot commented Apr 14, 2025

Thanks for opening this pull request! Please check out our contributing guidelines. (https://flink.apache.org/contributing/how-to-contribute.html)

@thomasg19930417
Copy link
Contributor Author

In Pull Request #103, only the isResetSubscriptionCursor parameter was simply enabled. If this parameter is set to false, then there will be no code related to setting the initial cursor. All subscriptions will start from the default earliest or latest position, instead of the explicitly set earliest or latest position.

@thomasg19930417 thomasg19930417 changed the title Resolve FLINK-35477,FLINK-37299 Rollback can control whether to consume from a specified cursor and also set the initial cursor. Apr 14, 2025
@thomasg19930417 thomasg19930417 changed the title Rollback can control whether to consume from a specified cursor and also set the initial cursor. [FLINK-35477],[FLINK-37299] Solve the problem that when not starting from the Checkpoint (CK), the cursor is always reset to the set initial cursor position. Apr 14, 2025
@thomasg19930417 thomasg19930417 marked this pull request as draft April 17, 2025 01:34
@thomasg19930417 thomasg19930417 marked this pull request as ready for review April 17, 2025 01:35
@thomasg19930417 thomasg19930417 marked this pull request as draft May 17, 2025 09:50
@thomasg19930417 thomasg19930417 marked this pull request as ready for review July 11, 2025 01:11
Copy link
Contributor

@ferenc-csaky ferenc-csaky left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Logic LGTM, added 2 minor suggestions.

}
}

if (Objects.equals(startCursor, StartCursor.latest())) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: StartCursor.latest().equals(startCursor) is also null-safe, shorter and does not require a util class

.enumType(SubscriptionInitialPosition.class)
.defaultValue(SubscriptionInitialPosition.Latest)
.withDescription(
Description.builder().text("Consumer initial position.").build());
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I would add a bit more descriptive text here, e.g.: Initial cursor position of the consumer.

@ferenc-csaky ferenc-csaky changed the title [FLINK-35477],[FLINK-37299] Solve the problem that when not starting from the Checkpoint (CK), the cursor is always reset to the set initial cursor position. [FLINK-35477] Make initial cursor position configurable Oct 17, 2025
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants