Skip to content

Commit 060f600

Browse files
authored
[DBZ-PGYB][yugabyte/yugabyte-db#25716] Set GUC to disable catalog version check for connections (#169)
## Problem Recent changes in YugabyteDB introduced some changes where the `relcache/catcache` is reloaded as soon as `yb_read_time` is set. Now that caused an issue with the snapshots in the connector i.e. whenever the connector sets the `yb_read_time` and goes on to take the snapshot by running a `SELECT *` query, it gets the following error: ``` Caused by: io.debezium.DebeziumException: com.yugabyte.util.PSQLException: ERROR: The catalog snapshot used for this transaction has been invalidated: expected: 1, got: 0: MISMATCHED_SCHEMA Where: Catalog Version Mismatch: A DDL occurred while processing this query. Try again. ``` ## Solution As a workaround, we will now bypass the catalog version check by setting a GUC `yb_disable_catalog_version_check` at the connection level so that we do not hit the error during snapshot. Note that we do not require this during streaming phase and since streaming used a separate object of `ReplicationConnection` class, we will not be modifying it. This closes yugabyte/yugabyte-db#25716
1 parent 842d267 commit 060f600

File tree

1 file changed

+27
-0
lines changed

1 file changed

+27
-0
lines changed

debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/PostgresSnapshotChangeEventSource.java

+27
Original file line numberDiff line numberDiff line change
@@ -284,8 +284,35 @@ protected Optional<String> getSnapshotSelect(RelationalSnapshotContext<PostgresP
284284
return snapshotter.buildSnapshotQuery(tableId, columns);
285285
}
286286

287+
protected void disableCatalogVersionCheck() throws SQLException {
288+
final String disableCatalogVersionCheckStmt =
289+
"DO " +
290+
"LANGUAGE plpgsql $$ " +
291+
"BEGIN " +
292+
"SET yb_disable_catalog_version_check = true; " +
293+
"EXCEPTION " +
294+
"WHEN sqlstate '42704' THEN "+
295+
"RAISE EXCEPTION 'GUC not found'; " +
296+
"WHEN OTHERS THEN " +
297+
"CALL disable_catalog_version_check(); " +
298+
"END $$;";
299+
300+
LOGGER.info("Disabling catalog version check with statement: {}", disableCatalogVersionCheckStmt);
301+
try {
302+
jdbcConnection.execute(disableCatalogVersionCheckStmt);
303+
} catch (SQLException sqle) {
304+
if (sqle.getMessage().contains("GUC not found")) {
305+
LOGGER.warn("GUC not present: yb_disable_catalog_version_check");
306+
jdbcConnection.execute("ABORT;");
307+
} else {
308+
throw sqle;
309+
}
310+
}
311+
}
287312
protected void setSnapshotTransactionIsolationLevel(boolean isOnDemand) throws SQLException {
288313
if (!YugabyteDBServer.isEnabled() || connectorConfig.isYbConsistentSnapshotEnabled()) {
314+
disableCatalogVersionCheck();
315+
289316
LOGGER.info("Setting isolation level");
290317
String transactionStatement = snapshotter.snapshotTransactionIsolationLevelStatement(slotCreatedInfo, isOnDemand);
291318
LOGGER.info("Opening transaction with statement {}", transactionStatement);

0 commit comments

Comments
 (0)