Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Handling empty tables #91

Open
wants to merge 3 commits into
base: voyager-main
Choose a base branch
from
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -52,11 +52,12 @@ public class YbExporterConsumer extends BaseChangeConsumer implements DebeziumEn
private RecordTransformer recordTransformer;
Thread flusherThread;
boolean shutDown = false;
private Config config;

@PostConstruct
void connect() throws URISyntaxException {
LOGGER.info("connect() called: dataDir = {}", dataDir);
final Config config = ConfigProvider.getConfig();
config = ConfigProvider.getConfig();

snapshotMode = config.getOptionalValue("debezium.source.snapshot.mode", String.class).orElse("");
retrieveSourceType(config);
Expand All @@ -69,7 +70,7 @@ void connect() throws URISyntaxException {
exportStatus.updateMode(getExportModeToStartWith(snapshotMode));
}
if (exportStatus.getMode().equals(ExportMode.STREAMING)) {
handleSnapshotComplete();
openCDCWriter();
}
parser = new KafkaConnectRecordParser(dataDir, sourceType, tableMap);
String propertyVal = PROP_PREFIX + SequenceObjectUpdater.propertyName;
Expand Down Expand Up @@ -249,12 +250,39 @@ private void checkIfSnapshotAlreadyComplete(Record r) {
}

private void handleSnapshotComplete() {
handleSnapshotsForEmptyTables();
closeSnapshotWriters();
exportStatus.updateMode(ExportMode.STREAMING);
exportStatus.flushToDisk();
openCDCWriter();
}

private void handleSnapshotsForEmptyTables(){
String tableListStr = config.getValue("debezium.source.table.include.list", String.class);
for (String qualifiedTableStr : tableListStr.split(",")) {
String[] parts = qualifiedTableStr.split("\\.");
if (parts.length != 2){
throw new RuntimeException(String.format("expected qualified table name in config table.include.list. Received %s", qualifiedTableStr));
}
String schemaName = parts[0];
String tableName = parts[1];
boolean tableSnapshotted = false;
for (Table t : tableMap.values()) {
if (t.tableName.equals(tableName) && t.schemaName.equals(schemaName)){
tableSnapshotted = true;
break;
}
}
if (!tableSnapshotted){
// table must have been empty.
// creating dummy entry.
Table t = new Table("", schemaName, tableName);
TableSnapshotWriterCSV writer = new TableSnapshotWriterCSV(dataDir, t, sourceType);
writer.close();
}
}
}

private void handleSnapshotOnlyComplete() {
if ((exportStatus.getMode() == ExportMode.STREAMING) && (snapshotMode.equals("initial_only"))) {
LOGGER.info("Snapshot complete. Interrupting thread as snapshot mode = initial_only");
Expand Down