Skip to content

Commit

Permalink
[DBZ-PGYB][yugabyte/yugabyte-db#24200] Execute snapshot in chunks (#161)
Browse files Browse the repository at this point in the history
## Problem

For very large tables, the default `SELECT *` query can take a really
long time to complete leading to longer time for snapshots.

## Solution

This PR aims to implement snapshotting the table in parallel using an
inbuilt method `yb_hash_code` to only run the query for a given hash
range. The following 2 configuration properties are introduced with this
PR:
1. A new `snapshot.mode` called `parallel` - this will behave exactly
like `initial_only` but we will have the ability to launch multiple
tasks.
2. `primary.key.hash.columns` - this config takes in a comma separated
values of the primary key hash component of the table.

> **Note:** When `snapshot.mode` is set to `parallel`, we will not
support providing regex in the property `table.include.list` and the
user will need to specify the full name of the table in the property.
Additionally, we will only allow one table in the `table.include.list`
if `snapshot.mode` is `parallel`.
  • Loading branch information
vaibhav-yb authored Oct 17, 2024
1 parent 3ca8afd commit 2cda9b7
Show file tree
Hide file tree
Showing 7 changed files with 227 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,8 @@ protected void executeChangeEventSources(CdcSourceTaskContext taskContext, Snaps
final PostgresPartition partition = previousOffsets.getTheOnlyPartition();
final PostgresOffsetContext previousOffset = previousOffsets.getTheOnlyOffset();

previousLogContext.set(taskContext.configureLoggingContext("snapshot", partition));
previousLogContext.set(taskContext.configureLoggingContext(
String.format("snapshot|%s", taskContext.getTaskId()), partition));
SnapshotResult<PostgresOffsetContext> snapshotResult = doSnapshot(snapshotSource, context, partition, previousOffset);

getSignalProcessor(previousOffsets).ifPresent(s -> s.setContext(snapshotResult.getOffset()));
Expand All @@ -94,7 +95,8 @@ protected void executeChangeEventSources(CdcSourceTaskContext taskContext, Snaps
}
}
LOGGER.info("Transitioning to streaming");
previousLogContext.set(taskContext.configureLoggingContext("streaming", partition));
previousLogContext.set(taskContext.configureLoggingContext(
String.format("streaming|%s", taskContext.getTaskId()), partition));
streamEvents(context, partition, snapshotResult.getOffset());
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
import java.util.regex.Pattern;

import io.debezium.DebeziumException;
import io.debezium.connector.postgresql.snapshot.ParallelSnapshotter;
import io.debezium.data.Envelope;
import io.debezium.heartbeat.Heartbeat;
import io.debezium.heartbeat.HeartbeatConnectionProvider;
Expand Down Expand Up @@ -212,6 +213,11 @@ public enum SnapshotMode implements EnumeratedValue {
*/
INITIAL_ONLY("initial_only", (c) -> new InitialOnlySnapshotter()),

/**
* Perform a snapshot using parallel tasks.
*/
PARALLEL("parallel", (c) -> new ParallelSnapshotter()),

/**
* Inject a custom snapshotter, which allows for more control over snapshots.
*/
Expand Down Expand Up @@ -983,6 +989,27 @@ public static AutoCreateMode parse(String value, String defaultValue) {
public static final Field SOURCE_INFO_STRUCT_MAKER = CommonConnectorConfig.SOURCE_INFO_STRUCT_MAKER
.withDefault(PostgresSourceInfoStructMaker.class.getName());

public static final Field TASK_ID = Field.create("task.id")
.withDisplayName("ID of the connector task")
.withType(Type.INT)
.withDefault(0)
.withImportance(Importance.LOW)
.withDescription("Internal use only");

public static final Field PRIMARY_KEY_HASH_COLUMNS = Field.create("primary.key.hash.columns")
.withDisplayName("Comma separated primary key fields")
.withType(Type.STRING)
.withImportance(Importance.LOW)
.withDescription("A comma separated value having all the hash components of the primary key")
.withValidation((config, field, output) -> {
if (config.getString(SNAPSHOT_MODE).equalsIgnoreCase("parallel") && config.getString(field, "").isEmpty()) {
output.accept(field, "", "primary.key.hash.columns cannot be empty when snapshot.mode is 'parallel'");
return 1;
}

return 0;
});

private final LogicalDecodingMessageFilter logicalDecodingMessageFilter;
private final HStoreHandlingMode hStoreHandlingMode;
private final IntervalHandlingMode intervalHandlingMode;
Expand Down Expand Up @@ -1108,6 +1135,14 @@ public boolean isFlushLsnOnSource() {
return flushLsnOnSource;
}

public int taskId() {
return getConfig().getInteger(TASK_ID);
}

public String primaryKeyHashColumns() {
return getConfig().getString(PRIMARY_KEY_HASH_COLUMNS);
}

@Override
public byte[] getUnavailableValuePlaceholder() {
String placeholder = getConfig().getString(UNAVAILABLE_VALUE_PLACEHOLDER);
Expand Down Expand Up @@ -1181,6 +1216,7 @@ protected SourceInfoStructMaker<? extends AbstractSourceInfo> getSourceInfoStruc
SNAPSHOT_MODE,
SNAPSHOT_MODE_CLASS,
YB_CONSISTENT_SNAPSHOT,
PRIMARY_KEY_HASH_COLUMNS,
HSTORE_HANDLING_MODE,
BINARY_HANDLING_MODE,
SCHEMA_NAME_ADJUSTMENT_MODE,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,7 @@ public ChangeEventSourceCoordinator<PostgresPartition, PostgresOffsetContext> st
final PostgresValueConverter valueConverter = valueConverterBuilder.build(typeRegistry);

schema = new PostgresSchema(connectorConfig, defaultValueConverter, topicNamingStrategy, valueConverter);
this.taskContext = new PostgresTaskContext(connectorConfig, schema, topicNamingStrategy);
this.taskContext = new PostgresTaskContext(connectorConfig, schema, topicNamingStrategy, connectorConfig.taskId());
final Offsets<PostgresPartition, PostgresOffsetContext> previousOffsets = getPreviousOffsets(
new PostgresPartition.Provider(connectorConfig, config), new PostgresOffsetContext.Loader(connectorConfig));
final Clock clock = Clock.system();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,18 @@ protected PostgresTaskContext(PostgresConnectorConfig config, PostgresSchema sch
this.schema = schema;
}

protected PostgresTaskContext(PostgresConnectorConfig config, PostgresSchema schema, TopicNamingStrategy<TableId> topicNamingStrategy, int taskId) {
super(config.getContextName(), config.getLogicalName(), String.valueOf(taskId), config.getCustomMetricTags(), Collections::emptySet);

this.config = config;
if (config.xminFetchInterval().toMillis() > 0) {
this.refreshXmin = ElapsedTimeStrategy.constant(Clock.SYSTEM, config.xminFetchInterval().toMillis());
}
this.topicNamingStrategy = topicNamingStrategy;
assert schema != null;
this.schema = schema;
}

protected TopicNamingStrategy<TableId> topicNamingStrategy() {
return topicNamingStrategy;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
package io.debezium.connector.postgresql;

import java.sql.SQLException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
Expand Down Expand Up @@ -61,10 +62,81 @@ public void start(Map<String, String> props) {

@Override
public List<Map<String, String>> taskConfigs(int maxTasks) {
if (props == null) {
return Collections.emptyList();
}

if (props.containsKey(PostgresConnectorConfig.SNAPSHOT_MODE.name())
&& props.get(PostgresConnectorConfig.SNAPSHOT_MODE.name())
.equalsIgnoreCase(PostgresConnectorConfig.SnapshotMode.PARALLEL.getValue())) {
LOGGER.info("Initialising parallel snapshot consumption");

final String tableIncludeList = props.get(PostgresConnectorConfig.TABLE_INCLUDE_LIST.name());
// Perform basic validations.
validateSingleTableProvidedForParallelSnapshot(tableIncludeList);

// Publication auto create mode should not be for all tables.
if (props.containsKey(PostgresConnectorConfig.PUBLICATION_AUTOCREATE_MODE.name())
&& props.get(PostgresConnectorConfig.PUBLICATION_AUTOCREATE_MODE.name())
.equalsIgnoreCase(PostgresConnectorConfig.AutoCreateMode.ALL_TABLES.getValue())) {
throw new DebeziumException("Snapshot mode parallel is not supported with publication.autocreate.mode all_tables, " +
"use publication.autocreate.mode=filtered");
}

// Add configuration for select override.
props.put(PostgresConnectorConfig.SNAPSHOT_SELECT_STATEMENT_OVERRIDES_BY_TABLE.name(), tableIncludeList);

return getConfigForParallelSnapshotConsumption(maxTasks);
}

// YB Note: Only applicable when snapshot mode is not parallel.
// this will always have just one task with the given list of properties
return props == null ? Collections.emptyList() : Collections.singletonList(new HashMap<>(props));
}

protected void validateSingleTableProvidedForParallelSnapshot(String tableIncludeList) throws DebeziumException {
if (tableIncludeList == null) {
throw new DebeziumException("No table provided, provide a table in the table.include.list");
} else if (tableIncludeList.contains(",")) {
// This might indicate the presence of multiple tables in the include list, we do not want that.
throw new DebeziumException("parallel snapshot consumption is only supported with one table at a time");
}
}

protected List<Map<String, String>> getConfigForParallelSnapshotConsumption(int maxTasks) {
List<Map<String, String>> taskConfigs = new ArrayList<>();

final long upperBoundExclusive = 64 * 1024;
final long rangeSize = upperBoundExclusive / maxTasks;

for (int i = 0; i < maxTasks; ++i) {
Map<String, String> taskProps = new HashMap<>(this.props);

taskProps.put(PostgresConnectorConfig.TASK_ID.name(), String.valueOf(i));

long lowerBound = i * rangeSize;
long upperBound = (i == maxTasks - 1) ? upperBoundExclusive - 1 : (lowerBound + rangeSize - 1);

LOGGER.info("Using query for task {}: {}", i, getQueryForParallelSnapshotSelect(lowerBound, upperBound));

taskProps.put(
PostgresConnectorConfig.SNAPSHOT_SELECT_STATEMENT_OVERRIDES_BY_TABLE.name() + "." + taskProps.get(PostgresConnectorConfig.TABLE_INCLUDE_LIST.name()),
getQueryForParallelSnapshotSelect(lowerBound, upperBound)
);

taskConfigs.add(taskProps);
}

return taskConfigs;
}

protected String getQueryForParallelSnapshotSelect(long lowerBound, long upperBound) {
return String.format("SELECT * FROM %s WHERE yb_hash_code(%s) >= %d AND yb_hash_code(%s) <= %d",
props.get(PostgresConnectorConfig.TABLE_INCLUDE_LIST.name()),
props.get(PostgresConnectorConfig.PRIMARY_KEY_HASH_COLUMNS.name()), lowerBound,
props.get(PostgresConnectorConfig.PRIMARY_KEY_HASH_COLUMNS.name()), upperBound);
}

@Override
public void stop() {
this.props = null;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
package io.debezium.connector.postgresql.snapshot;

import io.debezium.connector.postgresql.PostgresConnectorConfig;
import io.debezium.connector.postgresql.spi.OffsetState;
import io.debezium.connector.postgresql.spi.SlotState;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
* Snapshotter class to take snapshot using parallel tasks.
*
* @author Vaibhav Kushwaha ([email protected])
*/
public class ParallelSnapshotter extends QueryingSnapshotter {
private final static Logger LOGGER = LoggerFactory.getLogger(ParallelSnapshotter.class);
private OffsetState sourceInfo;

@Override
public void init(PostgresConnectorConfig config, OffsetState sourceInfo, SlotState slotState) {
super.init(config, sourceInfo, slotState);
this.sourceInfo = sourceInfo;

LOGGER.info("Initialised ParallelSnapshotter for task {}", config.taskId());
}

@Override
public boolean shouldStream() {
return false;
}

@Override
public boolean shouldSnapshot() {
if (sourceInfo == null) {
LOGGER.info("Taking parallel snapshot for new datasource");
return true;
}
else if (sourceInfo.snapshotInEffect()) {
LOGGER.info("Found previous incomplete snapshot");
return true;
}
else {
LOGGER.info("Previous snapshot completed, no snapshot will be performed");
return false;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -1116,6 +1116,62 @@ public void shouldHaveBeforeImageOfUpdatedRow() throws InterruptedException {
assertThat(updateRecordValue.getStruct(Envelope.FieldName.AFTER).getStruct("aa").getInt32("value")).isEqualTo(404);
}

@Test
public void shouldFailIfNoPrimaryKeyHashColumnSpecifiedWithSnapshotModeParallel() throws Exception {
Configuration.Builder configBuilder = TestHelper.defaultConfig()
.with(PostgresConnectorConfig.SNAPSHOT_MODE, SnapshotMode.PARALLEL.getValue())
.with(PostgresConnectorConfig.TABLE_INCLUDE_LIST, "public.test")
.with(PostgresConnectorConfig.PRIMARY_KEY_HASH_COLUMNS, "");

start(YugabyteDBConnector.class, configBuilder.build(), (success, message, error) -> {
assertFalse(success);
assertThat(message.contains("primary.key.hash.columns cannot be empty when snapshot.mode is 'parallel'")).isTrue();
});
}

@Test
public void shouldFailIfParallelSnapshotRunWithMultipleTables() throws Exception {
Configuration.Builder configBuilder = TestHelper.defaultConfig()
.with(PostgresConnectorConfig.SNAPSHOT_MODE, SnapshotMode.PARALLEL.getValue())
.with(PostgresConnectorConfig.TABLE_INCLUDE_LIST, "public.test,public.test2")
.with(PostgresConnectorConfig.PRIMARY_KEY_HASH_COLUMNS, "id");

start(YugabyteDBConnector.class, configBuilder.build(), (success, message, error) -> {
assertFalse(success);

assertThat(error.getMessage().contains("parallel snapshot consumption is only supported with one table at a time")).isTrue();
});
}

@Test
public void shouldFailWithSnapshotModeParallelIfNoTableIncludeListProvided() throws Exception {
Configuration.Builder configBuilder = TestHelper.defaultConfig()
.with(PostgresConnectorConfig.SNAPSHOT_MODE, SnapshotMode.PARALLEL.getValue())
.with(PostgresConnectorConfig.TABLE_INCLUDE_LIST, "")
.with(PostgresConnectorConfig.PRIMARY_KEY_HASH_COLUMNS, "id");

start(YugabyteDBConnector.class, configBuilder.build(), (success, message, error) -> {
assertFalse(success);

assertThat(error.getMessage().contains("No table provided, provide a table in the table.include.list")).isTrue();
});
}

@Test
public void shouldFailIfSnapshotModeParallelHasPublicationAutoCreateModeAllTables() throws Exception {
Configuration.Builder configBuilder = TestHelper.defaultConfig()
.with(PostgresConnectorConfig.SNAPSHOT_MODE, SnapshotMode.PARALLEL.getValue())
.with(PostgresConnectorConfig.TABLE_INCLUDE_LIST, "public.test")
.with(PostgresConnectorConfig.PUBLICATION_AUTOCREATE_MODE, PostgresConnectorConfig.AutoCreateMode.ALL_TABLES)
.with(PostgresConnectorConfig.PRIMARY_KEY_HASH_COLUMNS, "id");;

start(YugabyteDBConnector.class, configBuilder.build(), (success, message, error) -> {
assertFalse(success);

assertThat(error.getMessage().contains("Snapshot mode parallel is not supported with publication.autocreate.mode all_tables")).isTrue();
});
}

@Test
public void shouldResumeSnapshotIfFailingMidstream() throws Exception {
// insert another set of rows so we can stop at certain point
Expand Down

0 comments on commit 2cda9b7

Please sign in to comment.