Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[DBZ-PGYB] Fixed checkstyle according to conventions #152

Open
wants to merge 1 commit into
base: ybdb-debezium-2.5.2
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -10,18 +10,14 @@
import java.util.Map;
import java.util.concurrent.atomic.AtomicReference;

import io.debezium.connector.common.CdcSourceTaskContext;
import io.debezium.connector.postgresql.spi.OffsetState;
import io.debezium.pipeline.spi.SnapshotResult;
import io.debezium.util.Clock;
import io.debezium.util.LoggingContext;
import io.debezium.util.Metronome;
import org.apache.kafka.connect.source.SourceConnector;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import io.debezium.DebeziumException;
import io.debezium.config.CommonConnectorConfig;
import io.debezium.connector.common.CdcSourceTaskContext;
import io.debezium.connector.postgresql.spi.OffsetState;
import io.debezium.connector.postgresql.spi.SlotState;
import io.debezium.connector.postgresql.spi.Snapshotter;
import io.debezium.pipeline.ChangeEventSourceCoordinator;
Expand All @@ -34,7 +30,11 @@
import io.debezium.pipeline.source.spi.ChangeEventSource.ChangeEventSourceContext;
import io.debezium.pipeline.source.spi.SnapshotChangeEventSource;
import io.debezium.pipeline.spi.Offsets;
import io.debezium.pipeline.spi.SnapshotResult;
import io.debezium.schema.DatabaseSchema;
import io.debezium.util.Clock;
import io.debezium.util.LoggingContext;
import io.debezium.util.Metronome;

/**
* Coordinates one or more {@link ChangeEventSource}s and executes them in order. Extends the base
Expand Down Expand Up @@ -67,7 +67,8 @@ public PostgresChangeEventSourceCoordinator(Offsets<PostgresPartition, PostgresO
}

@Override
protected void executeChangeEventSources(CdcSourceTaskContext taskContext, SnapshotChangeEventSource<PostgresPartition, PostgresOffsetContext> snapshotSource, Offsets<PostgresPartition, PostgresOffsetContext> previousOffsets,
protected void executeChangeEventSources(CdcSourceTaskContext taskContext, SnapshotChangeEventSource<PostgresPartition, PostgresOffsetContext> snapshotSource,
Offsets<PostgresPartition, PostgresOffsetContext> previousOffsets,
AtomicReference<LoggingContext.PreviousContext> previousLogContext, ChangeEventSourceContext context)
throws InterruptedException {
final PostgresPartition partition = previousOffsets.getTheOnlyPartition();
Expand All @@ -81,7 +82,7 @@ protected void executeChangeEventSources(CdcSourceTaskContext taskContext, Snaps
LOGGER.debug("Snapshot result {}", snapshotResult);

if (context.isRunning() && snapshotResult.isCompletedOrSkipped()) {
if(YugabyteDBServer.isEnabled() && !isSnapshotSkipped(snapshotResult)) {
if (YugabyteDBServer.isEnabled() && !isSnapshotSkipped(snapshotResult)) {
LOGGER.info("Will wait for snapshot completion before transitioning to streaming");
waitForSnapshotCompletion = true;
while (waitForSnapshotCompletion) {
Expand Down Expand Up @@ -139,7 +140,7 @@ public void commitOffset(Map<String, ?> partition, Map<String, ?> offset) {
if (YugabyteDBServer.isEnabled() && waitForSnapshotCompletion) {
LOGGER.debug("Checking the offset value for snapshot completion");
OffsetState offsetState = new PostgresOffsetContext.Loader((PostgresConnectorConfig) connectorConfig).load(offset).asOffsetState();
if(!offsetState.snapshotInEffect()) {
if (!offsetState.snapshotInEffect()) {
LOGGER.info("Offset conveys that snapshot has completed");
waitForSnapshotCompletion = false;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,8 @@ public PostgresChangeEventSourceFactory(PostgresConnectorConfig configuration, S
}

@Override
public SnapshotChangeEventSource<PostgresPartition, PostgresOffsetContext> getSnapshotChangeEventSource(SnapshotProgressListener<PostgresPartition> snapshotProgressListener,
public SnapshotChangeEventSource<PostgresPartition, PostgresOffsetContext> getSnapshotChangeEventSource(
SnapshotProgressListener<PostgresPartition> snapshotProgressListener,
NotificationService<PostgresPartition, PostgresOffsetContext> notificationService) {
return new PostgresSnapshotChangeEventSource(
configuration,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,15 +17,15 @@
import java.util.Set;
import java.util.stream.Collectors;

import io.debezium.connector.postgresql.connection.ReplicaIdentityInfo;
import org.apache.kafka.connect.data.SchemaBuilder;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.errors.ConnectException;
import com.yugabyte.core.BaseConnection;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.yugabyte.core.BaseConnection;

import io.debezium.connector.postgresql.connection.PostgresConnection;
import io.debezium.connector.postgresql.connection.ReplicaIdentityInfo;
import io.debezium.connector.postgresql.connection.ReplicationMessage;
import io.debezium.data.Envelope.Operation;
import io.debezium.function.Predicates;
Expand Down Expand Up @@ -160,7 +160,7 @@ private DataCollectionSchema synchronizeTableSchema(DataCollectionSchema tableSc
}

protected Object[] columnValues(List<ReplicationMessage.Column> columns, TableId tableId, boolean refreshSchemaIfChanged,
boolean sourceOfToasted, boolean oldValues)
boolean sourceOfToasted, boolean oldValues)
throws SQLException {
if (columns == null || columns.isEmpty()) {
return null;
Expand Down Expand Up @@ -202,9 +202,10 @@ protected Object[] columnValues(List<ReplicationMessage.Column> columns, TableId
// the unchanged toasted value, we will not form a value struct for it.
// Ultimately, it will be emitted as a NULL value.
if (!UnchangedToastedReplicationMessageColumn.isUnchangedToastedValue(value)) {
values[position] = new Object[]{value, Boolean.TRUE};
values[position] = new Object[]{ value, Boolean.TRUE };
}
} else {
}
else {
LOGGER.debug("Plugin is NOT yboutput");
values[position] = value;
}
Expand Down Expand Up @@ -236,7 +237,7 @@ protected void emitUpdateRecord(Receiver<PostgresPartition> receiver, TableSchem
*/
if (skipMessagesWithoutChange() && Objects.nonNull(newValue) && newValue.equals(oldValue)) {
LOGGER.debug("No new values found for table '{}' in included columns from update message at '{}'; skipping record", tableSchema,
getOffset().getSourceInfo());
getOffset().getSourceInfo());
return;
}
// some configurations do not provide old values in case of updates
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,13 +13,6 @@
import java.util.Optional;
import java.util.regex.Pattern;

