Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[DBZ-PGYB][yugabyte/yugabyte-db#24204] Changes to support LSN types with replication slot #162

Open
wants to merge 6 commits into
base: ybdb-debezium-2.5.2
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -374,6 +374,62 @@ public static SecureConnectionMode parse(String value, String defaultValue) {
}
}

/**
 * Type of LSN ordering used by the replication slot.
 * <p>
 * {@code SEQUENCE} is the PostgreSQL-compatible monotonic sequence number;
 * {@code HYBRID_TIME} is the YugabyteDB-specific HybridTime-based ordering.
 * The string value is passed verbatim to the CREATE_REPLICATION_SLOT command.
 */
public enum LsnType implements EnumeratedValue {

    /** Monotonically increasing sequence-number LSN (default). */
    SEQUENCE("SEQUENCE"),

    /** HybridTime-based LSN (YugabyteDB-specific). */
    HYBRID_TIME("HYBRID_TIME");

    private final String lsnTypeName;

    LsnType(String lsnTypeName) {
        this.lsnTypeName = lsnTypeName;
    }

    /**
     * Parses a configuration string into an {@link LsnType}.
     *
     * @param s the configured value; leading/trailing whitespace is ignored
     *          and matching is case-insensitive
     * @return the matching enum constant
     * @throws IllegalArgumentException if the value matches no constant
     */
    public static LsnType parse(String s) {
        // Locale.ROOT avoids locale-sensitive case mapping surprises
        // (e.g. the Turkish dotless-i problem with the default locale).
        return valueOf(s.trim().toUpperCase(java.util.Locale.ROOT));
    }

    @Override
    public String getValue() {
        return lsnTypeName;
    }

    /** @return {@code true} if this is the sequence-number LSN type */
    public boolean isSequence() {
        return this == SEQUENCE;
    }

    /** @return {@code true} if this is the HybridTime LSN type */
    public boolean isHybridTime() {
        return this == HYBRID_TIME;
    }

    /** @return the name used in the slot-creation command; same as {@link #getValue()} */
    public String getLsnTypeName() {
        return getValue();
    }
}

