Skip to content

Commit ef8b373

Browse files
authored
[DBZ-PGYB] Specify only HYBRID_TIME when creating slot, skip otherwise (#175)
## Problem With the introduction of `slot.lsn.type`, the default value we use for creating the replication slot is `SEQUENCE` when the user does not provide any value to the configuration property. However, while creating the replication slot, if the user doesn't provide this config, the `CREATE_REPLICATION_SLOT` statement looks like: ``` CREATE_REPLICATION_SLOT slot_name LOGICAL plugin_name SEQUENCE; ``` The above breaks the backward compatibility and causes the creation of replication slot to fail on older versions where the newly added syntax is not supported. ## Solution As a fix, we will now only specify a LSN type while creating the replication slot if it is `HYBRID_TIME`. For the case with `SEQUENCE`, we will now rely on the default values as specified on the service to create a replication slot of LSN type `SEQUENCE`.
1 parent 3e3cfab commit ef8b373

File tree

1 file changed

+5
-1
lines changed

1 file changed

+5
-1
lines changed

debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/connection/PostgresReplicationConnection.java

+5-1
Original file line numberDiff line numberDiff line change
@@ -525,13 +525,17 @@ public Optional<SlotCreationResult> createReplicationSlot() throws SQLException
525525
// For pgoutput specifically, the publication must be created prior to the slot.
526526
initPublication();
527527

528+
// YB Note: We will only be specifying the LSN type when it is HYBRID_TIME, for other case(s)
529+
// i.e. SEQUENCE, we will let the service handle it with the default value. This is to ensure
530+
// that we stay backward compatible as the syntax is not recognizable by initial versions
531+
// of logical replication in YugabyteDB.
528532
try (Statement stmt = pgConnection().createStatement()) {
529533
String createCommand = String.format(
530534
"CREATE_REPLICATION_SLOT \"%s\" %s LOGICAL %s %s %s",
531535
slotName,
532536
tempPart,
533537
plugin.getPostgresPluginName(),
534-
lsnType.getLsnTypeName(),
538+
lsnType.getLsnTypeName().equalsIgnoreCase("SEQUENCE") ? "" : "HYBRID_TIME",
535539
streamingMode.isParallel() ? "USE_SNAPSHOT" : "");
536540

537541
// Begin a read-only transaction when it is the parallel streaming mode because

0 commit comments

Comments
 (0)