import io.debezium.data.Envelope;
import io.debezium.heartbeat.Heartbeat;
import io.debezium.heartbeat.HeartbeatConnectionProvider;
import io.debezium.heartbeat.HeartbeatErrorHandler;
import io.debezium.jdbc.JdbcConnection;
import io.debezium.schema.SchemaNameAdjuster;
import io.debezium.spi.topic.TopicNamingStrategy;
import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.common.config.ConfigDef.Importance;
import org.apache.kafka.common.config.ConfigDef.Type;
Expand Down Expand Up @@ -47,11 +40,18 @@
import io.debezium.connector.postgresql.snapshot.InitialSnapshotter;
import io.debezium.connector.postgresql.snapshot.NeverSnapshotter;
import io.debezium.connector.postgresql.spi.Snapshotter;
import io.debezium.data.Envelope;
import io.debezium.heartbeat.Heartbeat;
import io.debezium.heartbeat.HeartbeatConnectionProvider;
import io.debezium.heartbeat.HeartbeatErrorHandler;
import io.debezium.jdbc.JdbcConfiguration;
import io.debezium.jdbc.JdbcConnection;
import io.debezium.relational.ColumnFilterMode;
import io.debezium.relational.RelationalDatabaseConnectorConfig;
import io.debezium.relational.TableId;
import io.debezium.relational.Tables.TableFilter;
import io.debezium.schema.SchemaNameAdjuster;
import io.debezium.spi.topic.TopicNamingStrategy;
import io.debezium.util.Strings;

/**
Expand Down Expand Up @@ -295,11 +295,11 @@ public enum SecureConnectionMode implements EnumeratedValue {
ALLOW("allow"),

/**
* Establish a secure connection first.
* Establish an unencrypted connection next if a secure connection cannot be established
*
* see the {@code sslmode} Postgres JDBC driver option
*/
* Establish a secure connection first.
* Establish an unencrypted connection next if a secure connection cannot be established
*
* see the {@code sslmode} Postgres JDBC driver option
*/
PREFER("prefer"),

