From 3bc31186fe1c18ce09a7a014c29a191ca7b99964 Mon Sep 17 00:00:00 2001 From: Vaibhav Kushwaha Date: Wed, 16 Oct 2024 10:31:29 +0530 Subject: [PATCH 1/4] connector changes to use user specified lsn type --- .../postgresql/PostgresConnectorConfig.java | 69 ++++++++++ .../PostgresStreamingChangeEventSource.java | 120 +++++++++++++++--- .../connector/postgresql/connection/Lsn.java | 2 +- .../PostgresReplicationConnection.java | 11 +- .../connection/WalPositionLocator.java | 11 +- 5 files changed, 192 insertions(+), 21 deletions(-) diff --git a/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/PostgresConnectorConfig.java b/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/PostgresConnectorConfig.java index f342426dce3..5d2a2b11575 100755 --- a/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/PostgresConnectorConfig.java +++ b/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/PostgresConnectorConfig.java @@ -374,6 +374,62 @@ public static SecureConnectionMode parse(String value, String defaultValue) { } } + public enum LsnType implements EnumeratedValue { + SEQUENCE("sequence") { + @Override + public String getLsnTypeName() { + return getValue(); + } + + @Override + public boolean isSequence() { + return true; + } + + @Override + public boolean isHybridTime() { + return false; + } + }, + HYBRID_TIME("hybrid_time") { + @Override + public String getLsnTypeName() { + return getValue(); + } + + @Override + public boolean isSequence() { + return false; + } + + @Override + public boolean isHybridTime() { + return true; + } + }; + + private final String lsnTypeName; + + LsnType(String lsnTypeName) { + this.lsnTypeName = lsnTypeName; + } + + public static LsnType parse(String s) { + return valueOf(s.trim().toUpperCase()); + } + + @Override + public String getValue() { + return lsnTypeName; + } + + public abstract boolean isSequence(); + + public abstract boolean isHybridTime(); 
+ + public abstract String getLsnTypeName(); + } + public enum LogicalDecoder implements EnumeratedValue { PGOUTPUT("pgoutput") { @Override @@ -572,6 +628,14 @@ public static SchemaRefreshMode parse(String value) { + "'. " + "Defaults to '" + LogicalDecoder.YBOUTPUT.getValue() + "'."); + public static final Field SLOT_LSN_TYPE = Field.create("slot.lsn.type") + .withDisplayName("Slot LSN type") + .withType(Type.STRING) + .withWidth(Width.MEDIUM) + .withImportance(Importance.MEDIUM) + .withEnum(LsnType.class, LsnType.SEQUENCE) + .withDescription("LSN type being used with the replication slot"); + public static final Field SLOT_NAME = Field.create("slot.name") .withDisplayName("Slot") .withType(Type.STRING) @@ -1032,6 +1096,10 @@ protected String slotName() { return getConfig().getString(SLOT_NAME); } + public LsnType slotLsnType() { + return LsnType.parse(getConfig().getString(SLOT_LSN_TYPE)); + } + protected boolean dropSlotOnStop() { if (getConfig().hasKey(DROP_SLOT_ON_STOP.name())) { return getConfig().getBoolean(DROP_SLOT_ON_STOP); @@ -1154,6 +1222,7 @@ protected SourceInfoStructMaker getSourceInfoStruc DATABASE_NAME, PLUGIN_NAME, SLOT_NAME, + SLOT_LSN_TYPE, PUBLICATION_NAME, PUBLICATION_AUTOCREATE_MODE, REPLICA_IDENTITY_AUTOSET_VALUES, diff --git a/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/PostgresStreamingChangeEventSource.java b/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/PostgresStreamingChangeEventSource.java index 8d2fcaa1b62..896b23cffb9 100644 --- a/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/PostgresStreamingChangeEventSource.java +++ b/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/PostgresStreamingChangeEventSource.java @@ -9,6 +9,7 @@ import java.util.Map; import java.util.Objects; import java.util.OptionalLong; +import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.atomic.AtomicReference; import 
org.apache.kafka.connect.errors.ConnectException; @@ -80,12 +81,16 @@ public class PostgresStreamingChangeEventSource implements StreamingChangeEventS */ private long numberOfEventsSinceLastEventSentOrWalGrowingWarning = 0; private Lsn lastCompletelyProcessedLsn; + private Lsn lastSentFeedback = Lsn.valueOf(2L); private PostgresOffsetContext effectiveOffset; + protected ConcurrentLinkedQueue commitTimes; + /** * For DEBUGGING */ private OptionalLong lastTxnidForWhichCommitSeen = OptionalLong.empty(); + private long recordCount = 0; public PostgresStreamingChangeEventSource(PostgresConnectorConfig connectorConfig, Snapshotter snapshotter, PostgresConnection connection, PostgresEventDispatcher dispatcher, ErrorHandler errorHandler, Clock clock, @@ -148,13 +153,31 @@ public void execute(ChangeEventSourceContext context, PostgresPartition partitio } if (hasStartLsnStoredInContext) { - // start streaming from the last recorded position in the offset - final Lsn lsn = this.effectiveOffset.lastCompletelyProcessedLsn() != null ? this.effectiveOffset.lastCompletelyProcessedLsn() - : this.effectiveOffset.lsn(); - final Operation lastProcessedMessageType = this.effectiveOffset.lastProcessedMessageType(); - LOGGER.info("Retrieved latest position from stored offset '{}'", lsn); - walPosition = new WalPositionLocator(this.effectiveOffset.lastCommitLsn(), lsn, lastProcessedMessageType); - replicationStream.compareAndSet(null, replicationConnection.startStreaming(lsn, walPosition)); + if (connectorConfig.slotLsnType().isHybridTime()) { + LOGGER.info("LSN is stored in context for type HT"); + final Lsn lsn = this.effectiveOffset.lastCommitLsn() == null ? 
+ lastSentFeedback : this.effectiveOffset.lastCommitLsn(); + + if (this.effectiveOffset.lastCommitLsn() == null) { + LOGGER.info("Last commit stored in offset is null"); + } + + LOGGER.info("Retrieved last committed LSN from stored offset '{}'", lsn); + + final Operation lastProcessedMessageType = this.effectiveOffset.lastProcessedMessageType(); + walPosition = new WalPositionLocator(lsn, lsn, lastProcessedMessageType); + replicationStream.compareAndSet(null, replicationConnection.startStreaming(lsn, walPosition)); + lastSentFeedback = lsn; + } else { + // This is the SEQUENCE LSN type + // start streaming from the last recorded position in the offset + final Lsn lsn = this.effectiveOffset.lastCompletelyProcessedLsn() != null ? this.effectiveOffset.lastCompletelyProcessedLsn() + : this.effectiveOffset.lsn(); + final Operation lastProcessedMessageType = this.effectiveOffset.lastProcessedMessageType(); + LOGGER.info("Retrieved latest position from stored offset '{}'", lsn); + walPosition = new WalPositionLocator(this.effectiveOffset.lastCommitLsn(), lsn, lastProcessedMessageType); + replicationStream.compareAndSet(null, replicationConnection.startStreaming(lsn, walPosition)); + } } else { LOGGER.info("No previous LSN found in Kafka, streaming from the latest xlogpos or flushed LSN..."); @@ -198,7 +221,12 @@ public void execute(ChangeEventSourceContext context, PostgresPartition partitio replicationConnection.getConnectedNodeIp()); } - replicationStream.set(replicationConnection.startStreaming(walPosition.getLastEventStoredLsn(), walPosition)); + if (connectorConfig.slotLsnType().isHybridTime()) { + replicationStream.set(replicationConnection.startStreaming(walPosition.getLastCommitStoredLsn(), walPosition)); + } else { + // This is for lsn type SEQUENCE. 
+ replicationStream.set(replicationConnection.startStreaming(walPosition.getLastEventStoredLsn(), walPosition)); + } stream = this.replicationStream.get(); stream.startKeepAlive(Threads.newSingleThreadExecutor(YugabyteDBConnector.class, connectorConfig.getLogicalName(), KEEP_ALIVE_THREAD_NAME)); } @@ -292,6 +320,8 @@ private void processReplicationMessages(PostgresPartition partition, PostgresOff LOGGER.debug("Processing BEGIN with end LSN {} and txnid {}", lsn, message.getTransactionId()); } else { LOGGER.debug("Processing COMMIT with end LSN {} and txnid {}", lsn, message.getTransactionId()); + LOGGER.debug("Record count in the txn {} is {} with commit time {}", message.getTransactionId(), recordCount, lsn.asLong() - 1); + recordCount = 0; } OptionalLong currentTxnid = message.getTransactionId(); @@ -308,7 +338,7 @@ private void processReplicationMessages(PostgresPartition partition, PostgresOff // Don't skip on BEGIN message as it would flush LSN for the whole transaction // too early if (message.getOperation() == Operation.COMMIT) { - commitMessage(partition, offsetContext, lsn); + commitMessage(partition, offsetContext, lsn, message); } return; } @@ -321,7 +351,7 @@ private void processReplicationMessages(PostgresPartition partition, PostgresOff dispatcher.dispatchTransactionStartedEvent(partition, toString(message.getTransactionId()), offsetContext, message.getCommitTime()); } else if (message.getOperation() == Operation.COMMIT) { - commitMessage(partition, offsetContext, lsn); + commitMessage(partition, offsetContext, lsn, message); dispatcher.dispatchTransactionCommittedEvent(partition, offsetContext, message.getCommitTime()); } maybeWarnAboutGrowingWalBacklog(true); @@ -333,7 +363,7 @@ else if (message.getOperation() == Operation.MESSAGE) { // non-transactional message that will not be followed by a COMMIT message if (message.isLastEventForLsn()) { - commitMessage(partition, offsetContext, lsn); + commitMessage(partition, offsetContext, lsn, message); } 
dispatcher.dispatchLogicalDecodingMessage( @@ -346,6 +376,9 @@ else if (message.getOperation() == Operation.MESSAGE) { } // DML event else { + LOGGER.trace("Processing DML event with lsn {} and lastCompletelyProcessedLsn {}", lsn, lastCompletelyProcessedLsn); + ++recordCount; + TableId tableId = null; if (message.getOperation() != Operation.NOOP) { tableId = PostgresSchema.parse(message.getTable()); @@ -384,8 +417,17 @@ private void searchWalPosition(ChangeEventSourceContext context, PostgresPartiti while (context.isRunning() && resumeLsn.get() == null) { boolean receivedMessage = stream.readPending(message -> { - final Lsn lsn = stream.lastReceivedLsn(); + final Lsn lsn; + if (connectorConfig.slotLsnType().isHybridTime()) { + lsn = walPosition.getLastCommitStoredLsn() != null ? walPosition.getLastCommitStoredLsn() : lastSentFeedback; + } else { + lsn = stream.lastReceivedLsn(); + } resumeLsn.set(walPosition.resumeFromLsn(lsn, message).orElse(null)); + + if (resumeLsn.get() == null) { + LOGGER.info("Resume LSN is null"); + } }); if (receivedMessage) { @@ -412,7 +454,14 @@ private void probeConnectionIfNeeded() throws SQLException { } } - private void commitMessage(PostgresPartition partition, PostgresOffsetContext offsetContext, final Lsn lsn) throws SQLException, InterruptedException { + private void commitMessage(PostgresPartition partition, PostgresOffsetContext offsetContext, final Lsn lsn, ReplicationMessage message) throws SQLException, InterruptedException { + if (this.connectorConfig.slotLsnType().isHybridTime()) { + if (message.getOperation() == Operation.COMMIT) { + LOGGER.info("Adding '{}' as lsn to the commit times queue", Lsn.valueOf(lsn.asLong() - 1)); + commitTimes.add(Lsn.valueOf(lsn.asLong() - 1)); + } + } + lastCompletelyProcessedLsn = lsn; offsetContext.updateCommitPosition(lsn, lastCompletelyProcessedLsn); maybeWarnAboutGrowingWalBacklog(false); @@ -463,18 +512,30 @@ public void commitOffset(Map partition, Map offset) { final Lsn changeLsn = 
Lsn.valueOf((Long) offset.get(PostgresOffsetContext.LAST_COMPLETELY_PROCESSED_LSN_KEY)); final Lsn lsn = (commitLsn != null) ? commitLsn : changeLsn; - LOGGER.debug("Received offset commit request on commit LSN '{}' and change LSN '{}'", commitLsn, changeLsn); + LOGGER.info("Received offset commit request on commit LSN '{}' and change LSN '{}'", commitLsn, changeLsn); if (replicationStream != null && lsn != null) { if (!lsnFlushingAllowed) { LOGGER.info("Received offset commit request on '{}', but ignoring it. LSN flushing is not allowed yet", lsn); return; } + Lsn finalLsn; + if (this.connectorConfig.slotLsnType().isHybridTime()) { + finalLsn = getLsnToBeFlushed(lsn); + } else { + finalLsn = lsn; + } + if (LOGGER.isDebugEnabled()) { - LOGGER.debug("Flushing LSN to server: {}", lsn); + LOGGER.info("Flushing LSN to server: {}", finalLsn); } // tell the server the point up to which we've processed data, so it can be free to recycle WAL segments replicationStream.flushLsn(lsn); + + if (this.connectorConfig.slotLsnType().isHybridTime()) { + lastSentFeedback = finalLsn; + cleanCommitTimeQueue(finalLsn); + } } else { LOGGER.debug("Streaming has already stopped, ignoring commit callback..."); @@ -485,6 +546,35 @@ public void commitOffset(Map partition, Map offset) { } } + protected Lsn getLsnToBeFlushed(Lsn lsn) { + if (commitTimes == null || commitTimes.isEmpty()) { + // This means that the queue has not been initialised and the task is still starting. + return lastSentFeedback; + } + + Lsn result = lastSentFeedback; + + LOGGER.info("Queue at this time: {}", commitTimes); + + for (Lsn commitLsn : commitTimes) { + if (commitLsn.compareTo(lsn) < 0) { + LOGGER.debug("Assigning result as {}", commitLsn); + result = commitLsn; + } else { + // This will be the loop exit when we encounter any bigger element. 
+ break; + } + } + + return result; + } + + protected void cleanCommitTimeQueue(Lsn lsn) { + if (commitTimes != null) { + commitTimes.removeIf(ele -> ele.compareTo(lsn) < 1); + } + } + @Override public PostgresOffsetContext getOffsetContext() { return effectiveOffset; diff --git a/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/connection/Lsn.java b/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/connection/Lsn.java index b9f4b7dc8fe..d2cedf2a033 100644 --- a/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/connection/Lsn.java +++ b/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/connection/Lsn.java @@ -144,7 +144,7 @@ public boolean isValid() { @Override public String toString() { - return "LSN{" + asString() + '}'; + return "LSN{" + asLong() + '}'; } @Override diff --git a/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/connection/PostgresReplicationConnection.java b/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/connection/PostgresReplicationConnection.java index a95d46c239d..75c917841e3 100644 --- a/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/connection/PostgresReplicationConnection.java +++ b/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/connection/PostgresReplicationConnection.java @@ -64,6 +64,7 @@ public class PostgresReplicationConnection extends JdbcConnection implements Rep private static Logger LOGGER = LoggerFactory.getLogger(PostgresReplicationConnection.class); private final String slotName; + private final PostgresConnectorConfig.LsnType lsnType; private final String publicationName; private final RelationalTableFilters tableFilter; private final PostgresConnectorConfig.AutoCreateMode publicationAutocreateMode; @@ -114,6 +115,7 @@ private PostgresReplicationConnection(PostgresConnectorConfig config, this.connectorConfig = config; 
this.slotName = slotName; + this.lsnType = config.slotLsnType(); this.publicationName = publicationName; this.tableFilter = tableFilter; this.publicationAutocreateMode = publicationAutocreateMode; @@ -395,8 +397,8 @@ public ReplicationStream startStreaming(Lsn offset, WalPositionLocator walPositi offset = defaultStartingPos; } Lsn lsn = offset; - if (LOGGER.isDebugEnabled()) { - LOGGER.debug("starting streaming from LSN '{}'", lsn); + if (LOGGER.isInfoEnabled()) { + LOGGER.info("starting streaming from LSN '{}'", lsn); } final int maxRetries = connectorConfig.maxRetries(); @@ -522,10 +524,11 @@ public Optional createReplicationSlot() throws SQLException try (Statement stmt = pgConnection().createStatement()) { String createCommand = String.format( - "CREATE_REPLICATION_SLOT \"%s\" %s LOGICAL %s", + "CREATE_REPLICATION_SLOT \"%s\" %s LOGICAL %s %s", slotName, tempPart, - plugin.getPostgresPluginName()); + plugin.getPostgresPluginName(), + lsnType.getLsnTypeName()); LOGGER.info("Creating replication slot with command {}", createCommand); stmt.execute(createCommand); // when we are in Postgres 9.4+, we can parse the slot creation info, diff --git a/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/connection/WalPositionLocator.java b/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/connection/WalPositionLocator.java index 9d9cb4fa6f7..5fecef1c569 100644 --- a/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/connection/WalPositionLocator.java +++ b/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/connection/WalPositionLocator.java @@ -43,6 +43,7 @@ public class WalPositionLocator { public WalPositionLocator(Lsn lastCommitStoredLsn, Lsn lastEventStoredLsn, Operation lastProcessedMessageType) { this.lastCommitStoredLsn = lastCommitStoredLsn; this.lastEventStoredLsn = lastEventStoredLsn; + // YB Note: lastEventStoredLsn and lastCommitStoredLsn will be the same in case of 
LSN type SEQUENCE. this.lastProcessedMessageType = lastProcessedMessageType; LOGGER.info("Looking for WAL restart position for last commit LSN '{}' and last change LSN '{}'", @@ -82,6 +83,7 @@ public Optional resumeFromLsn(Lsn currentLsn, ReplicationMessage message) { startStreamingLsn = txStartLsn; return Optional.of(startStreamingLsn); } + LOGGER.info("Returning optional empty as resume LSN"); return Optional.empty(); } lsnAfterLastEventStoredLsn = currentLsn; @@ -91,14 +93,21 @@ public Optional resumeFromLsn(Lsn currentLsn, ReplicationMessage message) { return Optional.of(startStreamingLsn); } if (currentLsn.equals(lastEventStoredLsn)) { + LOGGER.info("Current LSN is equal to the last event stored LSN {}", lastEventStoredLsn); storeLsnAfterLastEventStoredLsn = true; } if (lastCommitStoredLsn == null) { startStreamingLsn = firstLsnReceived; + LOGGER.info("Last commit stored LSN is null, returning firstLsnReceived {}", startStreamingLsn); return Optional.of(startStreamingLsn); } + if (currentLsn.equals(lastCommitStoredLsn)) { + LOGGER.info("Returning lastCommitStoredLsn {} for resuming", lastCommitStoredLsn); + return Optional.of(lastCommitStoredLsn); + } + switch (message.getOperation()) { case BEGIN: txStartLsn = currentLsn; @@ -160,7 +169,7 @@ public boolean skipMessage(Lsn lsn) { lsn, lsnSeen)); } - LOGGER.debug("Message with LSN '{}' filtered", lsn); + LOGGER.info("Message with LSN '{}' filtered", lsn); return true; } From 572c0aff3a185ce46453614b1e20ce04a6f76ad1 Mon Sep 17 00:00:00 2001 From: Vaibhav Kushwaha Date: Wed, 16 Oct 2024 17:26:12 +0530 Subject: [PATCH 2/4] changes --- .../postgresql/PostgresConnectorConfig.java | 4 ++-- .../PostgresStreamingChangeEventSource.java | 15 ++++++++------- .../postgresql/connection/WalPositionLocator.java | 12 ++++++++++-- .../connector/postgresql/PostgresConnectorIT.java | 1 + 4 files changed, 21 insertions(+), 11 deletions(-) diff --git 
a/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/PostgresConnectorConfig.java b/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/PostgresConnectorConfig.java index 5d2a2b11575..14657a63304 100755 --- a/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/PostgresConnectorConfig.java +++ b/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/PostgresConnectorConfig.java @@ -375,7 +375,7 @@ public static SecureConnectionMode parse(String value, String defaultValue) { } public enum LsnType implements EnumeratedValue { - SEQUENCE("sequence") { + SEQUENCE("SEQUENCE") { @Override public String getLsnTypeName() { return getValue(); @@ -391,7 +391,7 @@ public boolean isHybridTime() { return false; } }, - HYBRID_TIME("hybrid_time") { + HYBRID_TIME("HYBRID_TIME") { @Override public String getLsnTypeName() { return getValue(); diff --git a/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/PostgresStreamingChangeEventSource.java b/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/PostgresStreamingChangeEventSource.java index 896b23cffb9..ccdf038c8e2 100644 --- a/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/PostgresStreamingChangeEventSource.java +++ b/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/PostgresStreamingChangeEventSource.java @@ -106,7 +106,7 @@ public PostgresStreamingChangeEventSource(PostgresConnectorConfig connectorConfi this.snapshotter = snapshotter; this.replicationConnection = (PostgresReplicationConnection) replicationConnection; this.connectionProbeTimer = ElapsedTimeStrategy.constant(Clock.system(), connectorConfig.statusUpdateInterval()); - + this.commitTimes = new ConcurrentLinkedQueue<>(); } @Override @@ -165,7 +165,7 @@ public void execute(ChangeEventSourceContext context, PostgresPartition partitio LOGGER.info("Retrieved last committed LSN from 
stored offset '{}'", lsn); final Operation lastProcessedMessageType = this.effectiveOffset.lastProcessedMessageType(); - walPosition = new WalPositionLocator(lsn, lsn, lastProcessedMessageType); + walPosition = new WalPositionLocator(lsn, lsn, lastProcessedMessageType, this.connectorConfig.slotLsnType().isHybridTime()); replicationStream.compareAndSet(null, replicationConnection.startStreaming(lsn, walPosition)); lastSentFeedback = lsn; } else { @@ -175,13 +175,13 @@ public void execute(ChangeEventSourceContext context, PostgresPartition partitio : this.effectiveOffset.lsn(); final Operation lastProcessedMessageType = this.effectiveOffset.lastProcessedMessageType(); LOGGER.info("Retrieved latest position from stored offset '{}'", lsn); - walPosition = new WalPositionLocator(this.effectiveOffset.lastCommitLsn(), lsn, lastProcessedMessageType); + walPosition = new WalPositionLocator(this.effectiveOffset.lastCommitLsn(), lsn, lastProcessedMessageType, this.connectorConfig.slotLsnType().isHybridTime()); replicationStream.compareAndSet(null, replicationConnection.startStreaming(lsn, walPosition)); } } else { LOGGER.info("No previous LSN found in Kafka, streaming from the latest xlogpos or flushed LSN..."); - walPosition = new WalPositionLocator(); + walPosition = new WalPositionLocator(this.connectorConfig.slotLsnType().isHybridTime()); replicationStream.compareAndSet(null, replicationConnection.startStreaming(walPosition)); } // for large dbs, the refresh of schema can take too much time @@ -455,6 +455,9 @@ private void probeConnectionIfNeeded() throws SQLException { } private void commitMessage(PostgresPartition partition, PostgresOffsetContext offsetContext, final Lsn lsn, ReplicationMessage message) throws SQLException, InterruptedException { + lastCompletelyProcessedLsn = lsn; + offsetContext.updateCommitPosition(lsn, lastCompletelyProcessedLsn); + if (this.connectorConfig.slotLsnType().isHybridTime()) { if (message.getOperation() == Operation.COMMIT) { 
LOGGER.info("Adding '{}' as lsn to the commit times queue", Lsn.valueOf(lsn.asLong() - 1)); @@ -462,8 +465,6 @@ private void commitMessage(PostgresPartition partition, PostgresOffsetContext of } } - lastCompletelyProcessedLsn = lsn; - offsetContext.updateCommitPosition(lsn, lastCompletelyProcessedLsn); maybeWarnAboutGrowingWalBacklog(false); dispatcher.dispatchHeartbeatEvent(partition, offsetContext); } @@ -530,7 +531,7 @@ public void commitOffset(Map partition, Map offset) { LOGGER.info("Flushing LSN to server: {}", finalLsn); } // tell the server the point up to which we've processed data, so it can be free to recycle WAL segments - replicationStream.flushLsn(lsn); + replicationStream.flushLsn(finalLsn); if (this.connectorConfig.slotLsnType().isHybridTime()) { lastSentFeedback = finalLsn; diff --git a/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/connection/WalPositionLocator.java b/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/connection/WalPositionLocator.java index 5fecef1c569..f6e10b864e6 100644 --- a/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/connection/WalPositionLocator.java +++ b/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/connection/WalPositionLocator.java @@ -39,21 +39,29 @@ public class WalPositionLocator { private Lsn startStreamingLsn = null; private boolean storeLsnAfterLastEventStoredLsn = false; private Set lsnSeen = new HashSet<>(1_000); + private boolean isLsnTypeHybridTime = false; - public WalPositionLocator(Lsn lastCommitStoredLsn, Lsn lastEventStoredLsn, Operation lastProcessedMessageType) { + public WalPositionLocator(Lsn lastCommitStoredLsn, Lsn lastEventStoredLsn, Operation lastProcessedMessageType, + boolean isLsnTypeHybridTime) { this.lastCommitStoredLsn = lastCommitStoredLsn; this.lastEventStoredLsn = lastEventStoredLsn; // YB Note: lastEventStoredLsn and lastCommitStoredLsn will be the same in case of LSN type 
SEQUENCE. this.lastProcessedMessageType = lastProcessedMessageType; + this.isLsnTypeHybridTime = isLsnTypeHybridTime; LOGGER.info("Looking for WAL restart position for last commit LSN '{}' and last change LSN '{}'", lastCommitStoredLsn, lastEventStoredLsn); } public WalPositionLocator() { + this(false); + } + + public WalPositionLocator(boolean isLsnTypeHybridTime) { this.lastCommitStoredLsn = null; this.lastEventStoredLsn = null; this.lastProcessedMessageType = null; + this.isLsnTypeHybridTime = isLsnTypeHybridTime; LOGGER.info("WAL position will not be searched"); } @@ -103,7 +111,7 @@ public Optional resumeFromLsn(Lsn currentLsn, ReplicationMessage message) { return Optional.of(startStreamingLsn); } - if (currentLsn.equals(lastCommitStoredLsn)) { + if (currentLsn.equals(lastCommitStoredLsn) && isLsnTypeHybridTime) { LOGGER.info("Returning lastCommitStoredLsn {} for resuming", lastCommitStoredLsn); return Optional.of(lastCommitStoredLsn); } diff --git a/debezium-connector-postgres/src/test/java/io/debezium/connector/postgresql/PostgresConnectorIT.java b/debezium-connector-postgres/src/test/java/io/debezium/connector/postgresql/PostgresConnectorIT.java index 0a743c99fd0..bc311806f99 100644 --- a/debezium-connector-postgres/src/test/java/io/debezium/connector/postgresql/PostgresConnectorIT.java +++ b/debezium-connector-postgres/src/test/java/io/debezium/connector/postgresql/PostgresConnectorIT.java @@ -1010,6 +1010,7 @@ public void shouldProduceEventsWhenSnapshotsAreNeverAllowed() throws Interrupted Configuration config = TestHelper.defaultConfig() .with(PostgresConnectorConfig.SNAPSHOT_MODE, SnapshotMode.NEVER.getValue()) .with(PostgresConnectorConfig.DROP_SLOT_ON_STOP, Boolean.TRUE) + .with(PostgresConnectorConfig.SLOT_LSN_TYPE, "HYBRID_TIME") .build(); start(YugabyteDBConnector.class, config); assertConnectorIsRunning(); From 70473c0445e3cde1bd7b8edfe3e4d42f893223ef Mon Sep 17 00:00:00 2001 From: Vaibhav Kushwaha Date: Mon, 13 Jan 2025 10:43:30 +0530 Subject: 
[PATCH 3/4] addressed review comments --- .../PostgresStreamingChangeEventSource.java | 31 +++++++++---------- 1 file changed, 14 insertions(+), 17 deletions(-) diff --git a/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/PostgresStreamingChangeEventSource.java b/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/PostgresStreamingChangeEventSource.java index ccdf038c8e2..b0f7a752755 100644 --- a/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/PostgresStreamingChangeEventSource.java +++ b/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/PostgresStreamingChangeEventSource.java @@ -153,7 +153,16 @@ public void execute(ChangeEventSourceContext context, PostgresPartition partitio } if (hasStartLsnStoredInContext) { - if (connectorConfig.slotLsnType().isHybridTime()) { + if (connectorConfig.slotLsnType().isSequence()) { + // This is the SEQUENCE LSN type + // start streaming from the last recorded position in the offset + final Lsn lsn = this.effectiveOffset.lastCompletelyProcessedLsn() != null ? this.effectiveOffset.lastCompletelyProcessedLsn() + : this.effectiveOffset.lsn(); + final Operation lastProcessedMessageType = this.effectiveOffset.lastProcessedMessageType(); + LOGGER.info("Retrieved latest position from stored offset '{}'", lsn); + walPosition = new WalPositionLocator(this.effectiveOffset.lastCommitLsn(), lsn, lastProcessedMessageType, false /* isLsnTypeHybridTime */); + replicationStream.compareAndSet(null, replicationConnection.startStreaming(lsn, walPosition)); + } else { LOGGER.info("LSN is stored in context for type HT"); final Lsn lsn = this.effectiveOffset.lastCommitLsn() == null ? 
lastSentFeedback : this.effectiveOffset.lastCommitLsn(); @@ -165,18 +174,9 @@ public void execute(ChangeEventSourceContext context, PostgresPartition partitio LOGGER.info("Retrieved last committed LSN from stored offset '{}'", lsn); final Operation lastProcessedMessageType = this.effectiveOffset.lastProcessedMessageType(); - walPosition = new WalPositionLocator(lsn, lsn, lastProcessedMessageType, this.connectorConfig.slotLsnType().isHybridTime()); + walPosition = new WalPositionLocator(lsn, lsn, lastProcessedMessageType, true /* isLsnTypeHybridTime */); replicationStream.compareAndSet(null, replicationConnection.startStreaming(lsn, walPosition)); lastSentFeedback = lsn; - } else { - // This is the SEQUENCE LSN type - // start streaming from the last recorded position in the offset - final Lsn lsn = this.effectiveOffset.lastCompletelyProcessedLsn() != null ? this.effectiveOffset.lastCompletelyProcessedLsn() - : this.effectiveOffset.lsn(); - final Operation lastProcessedMessageType = this.effectiveOffset.lastProcessedMessageType(); - LOGGER.info("Retrieved latest position from stored offset '{}'", lsn); - walPosition = new WalPositionLocator(this.effectiveOffset.lastCommitLsn(), lsn, lastProcessedMessageType, this.connectorConfig.slotLsnType().isHybridTime()); - replicationStream.compareAndSet(null, replicationConnection.startStreaming(lsn, walPosition)); } } else { @@ -221,12 +221,9 @@ public void execute(ChangeEventSourceContext context, PostgresPartition partitio replicationConnection.getConnectedNodeIp()); } - if (connectorConfig.slotLsnType().isHybridTime()) { - replicationStream.set(replicationConnection.startStreaming(walPosition.getLastCommitStoredLsn(), walPosition)); - } else { - // This is for lsn type SEQUENCE. - replicationStream.set(replicationConnection.startStreaming(walPosition.getLastEventStoredLsn(), walPosition)); - } + Lsn lastStoredLsn = connectorConfig.slotLsnType().isHybridTime() ? 
walPosition.getLastCommitStoredLsn() : walPosition.getLastEventStoredLsn(); + replicationStream.set(replicationConnection.startStreaming(lastStoredLsn, walPosition)); + stream = this.replicationStream.get(); stream.startKeepAlive(Threads.newSingleThreadExecutor(YugabyteDBConnector.class, connectorConfig.getLogicalName(), KEEP_ALIVE_THREAD_NAME)); } From 4bf6304a66473b738c2d7e52b31d8f7276aa3803 Mon Sep 17 00:00:00 2001 From: Vaibhav Kushwaha Date: Thu, 16 Jan 2025 10:02:20 +0530 Subject: [PATCH 4/4] added comments to explain --- .../postgresql/PostgresStreamingChangeEventSource.java | 10 +++++++++- 1 file changed, 9 insertions(+), 1 deletion(-) diff --git a/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/PostgresStreamingChangeEventSource.java b/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/PostgresStreamingChangeEventSource.java index b0f7a752755..caef497c43a 100644 --- a/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/PostgresStreamingChangeEventSource.java +++ b/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/PostgresStreamingChangeEventSource.java @@ -163,7 +163,10 @@ public void execute(ChangeEventSourceContext context, PostgresPartition partitio walPosition = new WalPositionLocator(this.effectiveOffset.lastCommitLsn(), lsn, lastProcessedMessageType, false /* isLsnTypeHybridTime */); replicationStream.compareAndSet(null, replicationConnection.startStreaming(lsn, walPosition)); } else { - LOGGER.info("LSN is stored in context for type HT"); + // We are in the block for HYBRID_TIME and last commit can be null for cases where + // we have just started/restarted the connector; in that case, we simply send the + // initial value of lastSentFeedback and let the server handle the time we + // should get the changes from. final Lsn lsn = this.effectiveOffset.lastCommitLsn() == null ? 
lastSentFeedback : this.effectiveOffset.lastCommitLsn(); @@ -416,6 +419,11 @@ private void searchWalPosition(ChangeEventSourceContext context, PostgresPartiti boolean receivedMessage = stream.readPending(message -> { final Lsn lsn; if (connectorConfig.slotLsnType().isHybridTime()) { + // Last commit can be null for cases where + // we have just started/restarted the connector; in that case, we simply send the + // initial value of lastSentFeedback and let the server handle the time we + // should get the changes from. + lsn = walPosition.getLastCommitStoredLsn() != null ? walPosition.getLastCommitStoredLsn() : lastSentFeedback; } else { lsn = stream.lastReceivedLsn();