Use commit time for lsn #158

Open
wants to merge 7 commits into base: ybdb-debezium-2.5.2

Changes from 2 commits
@@ -9,6 +9,8 @@
import java.util.Map;
import java.util.Objects;
import java.util.OptionalLong;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicReference;

import org.apache.kafka.connect.errors.ConnectException;
@@ -80,8 +82,11 @@ public class PostgresStreamingChangeEventSource implements StreamingChangeEventS
*/
private long numberOfEventsSinceLastEventSentOrWalGrowingWarning = 0;
private Lsn lastCompletelyProcessedLsn;
private Lsn lastSentFeedback = Lsn.valueOf(1L);
private PostgresOffsetContext effectiveOffset;

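/**
* For every COMMIT message, the LSN immediately preceding the commit LSN is queued here; the queue is
* later consulted to pick a safe value to acknowledge back to the server.
*/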
protected ConcurrentLinkedQueue<Lsn> commitTimes;

/**
* For DEBUGGING
*/
@@ -101,7 +106,7 @@ public PostgresStreamingChangeEventSource(PostgresConnectorConfig connectorConfi
this.snapshotter = snapshotter;
this.replicationConnection = (PostgresReplicationConnection) replicationConnection;
this.connectionProbeTimer = ElapsedTimeStrategy.constant(Clock.system(), connectorConfig.statusUpdateInterval());

this.commitTimes = new ConcurrentLinkedQueue<>();
}

@Override
@@ -308,7 +313,7 @@ private void processReplicationMessages(PostgresPartition partition, PostgresOff
// Don't skip on BEGIN message as it would flush LSN for the whole transaction
// too early
if (message.getOperation() == Operation.COMMIT) {
commitMessage(partition, offsetContext, lsn);
commitMessage(partition, offsetContext, lsn, message);
}
return;
}
@@ -321,7 +326,7 @@ private void processReplicationMessages(PostgresPartition partition, PostgresOff
dispatcher.dispatchTransactionStartedEvent(partition, toString(message.getTransactionId()), offsetContext, message.getCommitTime());
}
else if (message.getOperation() == Operation.COMMIT) {
commitMessage(partition, offsetContext, lsn);
commitMessage(partition, offsetContext, lsn, message);
dispatcher.dispatchTransactionCommittedEvent(partition, offsetContext, message.getCommitTime());
}
maybeWarnAboutGrowingWalBacklog(true);
@@ -333,7 +338,7 @@ else if (message.getOperation() == Operation.MESSAGE) {

// non-transactional message that will not be followed by a COMMIT message
if (message.isLastEventForLsn()) {
commitMessage(partition, offsetContext, lsn);
commitMessage(partition, offsetContext, lsn, message);
}

dispatcher.dispatchLogicalDecodingMessage(
@@ -346,6 +351,8 @@ else if (message.getOperation() == Operation.MESSAGE) {
}
// DML event
else {
LOGGER.trace("Processing DML event with lsn {} and lastCompletelyProcessedLsn {}", lsn, lastCompletelyProcessedLsn);

TableId tableId = null;
if (message.getOperation() != Operation.NOOP) {
tableId = PostgresSchema.parse(message.getTable());
@@ -412,9 +419,15 @@ private void probeConnectionIfNeeded() throws SQLException {
}
}

private void commitMessage(PostgresPartition partition, PostgresOffsetContext offsetContext, final Lsn lsn) throws SQLException, InterruptedException {
private void commitMessage(PostgresPartition partition, PostgresOffsetContext offsetContext, final Lsn lsn, ReplicationMessage message) throws SQLException, InterruptedException {
lastCompletelyProcessedLsn = lsn;
offsetContext.updateCommitPosition(lsn, lastCompletelyProcessedLsn);

if (message.getOperation() == Operation.COMMIT) {
LOGGER.info("Adding '{}' as lsn to the commit times queue", Lsn.valueOf(lsn.asLong() - 1));
commitTimes.add(Lsn.valueOf(lsn.asLong() - 1));
}

maybeWarnAboutGrowingWalBacklog(false);
dispatcher.dispatchHeartbeatEvent(partition, offsetContext);
}
@@ -463,7 +476,7 @@ public void commitOffset(Map<String, ?> partition, Map<String, ?> offset) {
final Lsn changeLsn = Lsn.valueOf((Long) offset.get(PostgresOffsetContext.LAST_COMPLETELY_PROCESSED_LSN_KEY));
final Lsn lsn = (commitLsn != null) ? commitLsn : changeLsn;

LOGGER.debug("Received offset commit request on commit LSN '{}' and change LSN '{}'", commitLsn, changeLsn);
LOGGER.info("Received offset commit request on commit LSN '{}' and change LSN '{}'", commitLsn, changeLsn);
if (replicationStream != null && lsn != null) {
if (!lsnFlushingAllowed) {
LOGGER.info("Received offset commit request on '{}', but ignoring it. LSN flushing is not allowed yet", lsn);
@@ -473,8 +486,15 @@ public void commitOffset(Map<String, ?> partition, Map<String, ?> offset) {
if (LOGGER.isDebugEnabled()) {
LOGGER.debug("Flushing LSN to server: {}", lsn);
}

Lsn finalLsn = getLsnToBeFlushed(lsn);
LOGGER.info("Flushing lsn '{}'", finalLsn);

// tell the server the point up to which we've processed data, so it can be free to recycle WAL segments
replicationStream.flushLsn(lsn);
replicationStream.flushLsn(finalLsn);
lastSentFeedback = finalLsn;

cleanCommitTimeQueue(finalLsn);
}
else {
LOGGER.debug("Streaming has already stopped, ignoring commit callback...");
@@ -485,6 +505,35 @@ public void commitOffset(Map<String, ?> partition, Map<String, ?> offset) {
}
}

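/**
* Scans the queued commit LSNs in order, keeping the most recent value that is strictly smaller than
* the requested {@code lsn} and stopping at the first one that is not; returns {@code lastSentFeedback}
* if nothing qualifies.
*/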
protected Lsn getLsnToBeFlushed(Lsn lsn) {
if (commitTimes == null || commitTimes.isEmpty()) {
// The queue has not been initialised yet, or no commit has been recorded, so fall back to the last feedback LSN sent.
return lastSentFeedback;
}

Lsn result = lastSentFeedback;

LOGGER.info("Queue at this time: {}", commitTimes);

for (Lsn commitLsn : commitTimes) {
if (commitLsn.compareTo(lsn) < 0) {
LOGGER.debug("Assigning result as {}", commitLsn);
result = commitLsn;
}
else {
// Exit the loop as soon as an entry at or above the requested LSN is encountered.
break;
}
}

return result;
}

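/**
* Removes every queued entry that is less than or equal to the LSN that was just flushed, since those
* entries can no longer be selected for a later flush.
*/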
protected void cleanCommitTimeQueue(Lsn lsn) {
if (commitTimes != null) {
commitTimes.removeIf(ele -> ele.compareTo(lsn) <= 0);
}
}

@Override
public PostgresOffsetContext getOffsetContext() {
return effectiveOffset;
@@ -144,7 +144,7 @@ public boolean isValid() {

@Override
public String toString() {
return "LSN{" + asString() + '}';
return "LSN{" + asLong() + '}';
}

@Override
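
For reviewers, here is a minimal, self-contained sketch of the flush-selection behaviour this change introduces. It models the commit-time queue with plain long values instead of the connector's Lsn type; the class name, method names, and values are illustrative only and are not part of the PR.

import java.util.concurrent.ConcurrentLinkedQueue;

// Standalone model of the commit-time queue (illustrative names and values).
public class CommitLsnQueueSketch {

    private final ConcurrentLinkedQueue<Long> commitTimes = new ConcurrentLinkedQueue<>();
    private long lastSentFeedback = 1L;

    // On every COMMIT message the streaming source queues (commitLsn - 1).
    void onCommit(long commitLsn) {
        commitTimes.add(commitLsn - 1);
    }

    // Mirrors getLsnToBeFlushed: keep the last queued value strictly below the requested LSN.
    long lsnToBeFlushed(long requested) {
        long result = lastSentFeedback;
        for (long queued : commitTimes) {
            if (queued < requested) {
                result = queued;
            }
            else {
                break;
            }
        }
        return result;
    }

    // Mirrors cleanCommitTimeQueue: drop everything at or below the flushed LSN.
    void clean(long flushed) {
        commitTimes.removeIf(queued -> queued <= flushed);
    }

    public static void main(String[] args) {
        CommitLsnQueueSketch sketch = new CommitLsnQueueSketch();
        sketch.onCommit(100);   // queue: [99]
        sketch.onCommit(200);   // queue: [99, 199]
        sketch.onCommit(300);   // queue: [99, 199, 299]

        long flushed = sketch.lsnToBeFlushed(250);   // 199: the last commit recorded below 250
        sketch.lastSentFeedback = flushed;
        sketch.clean(flushed);                       // queue: [299]

        System.out.println("flushed=" + flushed + ", remaining=" + sketch.commitTimes);
    }
}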