/**
Expand Down Expand Up @@ -608,7 +608,7 @@ public static SchemaRefreshMode parse(String value) {
.withDefault(true)
.withImportance(Importance.LOW)
.withDescription("Whether or not to take a consistent snapshot of the tables." +
"Disabling this option may result in duplication of some already snapshot data in the streaming phase.");
"Disabling this option may result in duplication of some already snapshot data in the streaming phase.");

public enum AutoCreateMode implements EnumeratedValue {
/**
Expand Down Expand Up @@ -1217,10 +1217,10 @@ private static int validateFlushLsnSource(Configuration config, Field field, Fie
*/
public static JdbcConnection.ConnectionFactory getConnectionFactory(String hostName) {
return hostName.contains(":")
? JdbcConnection.patternBasedFactory(PostgresConnection.MULTI_HOST_URL_PATTERN, com.yugabyte.Driver.class.getName(),
PostgresConnection.class.getClassLoader(), JdbcConfiguration.PORT.withDefault(PostgresConnectorConfig.PORT.defaultValueAsString()))
: JdbcConnection.patternBasedFactory(PostgresConnection.URL_PATTERN, com.yugabyte.Driver.class.getName(),
PostgresConnection.class.getClassLoader(), JdbcConfiguration.PORT.withDefault(PostgresConnectorConfig.PORT.defaultValueAsString()));
? JdbcConnection.patternBasedFactory(PostgresConnection.MULTI_HOST_URL_PATTERN, com.yugabyte.Driver.class.getName(),
PostgresConnection.class.getClassLoader(), JdbcConfiguration.PORT.withDefault(PostgresConnectorConfig.PORT.defaultValueAsString()))
: JdbcConnection.patternBasedFactory(PostgresConnection.URL_PATTERN, com.yugabyte.Driver.class.getName(),
PostgresConnection.class.getClassLoader(), JdbcConfiguration.PORT.withDefault(PostgresConnectorConfig.PORT.defaultValueAsString()));
}

protected static int validateReplicaAutoSetField(Configuration config, Field field, Field.ValidationOutput problems) {
Expand Down Expand Up @@ -1281,7 +1281,8 @@ public Heartbeat createHeartbeat(TopicNamingStrategy topicNamingStrategy,

return new YBHeartbeatImpl(getHeartbeatInterval(), topicNamingStrategy.heartbeatTopic(),
getLogicalName(), schemaNameAdjuster);
} else {
}
else {
return super.createHeartbeat(topicNamingStrategy, schemaNameAdjuster, connectionProvider, errorHandler);
}
}
Expand Down Expand Up @@ -1313,13 +1314,13 @@ protected static int validateYBHostname(Configuration config, Field field, Field
}

if (!YB_HOSTNAME_PATTERN.asPredicate().test(hostName)) {
problems.accept(field, hostName, hostName + " has invalid format (only the underscore, hyphen, dot, comma, colon and alphanumeric characters are allowed)");
problems.accept(field, hostName,
hostName + " has invalid format (only the underscore, hyphen, dot, comma, colon and alphanumeric characters are allowed)");
++problemCount;
}
}

return problemCount;
}


}
Original file line number Diff line number Diff line change
Expand Up @@ -260,7 +260,8 @@ public ChangeEventSourceCoordinator<PostgresPartition, PostgresOffsetContext> st
coordinator.start(taskContext, this.queue, metadataProvider);

