Skip to content

Commit 3e3cfab

Browse files
authored
[DBZ-PGYB] Bug fix for inability to alter publication upon transitioning to streaming (#174)
## Problem When #172 got merged, we added the logic to begin a read only transaction so that we can use `USE_SNAPSHOT` and capture the snapshot from the same. However, upon transition to streaming, when `PostgresReplicationConnection#startStreaming` during streaming phase, it internally calls `PostgresReplicationConnection#initConnection` and then ``PostgresReplicationConnection#initPublication` which tries to execute an `ALTER` command on the publication. The `ALTER` command fails owing to the fact that we're still in a read-only transaction opened at the time of slot creation. The exception is similar to: ``` Caused by: org.apache.kafka.connect.errors.ConnectException: Unable to update filtered publication dbz_publication for "public"."test" at io.debezium.connector.postgresql.connection.PostgresReplicationConnection.createOrUpdatePublicationModeFilterted(PostgresReplicationConnection.java:232) at io.debezium.connector.postgresql.connection.PostgresReplicationConnection.initPublication(PostgresReplicationConnection.java:196) at io.debezium.connector.postgresql.connection.PostgresReplicationConnection.initConnection(PostgresReplicationConnection.java:496) at io.debezium.connector.postgresql.connection.PostgresReplicationConnection.startStreaming(PostgresReplicationConnection.java:396) at io.debezium.connector.postgresql.PostgresStreamingChangeEventSource.execute(PostgresStreamingChangeEventSource.java:182) ... 9 more Caused by: com.yugabyte.util.PSQLException: ERROR: cannot execute ALTER PUBLICATION in a read-only transaction at com.yugabyte.core.v3.QueryExecutorImpl.receiveErrorResponse(QueryExecutorImpl.java:2675) at com.yugabyte.core.v3.QueryExecutorImpl.processResults(QueryExecutorImpl.java:2365) at com.yugabyte.core.v3.QueryExecutorImpl.execute(QueryExecutorImpl.java:355) at com.yugabyte.jdbc.PgStatement.executeInternal(PgStatement.java:490) at com.yugabyte.jdbc.PgStatement.execute(PgStatement.java:408) at com.yugabyte.jdbc.PgStatement.executeWithFlags(PgStatement.java:329) at com.yugabyte.jdbc.PgStatement.executeCachedSql(PgStatement.java:315) at com.yugabyte.jdbc.PgStatement.executeWithFlags(PgStatement.java:291) at com.yugabyte.jdbc.PgStatement.execute(PgStatement.java:286) at io.debezium.connector.postgresql.connection.PostgresReplicationConnection.createOrUpdatePublicationModeFilterted(PostgresReplicationConnection.java:229) ``` ## Solution As a fix, we will now commit at the time of snapshot completion and rollback at the time of snapshot abortion so that we ensure that we are not going into the streaming phase with any open transaction. Another added guardrail in this PR is that we only open the read-only transaction when `streaming.mode` is set to `parallel`.
1 parent 1ac9152 commit 3e3cfab

File tree

2 files changed

+23
-2
lines changed

2 files changed

+23
-2
lines changed

debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/PostgresSnapshotChangeEventSource.java

+15
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@
1414
import java.util.Set;
1515
import java.util.stream.Collectors;
1616

17+
import io.debezium.DebeziumException;
1718
import io.debezium.pipeline.spi.ChangeRecordEmitter;
1819
import org.slf4j.Logger;
1920
import org.slf4j.LoggerFactory;
@@ -260,11 +261,25 @@ protected SchemaChangeEvent getCreateTableEvent(RelationalSnapshotContext<Postgr
260261

261262
@Override
262263
protected void completed(SnapshotContext<PostgresPartition, PostgresOffsetContext> snapshotContext) {
264+
try {
265+
jdbcConnection.commit();
266+
} catch (SQLException sqle) {
267+
LOGGER.error("Exception while committing prior to reporting snapshot completion {}", sqle);
268+
throw new DebeziumException(sqle);
269+
}
270+
263271
snapshotter.snapshotCompleted();
264272
}
265273

266274
@Override
267275
protected void aborted(SnapshotContext<PostgresPartition, PostgresOffsetContext> snapshotContext) {
276+
try {
277+
jdbcConnection.rollback();
278+
} catch (SQLException sqle) {
279+
LOGGER.error("Exception while rolling back prior to reporting snapshot abortion {}", sqle);
280+
throw new DebeziumException(sqle);
281+
}
282+
268283
snapshotter.snapshotAborted();
269284
}
270285

debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/connection/PostgresReplicationConnection.java

+8-2
Original file line numberDiff line numberDiff line change
@@ -533,8 +533,14 @@ public Optional<SlotCreationResult> createReplicationSlot() throws SQLException
533533
plugin.getPostgresPluginName(),
534534
lsnType.getLsnTypeName(),
535535
streamingMode.isParallel() ? "USE_SNAPSHOT" : "");
536-
LOGGER.info("executing: BEGIN TRANSACTION ISOLATION LEVEL REPEATABLE READ READ ONLY");
537-
stmt.execute("BEGIN TRANSACTION ISOLATION LEVEL REPEATABLE READ READ ONLY");
536+
537+
// Begin a read-only transaction when it is the parallel streaming mode because
538+
// we will be using this read-only transaction to take the snapshot further.
539+
if (connectorConfig.streamingMode().isParallel() ) {
540+
LOGGER.info("executing: BEGIN TRANSACTION ISOLATION LEVEL REPEATABLE READ READ ONLY");
541+
stmt.execute("BEGIN TRANSACTION ISOLATION LEVEL REPEATABLE READ READ ONLY");
542+
}
543+
538544
LOGGER.info("Creating replication slot with command {}", createCommand);
539545
stmt.execute(createCommand);
540546
// when we are in Postgres 9.4+, we can parse the slot creation info,

0 commit comments

Comments
 (0)