public enum LogicalDecoder implements EnumeratedValue {
PGOUTPUT("pgoutput") {
@Override
Expand Down Expand Up @@ -572,6 +628,14 @@ public static SchemaRefreshMode parse(String value) {
+ "'. " +
"Defaults to '" + LogicalDecoder.YBOUTPUT.getValue() + "'.");

public static final Field SLOT_LSN_TYPE = Field.create("slot.lsn.type")
.withDisplayName("Slot LSN type")
.withType(Type.STRING)
.withWidth(Width.MEDIUM)
.withImportance(Importance.MEDIUM)
.withEnum(LsnType.class, LsnType.SEQUENCE)
.withDescription("LSN type being used with the replication slot");

public static final Field SLOT_NAME = Field.create("slot.name")
.withDisplayName("Slot")
.withType(Type.STRING)
Expand Down Expand Up @@ -1032,6 +1096,10 @@ protected String slotName() {
return getConfig().getString(SLOT_NAME);
}

/**
 * Resolves the configured {@code slot.lsn.type} value into its enum form.
 *
 * @return the {@link LsnType} the replication slot should use
 */
public LsnType slotLsnType() {
    final String configuredLsnType = getConfig().getString(SLOT_LSN_TYPE);
    return LsnType.parse(configuredLsnType);
}

protected boolean dropSlotOnStop() {
if (getConfig().hasKey(DROP_SLOT_ON_STOP.name())) {
return getConfig().getBoolean(DROP_SLOT_ON_STOP);
Expand Down Expand Up @@ -1154,6 +1222,7 @@ protected SourceInfoStructMaker<? extends AbstractSourceInfo> getSourceInfoStruc
DATABASE_NAME,
PLUGIN_NAME,
SLOT_NAME,
SLOT_LSN_TYPE,
PUBLICATION_NAME,
PUBLICATION_AUTOCREATE_MODE,
REPLICA_IDENTITY_AUTOSET_VALUES,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
import java.util.Map;
import java.util.Objects;
import java.util.OptionalLong;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicReference;

import org.apache.kafka.connect.errors.ConnectException;
Expand Down Expand Up @@ -80,12 +81,16 @@ public class PostgresStreamingChangeEventSource implements StreamingChangeEventS
*/
private long numberOfEventsSinceLastEventSentOrWalGrowingWarning = 0;
private Lsn lastCompletelyProcessedLsn;
private Lsn lastSentFeedback = Lsn.valueOf(2L);
private PostgresOffsetContext effectiveOffset;

protected ConcurrentLinkedQueue<Lsn> commitTimes;

/**
* For DEBUGGING
*/
private OptionalLong lastTxnidForWhichCommitSeen = OptionalLong.empty();
private long recordCount = 0;

public PostgresStreamingChangeEventSource(PostgresConnectorConfig connectorConfig, Snapshotter snapshotter,
PostgresConnection connection, PostgresEventDispatcher<TableId> dispatcher, ErrorHandler errorHandler, Clock clock,
Expand All @@ -101,7 +106,7 @@ public PostgresStreamingChangeEventSource(PostgresConnectorConfig connectorConfi
this.snapshotter = snapshotter;
this.replicationConnection = (PostgresReplicationConnection) replicationConnection;
this.connectionProbeTimer = ElapsedTimeStrategy.constant(Clock.system(), connectorConfig.statusUpdateInterval());

this.commitTimes = new ConcurrentLinkedQueue<>();
}

@Override
Expand Down Expand Up @@ -148,17 +153,35 @@ public void execute(ChangeEventSourceContext context, PostgresPartition partitio
}

if (hasStartLsnStoredInContext) {
// start streaming from the last recorded position in the offset
final Lsn lsn = this.effectiveOffset.lastCompletelyProcessedLsn() != null ? this.effectiveOffset.lastCompletelyProcessedLsn()
: this.effectiveOffset.lsn();
final Operation lastProcessedMessageType = this.effectiveOffset.lastProcessedMessageType();
LOGGER.info("Retrieved latest position from stored offset '{}'", lsn);
walPosition = new WalPositionLocator(this.effectiveOffset.lastCommitLsn(), lsn, lastProcessedMessageType);
replicationStream.compareAndSet(null, replicationConnection.startStreaming(lsn, walPosition));
if (connectorConfig.slotLsnType().isHybridTime()) {
vaibhav-yb marked this conversation as resolved.
Show resolved Hide resolved
LOGGER.info("LSN is stored in context for type HT");
final Lsn lsn = this.effectiveOffset.lastCommitLsn() == null ?
vaibhav-yb marked this conversation as resolved.
Show resolved Hide resolved
lastSentFeedback : this.effectiveOffset.lastCommitLsn();

if (this.effectiveOffset.lastCommitLsn() == null) {
LOGGER.info("Last commit stored in offset is null");
}

LOGGER.info("Retrieved last committed LSN from stored offset '{}'", lsn);

final Operation lastProcessedMessageType = this.effectiveOffset.lastProcessedMessageType();
walPosition = new WalPositionLocator(lsn, lsn, lastProcessedMessageType, this.connectorConfig.slotLsnType().isHybridTime());
vaibhav-yb marked this conversation as resolved.
Show resolved Hide resolved
replicationStream.compareAndSet(null, replicationConnection.startStreaming(lsn, walPosition));
lastSentFeedback = lsn;
} else {
// This is the SEQUENCE LSN type
// start streaming from the last recorded position in the offset
final Lsn lsn = this.effectiveOffset.lastCompletelyProcessedLsn() != null ? this.effectiveOffset.lastCompletelyProcessedLsn()
: this.effectiveOffset.lsn();
final Operation lastProcessedMessageType = this.effectiveOffset.lastProcessedMessageType();
LOGGER.info("Retrieved latest position from stored offset '{}'", lsn);
walPosition = new WalPositionLocator(this.effectiveOffset.lastCommitLsn(), lsn, lastProcessedMessageType, this.connectorConfig.slotLsnType().isHybridTime());
vaibhav-yb marked this conversation as resolved.
Show resolved Hide resolved
replicationStream.compareAndSet(null, replicationConnection.startStreaming(lsn, walPosition));
}
}
else {
LOGGER.info("No previous LSN found in Kafka, streaming from the latest xlogpos or flushed LSN...");
walPosition = new WalPositionLocator();
walPosition = new WalPositionLocator(this.connectorConfig.slotLsnType().isHybridTime());
replicationStream.compareAndSet(null, replicationConnection.startStreaming(walPosition));
}
// for large dbs, the refresh of schema can take too much time
Expand Down Expand Up @@ -198,7 +221,12 @@ public void execute(ChangeEventSourceContext context, PostgresPartition partitio
replicationConnection.getConnectedNodeIp());
}

replicationStream.set(replicationConnection.startStreaming(walPosition.getLastEventStoredLsn(), walPosition));
vaibhav-yb marked this conversation as resolved.
Show resolved Hide resolved
if (connectorConfig.slotLsnType().isHybridTime()) {
replicationStream.set(replicationConnection.startStreaming(walPosition.getLastCommitStoredLsn(), walPosition));
} else {
// This is for lsn type SEQUENCE.
replicationStream.set(replicationConnection.startStreaming(walPosition.getLastEventStoredLsn(), walPosition));
}
stream = this.replicationStream.get();
stream.startKeepAlive(Threads.newSingleThreadExecutor(YugabyteDBConnector.class, connectorConfig.getLogicalName(), KEEP_ALIVE_THREAD_NAME));
}
Expand Down Expand Up @@ -292,6 +320,8 @@ private void processReplicationMessages(PostgresPartition partition, PostgresOff
LOGGER.debug("Processing BEGIN with end LSN {} and txnid {}", lsn, message.getTransactionId());
} else {
LOGGER.debug("Processing COMMIT with end LSN {} and txnid {}", lsn, message.getTransactionId());
LOGGER.debug("Record count in the txn {} is {} with commit time {}", message.getTransactionId(), recordCount, lsn.asLong() - 1);
recordCount = 0;
}

OptionalLong currentTxnid = message.getTransactionId();
Expand All @@ -308,7 +338,7 @@ private void processReplicationMessages(PostgresPartition partition, PostgresOff
// Don't skip on BEGIN message as it would flush LSN for the whole transaction
// too early
if (message.getOperation() == Operation.COMMIT) {
commitMessage(partition, offsetContext, lsn);
commitMessage(partition, offsetContext, lsn, message);
}
return;
}
Expand All @@ -321,7 +351,7 @@ private void processReplicationMessages(PostgresPartition partition, PostgresOff
dispatcher.dispatchTransactionStartedEvent(partition, toString(message.getTransactionId()), offsetContext, message.getCommitTime());
}
else if (message.getOperation() == Operation.COMMIT) {
commitMessage(partition, offsetContext, lsn);
commitMessage(partition, offsetContext, lsn, message);
dispatcher.dispatchTransactionCommittedEvent(partition, offsetContext, message.getCommitTime());
}
maybeWarnAboutGrowingWalBacklog(true);
Expand All @@ -333,7 +363,7 @@ else if (message.getOperation() == Operation.MESSAGE) {

// non-transactional message that will not be followed by a COMMIT message
if (message.isLastEventForLsn()) {
commitMessage(partition, offsetContext, lsn);
commitMessage(partition, offsetContext, lsn, message);
}

dispatcher.dispatchLogicalDecodingMessage(
Expand All @@ -346,6 +376,9 @@ else if (message.getOperation() == Operation.MESSAGE) {
}
// DML event
else {
LOGGER.trace("Processing DML event with lsn {} and lastCompletelyProcessedLsn {}", lsn, lastCompletelyProcessedLsn);
++recordCount;

TableId tableId = null;
if (message.getOperation() != Operation.NOOP) {
tableId = PostgresSchema.parse(message.getTable());
Expand Down Expand Up @@ -384,8 +417,17 @@ private void searchWalPosition(ChangeEventSourceContext context, PostgresPartiti
while (context.isRunning() && resumeLsn.get() == null) {

boolean receivedMessage = stream.readPending(message -> {
final Lsn lsn = stream.lastReceivedLsn();
final Lsn lsn;
if (connectorConfig.slotLsnType().isHybridTime()) {
lsn = walPosition.getLastCommitStoredLsn() != null ? walPosition.getLastCommitStoredLsn() : lastSentFeedback;
vaibhav-yb marked this conversation as resolved.
Show resolved Hide resolved
} else {
lsn = stream.lastReceivedLsn();
}
resumeLsn.set(walPosition.resumeFromLsn(lsn, message).orElse(null));

if (resumeLsn.get() == null) {
LOGGER.info("Resume LSN is null");
}
});

if (receivedMessage) {
Expand All @@ -412,9 +454,17 @@ private void probeConnectionIfNeeded() throws SQLException {
}
}

private void commitMessage(PostgresPartition partition, PostgresOffsetContext offsetContext, final Lsn lsn) throws SQLException, InterruptedException {
/**
 * Records a processed COMMIT position and emits a heartbeat.
 * For HybridTime slots, each COMMIT's time (end LSN - 1) is also queued so
 * that LSN feedback sent to the server can be aligned to transaction
 * boundaries later (see getLsnToBeFlushed).
 */
private void commitMessage(PostgresPartition partition, PostgresOffsetContext offsetContext, final Lsn lsn, ReplicationMessage message) throws SQLException, InterruptedException {
    lastCompletelyProcessedLsn = lsn;
    offsetContext.updateCommitPosition(lsn, lastCompletelyProcessedLsn);

    if (this.connectorConfig.slotLsnType().isHybridTime()
            && message.getOperation() == Operation.COMMIT) {
        final Lsn commitTime = Lsn.valueOf(lsn.asLong() - 1);
        LOGGER.info("Adding '{}' as lsn to the commit times queue", commitTime);
        commitTimes.add(commitTime);
    }

    maybeWarnAboutGrowingWalBacklog(false);
    dispatcher.dispatchHeartbeatEvent(partition, offsetContext);
}
Expand Down Expand Up @@ -463,18 +513,30 @@ public void commitOffset(Map<String, ?> partition, Map<String, ?> offset) {
final Lsn changeLsn = Lsn.valueOf((Long) offset.get(PostgresOffsetContext.LAST_COMPLETELY_PROCESSED_LSN_KEY));
final Lsn lsn = (commitLsn != null) ? commitLsn : changeLsn;

LOGGER.debug("Received offset commit request on commit LSN '{}' and change LSN '{}'", commitLsn, changeLsn);
LOGGER.info("Received offset commit request on commit LSN '{}' and change LSN '{}'", commitLsn, changeLsn);
if (replicationStream != null && lsn != null) {
if (!lsnFlushingAllowed) {
LOGGER.info("Received offset commit request on '{}', but ignoring it. LSN flushing is not allowed yet", lsn);
return;
}

Lsn finalLsn;
if (this.connectorConfig.slotLsnType().isHybridTime()) {
finalLsn = getLsnToBeFlushed(lsn);
} else {
finalLsn = lsn;
}

if (LOGGER.isDebugEnabled()) {
LOGGER.debug("Flushing LSN to server: {}", lsn);
LOGGER.info("Flushing LSN to server: {}", finalLsn);
}
// tell the server the point up to which we've processed data, so it can be free to recycle WAL segments
replicationStream.flushLsn(lsn);
replicationStream.flushLsn(finalLsn);

if (this.connectorConfig.slotLsnType().isHybridTime()) {
lastSentFeedback = finalLsn;
cleanCommitTimeQueue(finalLsn);
}
}
else {
LOGGER.debug("Streaming has already stopped, ignoring commit callback...");
Expand All @@ -485,6 +547,35 @@ public void commitOffset(Map<String, ?> partition, Map<String, ?> offset) {
}
}

/**
 * Determines the LSN that can safely be acknowledged to the server for a
 * HybridTime slot: the highest recorded commit time strictly smaller than
 * {@code lsn}, or {@code lastSentFeedback} if no such entry exists yet.
 *
 * @param lsn the LSN from the offset-commit request
 * @return the LSN to flush to the server
 */
protected Lsn getLsnToBeFlushed(Lsn lsn) {
    if (commitTimes == null || commitTimes.isEmpty()) {
        // The queue has not been initialised yet - the task is still starting.
        return lastSentFeedback;
    }

    Lsn result = lastSentFeedback;

    // Dumping the full queue on every offset commit is too noisy for INFO;
    // keep it at DEBUG, consistent with the other log lines in this method.
    if (LOGGER.isDebugEnabled()) {
        LOGGER.debug("Queue at this time: {}", commitTimes);
    }

    // NOTE(review): the early break assumes the queue holds commit times in
    // ascending order (entries are appended as commits are processed) - confirm.
    for (Lsn commitLsn : commitTimes) {
        if (commitLsn.compareTo(lsn) < 0) {
            LOGGER.debug("Assigning result as {}", commitLsn);
            result = commitLsn;
        } else {
            // First element >= lsn terminates the scan.
            break;
        }
    }

    return result;
}

/**
 * Drops every recorded commit time that is less than or equal to the given
 * LSN - those positions have already been acknowledged to the server.
 *
 * @param lsn the LSN that was just flushed
 */
protected void cleanCommitTimeQueue(Lsn lsn) {
    if (commitTimes == null) {
        return;
    }
    // compareTo(lsn) <= 0 is equivalent to the original "< 1" check.
    commitTimes.removeIf(candidate -> candidate.compareTo(lsn) <= 0);
}

@Override
public PostgresOffsetContext getOffsetContext() {
return effectiveOffset;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -144,7 +144,7 @@ public boolean isValid() {

@Override
public String toString() {
return "LSN{" + asString() + '}';
return "LSN{" + asLong() + '}';
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@ public class PostgresReplicationConnection extends JdbcConnection implements Rep
private static Logger LOGGER = LoggerFactory.getLogger(PostgresReplicationConnection.class);

private final String slotName;
private final PostgresConnectorConfig.LsnType lsnType;
private final String publicationName;
private final RelationalTableFilters tableFilter;
private final PostgresConnectorConfig.AutoCreateMode publicationAutocreateMode;
Expand Down Expand Up @@ -114,6 +115,7 @@ private PostgresReplicationConnection(PostgresConnectorConfig config,

this.connectorConfig = config;
this.slotName = slotName;
this.lsnType = config.slotLsnType();
this.publicationName = publicationName;
this.tableFilter = tableFilter;
this.publicationAutocreateMode = publicationAutocreateMode;
Expand Down Expand Up @@ -395,8 +397,8 @@ public ReplicationStream startStreaming(Lsn offset, WalPositionLocator walPositi
offset = defaultStartingPos;
}
Lsn lsn = offset;
if (LOGGER.isDebugEnabled()) {
LOGGER.debug("starting streaming from LSN '{}'", lsn);
if (LOGGER.isInfoEnabled()) {
LOGGER.info("starting streaming from LSN '{}'", lsn);
}

final int maxRetries = connectorConfig.maxRetries();
Expand Down Expand Up @@ -522,10 +524,11 @@ public Optional<SlotCreationResult> createReplicationSlot() throws SQLException

try (Statement stmt = pgConnection().createStatement()) {
String createCommand = String.format(
"CREATE_REPLICATION_SLOT \"%s\" %s LOGICAL %s",
"CREATE_REPLICATION_SLOT \"%s\" %s LOGICAL %s %s",
slotName,
tempPart,
plugin.getPostgresPluginName());
plugin.getPostgresPluginName(),
lsnType.getLsnTypeName());
LOGGER.info("Creating replication slot with command {}", createCommand);
stmt.execute(createCommand);
// when we are in Postgres 9.4+, we can parse the slot creation info,
Expand Down
Loading