return coordinator;
} finally {
}
finally {
previousContext.restore();
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -247,7 +247,7 @@ public void applySchemaChangesForTableWithReplicaIdentity(int relationId, Table
applySchemaChangesForTable(relationId, table);

tableIdToReplicaIdentity.put(table.id(),
ReplicaIdentityInfo.ReplicaIdentity.parseFromDB(String.valueOf((char) replicaIdentityId)));
ReplicaIdentityInfo.ReplicaIdentity.parseFromDB(String.valueOf((char) replicaIdentityId)));

LOGGER.info("Replica identity being stored for table {} is {}", table.id(), getReplicaIdentity(table.id()));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@
import java.util.Set;
import java.util.stream.Collectors;

import io.debezium.pipeline.spi.ChangeRecordEmitter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand All @@ -30,6 +29,7 @@
import io.debezium.pipeline.notification.NotificationService;
import io.debezium.pipeline.source.SnapshottingTask;
import io.debezium.pipeline.source.spi.SnapshotProgressListener;
import io.debezium.pipeline.spi.ChangeRecordEmitter;
import io.debezium.relational.RelationalSnapshotChangeEventSource;
import io.debezium.relational.Table;
import io.debezium.relational.TableId;
Expand Down Expand Up @@ -92,14 +92,15 @@ protected SnapshotContext<PostgresPartition, PostgresOffsetContext> prepare(Post

@Override
protected ChangeRecordEmitter<PostgresPartition> getChangeRecordEmitter(
PostgresPartition partition, PostgresOffsetContext offset, TableId tableId, Object[] row,
Instant timestamp) {
PostgresPartition partition, PostgresOffsetContext offset, TableId tableId, Object[] row,
Instant timestamp) {
if (YugabyteDBServer.isEnabled() && connectorConfig.plugin().isYBOutput()) {
offset.event(tableId, timestamp);

return new YBSnapshotChangeRecordEmitter<>(partition, offset, row, getClock(),
connectorConfig);
} else {
connectorConfig);
}
else {
return super.getChangeRecordEmitter(partition, offset, tableId, row, timestamp);
}
}
Expand All @@ -110,9 +111,9 @@ protected void connectionCreated(RelationalSnapshotContext<PostgresPartition, Po
if (YugabyteDBServer.isEnabled()) {
// In case of YB, the consistent snapshot is performed as follows -
// 1) If connector created the slot, then the snapshotName returned as part of the CREATE_REPLICATION_SLOT
// command will have the hybrid time as of which the snapshot query is to be run
// command will have the hybrid time as of which the snapshot query is to be run
// 2) If slot already exists, then the snapshot query will be run as of the hybrid time corresponding to the
// restart_lsn. This information is available in the pg_replication_slots view
// restart_lsn. This information is available in the pg_replication_slots view
// In either case, the setSnapshotTransactionIsolationLevel function needs to be called so that the preparatory
// commands can be run on the snapshot connection so that the snapshot query can be run as of the appropriate
// hybrid time
Expand Down Expand Up @@ -290,7 +291,8 @@ protected void setSnapshotTransactionIsolationLevel(boolean isOnDemand) throws S
String transactionStatement = snapshotter.snapshotTransactionIsolationLevelStatement(slotCreatedInfo, isOnDemand);
LOGGER.info("Opening transaction with statement {}", transactionStatement);
jdbcConnection.executeWithoutCommitting(transactionStatement);
} else {
}
else {
LOGGER.info("Skipping setting snapshot time, snapshot data will not be consistent");
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,10 +12,11 @@
import java.util.concurrent.atomic.AtomicReference;

import org.apache.kafka.connect.errors.ConnectException;
import com.yugabyte.core.BaseConnection;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.yugabyte.core.BaseConnection;

import io.debezium.DebeziumException;
import io.debezium.connector.postgresql.connection.LogicalDecodingMessage;
import io.debezium.connector.postgresql.connection.Lsn;
Expand Down Expand Up @@ -143,8 +144,8 @@ public void execute(ChangeEventSourceContext context, PostgresPartition partitio
// twice - the wal position locator is searching for a wal position
if (YugabyteDBServer.isEnabled()) {
LOGGER.info("PID for replication connection: {} on node {}",
replicationConnection.getBackendPid(),
replicationConnection.getConnectedNodeIp());
replicationConnection.getBackendPid(),
replicationConnection.getConnectedNodeIp());
}

if (hasStartLsnStoredInContext) {
Expand Down Expand Up @@ -185,7 +186,8 @@ public void execute(ChangeEventSourceContext context, PostgresPartition partitio
if (!isInPreSnapshotCatchUpStreaming(this.effectiveOffset)) {
connection.commit();
}
} catch (Exception e) {
}
catch (Exception e) {
LOGGER.info("Commit failed while preparing for reconnect", e);
}
walPosition.enableFiltering();
Expand All @@ -202,7 +204,8 @@ public void execute(ChangeEventSourceContext context, PostgresPartition partitio
stream = this.replicationStream.get();
stream.startKeepAlive(Threads.newSingleThreadExecutor(YugabyteDBConnector.class, connectorConfig.getLogicalName(), KEEP_ALIVE_THREAD_NAME));
}
} else {
}
else {
LOGGER.info("Connector config provide.transaction.metadata is set to true. Therefore, skip records filtering in order to ship entire transactions.");
}

Expand Down Expand Up @@ -288,9 +291,10 @@ private void processReplicationMessages(PostgresPartition partition, PostgresOff

// Tx BEGIN/END event
if (message.isTransactionalMessage()) {
if(message.getOperation() == Operation.BEGIN) {
if (message.getOperation() == Operation.BEGIN) {
LOGGER.debug("Processing BEGIN with end LSN {} and txnid {}", lsn, message.getTransactionId());
} else {
}
else {
LOGGER.debug("Processing COMMIT with end LSN {} and txnid {}", lsn, message.getTransactionId());
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,16 +41,16 @@
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaBuilder;
import org.apache.kafka.connect.errors.ConnectException;

import com.fasterxml.jackson.core.JsonFactory;
import com.fasterxml.jackson.core.JsonGenerator;
import com.yugabyte.PGStatement;
import com.yugabyte.geometric.PGpoint;
import com.yugabyte.jdbc.PgArray;
import com.yugabyte.util.HStoreConverter;
import com.yugabyte.util.PGInterval;
import com.yugabyte.util.PGobject;

import com.fasterxml.jackson.core.JsonFactory;
import com.fasterxml.jackson.core.JsonGenerator;

import io.debezium.config.CommonConnectorConfig.BinaryHandlingMode;
import io.debezium.connector.postgresql.PostgresConnectorConfig.HStoreHandlingMode;
import io.debezium.connector.postgresql.PostgresConnectorConfig.IntervalHandlingMode;
Expand Down
Loading