diff --git a/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/PostgresStreamingChangeEventSource.java b/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/PostgresStreamingChangeEventSource.java index 8d2fcaa1b62..fedd1f82bc7 100644 --- a/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/PostgresStreamingChangeEventSource.java +++ b/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/PostgresStreamingChangeEventSource.java @@ -9,6 +9,8 @@ import java.util.Map; import java.util.Objects; import java.util.OptionalLong; +import java.util.Queue; +import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.atomic.AtomicReference; import org.apache.kafka.connect.errors.ConnectException; @@ -80,12 +82,16 @@ public class PostgresStreamingChangeEventSource implements StreamingChangeEventS */ private long numberOfEventsSinceLastEventSentOrWalGrowingWarning = 0; private Lsn lastCompletelyProcessedLsn; + private Lsn lastSentFeedback = Lsn.valueOf(2L); private PostgresOffsetContext effectiveOffset; + protected ConcurrentLinkedQueue commitTimes; + /** * For DEBUGGING */ private OptionalLong lastTxnidForWhichCommitSeen = OptionalLong.empty(); + private long recordCount = 0; public PostgresStreamingChangeEventSource(PostgresConnectorConfig connectorConfig, Snapshotter snapshotter, PostgresConnection connection, PostgresEventDispatcher dispatcher, ErrorHandler errorHandler, Clock clock, @@ -101,7 +107,7 @@ public PostgresStreamingChangeEventSource(PostgresConnectorConfig connectorConfi this.snapshotter = snapshotter; this.replicationConnection = (PostgresReplicationConnection) replicationConnection; this.connectionProbeTimer = ElapsedTimeStrategy.constant(Clock.system(), connectorConfig.statusUpdateInterval()); - + this.commitTimes = new ConcurrentLinkedQueue<>(); } @Override @@ -147,14 +153,30 @@ public void execute(ChangeEventSourceContext context, PostgresPartition partitio replicationConnection.getConnectedNodeIp()); } + // B, 1, 2 - lsn, 3, 4, C, B, 5, 6, 7, 8 + if (hasStartLsnStoredInContext) { // start streaming from the last recorded position in the offset - final Lsn lsn = this.effectiveOffset.lastCompletelyProcessedLsn() != null ? this.effectiveOffset.lastCompletelyProcessedLsn() - : this.effectiveOffset.lsn(); +// final Lsn lsn = this.effectiveOffset.lastCompletelyProcessedLsn() != null ? this.effectiveOffset.lastCompletelyProcessedLsn() +// : this.effectiveOffset.lsn(); + // we will be streaming from the last commit lsn since we are sure that we have + // received that transaction completely. + // if lastCommitLsn is null, that means we are only in the beginning of streaming. + LOGGER.info("LSN is stored in context"); + final Lsn lsn = this.effectiveOffset.lastCommitLsn() == null ? 
+ lastSentFeedback : this.effectiveOffset.lastCommitLsn(); + + if (this.effectiveOffset.lastCommitLsn() == null) { + LOGGER.info("Last commit stored in offset is null"); + } + + LOGGER.info("Retrieved last committed LSN from stored offset '{}'", lsn); + final Operation lastProcessedMessageType = this.effectiveOffset.lastProcessedMessageType(); - LOGGER.info("Retrieved latest position from stored offset '{}'", lsn); - walPosition = new WalPositionLocator(this.effectiveOffset.lastCommitLsn(), lsn, lastProcessedMessageType); +// LOGGER.info("Retrieved latest position from stored offset '{}'", lsn); + walPosition = new WalPositionLocator(lsn, lsn, lastProcessedMessageType); replicationStream.compareAndSet(null, replicationConnection.startStreaming(lsn, walPosition)); + lastSentFeedback = lsn; } else { LOGGER.info("No previous LSN found in Kafka, streaming from the latest xlogpos or flushed LSN..."); @@ -198,7 +220,7 @@ public void execute(ChangeEventSourceContext context, PostgresPartition partitio replicationConnection.getConnectedNodeIp()); } - replicationStream.set(replicationConnection.startStreaming(walPosition.getLastEventStoredLsn(), walPosition)); + replicationStream.set(replicationConnection.startStreaming(walPosition.getLastCommitStoredLsn(), walPosition)); stream = this.replicationStream.get(); stream.startKeepAlive(Threads.newSingleThreadExecutor(YugabyteDBConnector.class, connectorConfig.getLogicalName(), KEEP_ALIVE_THREAD_NAME)); } @@ -292,6 +314,8 @@ private void processReplicationMessages(PostgresPartition partition, PostgresOff LOGGER.debug("Processing BEGIN with end LSN {} and txnid {}", lsn, message.getTransactionId()); } else { LOGGER.debug("Processing COMMIT with end LSN {} and txnid {}", lsn, message.getTransactionId()); + LOGGER.debug("Record count in the txn {} is {} with commit time {}", message.getTransactionId(), recordCount, lsn.asLong() - 1); + recordCount = 0; } OptionalLong currentTxnid = message.getTransactionId(); @@ -308,7 +332,7 @@ private void processReplicationMessages(PostgresPartition partition, PostgresOff // Don't skip on BEGIN message as it would flush LSN for the whole transaction // too early if (message.getOperation() == Operation.COMMIT) { - commitMessage(partition, offsetContext, lsn); + commitMessage(partition, offsetContext, lsn, message); } return; } @@ -321,7 +345,7 @@ private void processReplicationMessages(PostgresPartition partition, PostgresOff dispatcher.dispatchTransactionStartedEvent(partition, toString(message.getTransactionId()), offsetContext, message.getCommitTime()); } else if (message.getOperation() == Operation.COMMIT) { - commitMessage(partition, offsetContext, lsn); + commitMessage(partition, offsetContext, lsn, message); dispatcher.dispatchTransactionCommittedEvent(partition, offsetContext, message.getCommitTime()); } maybeWarnAboutGrowingWalBacklog(true); @@ -333,7 +357,7 @@ else if (message.getOperation() == Operation.MESSAGE) { // non-transactional message that will not be followed by a COMMIT message if (message.isLastEventForLsn()) { - commitMessage(partition, offsetContext, lsn); + commitMessage(partition, offsetContext, lsn, message); } dispatcher.dispatchLogicalDecodingMessage( @@ -346,12 +370,19 @@ else if (message.getOperation() == Operation.MESSAGE) { } // DML event else { + LOGGER.trace("Processing DML event with lsn {} and lastCompletelyProcessedLsn {}", lsn, lastCompletelyProcessedLsn); + ++recordCount; + TableId tableId = null; if (message.getOperation() != Operation.NOOP) { tableId = 
PostgresSchema.parse(message.getTable()); Objects.requireNonNull(tableId); } + /* + tx1: BEGIN2 - DML1 - COMMIT2 (updates lastCommitLsn) + tx2: BEGIN2 - DML2 (update lastCompletelyProcessedLsn) - COMMIT2 (updates lastCommitLsn) + */ offsetContext.updateWalPosition(lsn, lastCompletelyProcessedLsn, message.getCommitTime(), toLong(message.getTransactionId()), taskContext.getSlotXmin(connection), tableId, @@ -384,8 +415,15 @@ private void searchWalPosition(ChangeEventSourceContext context, PostgresPartiti while (context.isRunning() && resumeLsn.get() == null) { boolean receivedMessage = stream.readPending(message -> { - final Lsn lsn = stream.lastReceivedLsn(); + // YB Note: We do not need this, we need to start from the last commit lsn from the + // walPosition +// final Lsn lsn = stream.lastReceivedLsn(); + final Lsn lsn = walPosition.getLastCommitStoredLsn() != null ? walPosition.getLastCommitStoredLsn() : lastSentFeedback; resumeLsn.set(walPosition.resumeFromLsn(lsn, message).orElse(null)); + + if (resumeLsn.get() == null) { + LOGGER.info("Resume LSN is null"); + } }); if (receivedMessage) { @@ -412,9 +450,17 @@ private void probeConnectionIfNeeded() throws SQLException { } } - private void commitMessage(PostgresPartition partition, PostgresOffsetContext offsetContext, final Lsn lsn) throws SQLException, InterruptedException { + private void commitMessage(PostgresPartition partition, PostgresOffsetContext offsetContext, final Lsn lsn, ReplicationMessage message) throws SQLException, InterruptedException { lastCompletelyProcessedLsn = lsn; + + // todo offsetContext.updateCommitPosition(lsn, lastCompletelyProcessedLsn); + + if (message.getOperation() == Operation.COMMIT) { + LOGGER.info("Adding '{}' as lsn to the commit times queue", Lsn.valueOf(lsn.asLong() - 1)); + commitTimes.add(Lsn.valueOf(lsn.asLong() - 1)); + } + maybeWarnAboutGrowingWalBacklog(false); dispatcher.dispatchHeartbeatEvent(partition, offsetContext); } @@ -463,7 +509,7 @@ public void commitOffset(Map partition, Map offset) { final Lsn changeLsn = Lsn.valueOf((Long) offset.get(PostgresOffsetContext.LAST_COMPLETELY_PROCESSED_LSN_KEY)); final Lsn lsn = (commitLsn != null) ? commitLsn : changeLsn; - LOGGER.debug("Received offset commit request on commit LSN '{}' and change LSN '{}'", commitLsn, changeLsn); + LOGGER.info("Received offset commit request on commit LSN '{}' and change LSN '{}'", commitLsn, changeLsn); if (replicationStream != null && lsn != null) { if (!lsnFlushingAllowed) { LOGGER.info("Received offset commit request on '{}', but ignoring it. LSN flushing is not allowed yet", lsn); @@ -473,8 +519,15 @@ public void commitOffset(Map partition, Map offset) { if (LOGGER.isDebugEnabled()) { LOGGER.debug("Flushing LSN to server: {}", lsn); } + + Lsn finalLsn = getLsnToBeFlushed(lsn); + LOGGER.info("Flushing lsn '{}' for table", finalLsn); + // tell the server the point up to which we've processed data, so it can be free to recycle WAL segments - replicationStream.flushLsn(lsn); + replicationStream.flushLsn(finalLsn); + lastSentFeedback = finalLsn; + + cleanCommitTimeQueue(finalLsn); } else { LOGGER.debug("Streaming has already stopped, ignoring commit callback..."); @@ -485,6 +538,35 @@ public void commitOffset(Map partition, Map offset) { } } + protected Lsn getLsnToBeFlushed(Lsn lsn) { + if (commitTimes == null || commitTimes.isEmpty()) { + // This means that the queue has not been initialised and the task is still starting. 
+ return lastSentFeedback; + } + + Lsn result = lastSentFeedback; + + LOGGER.info("Queue at this time: {}", commitTimes); + + for (Lsn commitLsn : commitTimes) { + if (commitLsn.compareTo(lsn) < 0) { + LOGGER.debug("Assigning result as {}", commitLsn); + result = commitLsn; + } else { + // This will be the loop exit when we encounter any bigger element. + break; + } + } + + return result; + } + + protected void cleanCommitTimeQueue(Lsn lsn) { + if (commitTimes != null) { + commitTimes.removeIf(ele -> ele.compareTo(lsn) < 1); + } + } + @Override public PostgresOffsetContext getOffsetContext() { return effectiveOffset; diff --git a/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/connection/Lsn.java b/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/connection/Lsn.java index b9f4b7dc8fe..d2cedf2a033 100644 --- a/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/connection/Lsn.java +++ b/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/connection/Lsn.java @@ -144,7 +144,7 @@ public boolean isValid() { @Override public String toString() { - return "LSN{" + asString() + '}'; + return "LSN{" + asLong() + '}'; } @Override diff --git a/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/connection/PostgresReplicationConnection.java b/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/connection/PostgresReplicationConnection.java index a95d46c239d..391e1a6b8e3 100644 --- a/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/connection/PostgresReplicationConnection.java +++ b/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/connection/PostgresReplicationConnection.java @@ -395,8 +395,8 @@ public ReplicationStream startStreaming(Lsn offset, WalPositionLocator walPositi offset = defaultStartingPos; } Lsn lsn = offset; - if (LOGGER.isDebugEnabled()) { - LOGGER.debug("starting streaming from LSN '{}'", lsn); + if (LOGGER.isInfoEnabled()) { + LOGGER.info("starting streaming from LSN '{}'", lsn); } final int maxRetries = connectorConfig.maxRetries(); diff --git a/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/connection/WalPositionLocator.java b/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/connection/WalPositionLocator.java index 9d9cb4fa6f7..d60599e9af4 100644 --- a/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/connection/WalPositionLocator.java +++ b/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/connection/WalPositionLocator.java @@ -9,6 +9,7 @@ import java.util.Optional; import java.util.Set; +import io.debezium.connector.postgresql.YugabyteDBServer; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -42,6 +43,7 @@ public class WalPositionLocator { public WalPositionLocator(Lsn lastCommitStoredLsn, Lsn lastEventStoredLsn, Operation lastProcessedMessageType) { this.lastCommitStoredLsn = lastCommitStoredLsn; + // YB Note: lastEventStoredLsn and lastCommitStoredLsn will be the same. 
this.lastEventStoredLsn = lastEventStoredLsn; this.lastProcessedMessageType = lastProcessedMessageType; @@ -82,23 +84,33 @@ public Optional resumeFromLsn(Lsn currentLsn, ReplicationMessage message) { startStreamingLsn = txStartLsn; return Optional.of(startStreamingLsn); } + LOGGER.info("Returning optional empty as resume LSN"); return Optional.empty(); } lsnAfterLastEventStoredLsn = currentLsn; + + // YB Note: we do not want this to be turned true ever. storeLsnAfterLastEventStoredLsn = false; LOGGER.info("LSN after last stored change LSN '{}' received", lsnAfterLastEventStoredLsn); startStreamingLsn = lsnAfterLastEventStoredLsn; return Optional.of(startStreamingLsn); } if (currentLsn.equals(lastEventStoredLsn)) { + LOGGER.info("Current LSN is equal to the last event stored LSN {}", lastEventStoredLsn); storeLsnAfterLastEventStoredLsn = true; } if (lastCommitStoredLsn == null) { startStreamingLsn = firstLsnReceived; + LOGGER.info("Last commit stored LSN is null, returning firstLsnReceived {}", startStreamingLsn); return Optional.of(startStreamingLsn); } + if (currentLsn.equals(lastCommitStoredLsn)) { + LOGGER.info("Returning lastCommitStoredLsn {} for resuming", lastCommitStoredLsn); + return Optional.of(lastCommitStoredLsn); + } + switch (message.getOperation()) { case BEGIN: txStartLsn = currentLsn; @@ -145,6 +157,11 @@ else if (txStartLsn != null) { * @return true if the message should be skipped, false otherwise */ public boolean skipMessage(Lsn lsn) { + if (YugabyteDBServer.isEnabled()) { + // YB Note: We will not be skipping any message. + return false; + } + if (passMessages) { return false; } @@ -160,7 +177,7 @@ public boolean skipMessage(Lsn lsn) { lsn, lsnSeen)); } - LOGGER.debug("Message with LSN '{}' filtered", lsn); + LOGGER.info("Message with LSN '{}' filtered", lsn); return true; }
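
Note on the feedback LSN handling introduced above: instead of flushing whatever LSN Kafka Connect hands back, the streaming source now remembers the end LSN of every dispatched COMMIT (minus one, matching the lsn.asLong() - 1 bookkeeping in commitMessage) and only acknowledges the newest commit boundary that lies strictly below the requested LSN. Below is a minimal standalone sketch of that bookkeeping, assuming long-valued LSNs; the class and method names are illustrative and do not appear in the patch.

import java.util.concurrent.ConcurrentLinkedQueue;

// Minimal sketch of the commit-boundary bookkeeping added to
// PostgresStreamingChangeEventSource (illustrative names, long-valued LSNs).
public class CommitLsnTracker {

    // One entry per dispatched COMMIT, in arrival (and therefore ascending) order.
    private final ConcurrentLinkedQueue<Long> commitBoundaries = new ConcurrentLinkedQueue<>();

    // Last LSN reported to the server as flushed (mirrors lastSentFeedback).
    private long lastSentFeedback;

    public CommitLsnTracker(long initialFeedback) {
        this.lastSentFeedback = initialFeedback;
    }

    // Called when a COMMIT with end LSN commitEndLsn is processed
    // (mirrors commitMessage() enqueueing lsn.asLong() - 1).
    public void onCommit(long commitEndLsn) {
        commitBoundaries.add(commitEndLsn - 1);
    }

    // Called from the offset commit path (mirrors getLsnToBeFlushed()): pick the
    // newest recorded commit boundary strictly below the requested LSN, falling
    // back to the last feedback sent when nothing qualifies.
    public long lsnToFlush(long requestedLsn) {
        long result = lastSentFeedback;
        for (long boundary : commitBoundaries) {
            if (boundary < requestedLsn) {
                result = boundary;
            } else {
                // The queue is ascending, so the first larger element ends the scan.
                break;
            }
        }
        return result;
    }

    // Called after the flush succeeds (mirrors cleanCommitTimeQueue()): drop every
    // boundary at or below the LSN that was just acknowledged to the server.
    public void acknowledge(long flushedLsn) {
        commitBoundaries.removeIf(boundary -> boundary <= flushedLsn);
        lastSentFeedback = flushedLsn;
    }
}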
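
A short walk-through of the sketch shows the design choice: an offset commit whose LSN falls inside a later transaction only acknowledges the boundary of the previous, fully committed transaction, so a restart never resumes mid-transaction. The LSN values below are made up for the example.

// Walk-through with made-up LSN values: two COMMITs are recorded, then an offset
// commit arrives whose LSN falls between the two boundaries.
public class CommitLsnTrackerExample {
    public static void main(String[] args) {
        CommitLsnTracker tracker = new CommitLsnTracker(2L); // initial feedback, cf. Lsn.valueOf(2L)

        tracker.onCommit(100L); // COMMIT of tx1 -> boundary 99 queued
        tracker.onCommit(200L); // COMMIT of tx2 -> boundary 199 queued

        long flush = tracker.lsnToFlush(150L); // requested LSN lies inside tx2
        System.out.println(flush);             // prints 99: only tx1 is acknowledged

        tracker.acknowledge(flush);            // 99 is removed, 199 stays for a later flush
    }
}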
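
For the restart path, the WalPositionLocator and searchWalPosition hunks above make streaming resume from the stored commit LSN (or from the last feedback sent when no commit LSN has been stored) instead of the last event LSN, and stop filtering replayed messages when YugabyteDB is enabled. A condensed, illustrative sketch of that decision follows; it reuses the field names purely for readability and does not reproduce the real class.

// Condensed sketch of the YB resume decision; the actual logic lives in
// WalPositionLocator.resumeFromLsn and searchWalPosition above.
final class ResumeDecisionSketch {

    static Long resumeFrom(Long currentLsn, Long lastCommitStoredLsn, Long firstLsnReceived) {
        if (lastCommitStoredLsn == null) {
            // Nothing committed before the restart: resume from the first LSN received.
            return firstLsnReceived;
        }
        if (currentLsn.equals(lastCommitStoredLsn)) {
            // Replay has reached the stored commit boundary: resume exactly there.
            return lastCommitStoredLsn;
        }
        // Otherwise keep reading; the BEGIN/COMMIT tracking decides later.
        return null;
    }

    static boolean skipMessage(boolean yugabyteEnabled, boolean upstreamWouldSkip) {
        // Under YugabyteDB nothing is skipped: the server replays from the flushed
        // commit boundary, so every received message must be dispatched again.
        return !yugabyteEnabled && upstreamWouldSkip;
    }
}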