Destinations snowflake+bigquery: Improve performance by filtering raw table on extracted_at (airbytehq#31191)

Co-authored-by: edgao <[email protected]>
2 people authored and ariesgun committed Oct 23, 2023
1 parent 103e154 commit 6386f26
Showing 19 changed files with 367 additions and 74 deletions.
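
For context: typing and deduping (T+D) works by reading not-yet-processed rows out of the raw table. Before this change, that read had to scan the whole raw table on every T+D pass; with it, the query can be pruned to recently extracted rows. A minimal sketch of the intended query shape, written as Java text blocks to match the code below — the table name and timestamp literal are invented for illustration, and real connectors generate dialect-specific SQL:

    // Illustration only: not any connector's actual generated SQL.
    final String before = """
        SELECT * FROM airbyte_internal.users_raw
        WHERE _airbyte_loaded_at IS NULL
        """;
    final String after = """
        SELECT * FROM airbyte_internal.users_raw
        WHERE _airbyte_loaded_at IS NULL
          AND _airbyte_extracted_at > '2023-01-01T00:00:00Z'
        """;

The extracted_at bound comes from the new DestinationHandler#getMinTimestampForSync method introduced below; on warehouses that partition or cluster the raw table on _airbyte_extracted_at, the extra predicate lets the engine skip most of the table.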
@@ -20,6 +20,7 @@
import io.airbyte.commons.string.Strings;
import io.airbyte.protocol.models.v0.DestinationSyncMode;
import io.airbyte.protocol.models.v0.SyncMode;
import java.time.Instant;
import java.util.Arrays;
import java.util.HashMap;
import java.util.LinkedHashMap;
@@ -381,7 +382,7 @@ public void incrementalDedupSameNameNamespace() throws Exception {
}
""")));

final String sql = generator.updateTable(stream, "");
final String sql = generator.updateTable(stream, "", Optional.empty());
destinationHandler.execute(sql);

final List<JsonNode> rawRecords = dumpRawTableRecords(streamId);
@@ -410,7 +411,7 @@ public void allTypes() throws Exception {
streamId,
BaseTypingDedupingTest.readRecords("sqlgenerator/alltypes_inputrecords.jsonl"));

final String sql = generator.updateTable(incrementalDedupStream, "_foo");
final String sql = generator.updateTable(incrementalDedupStream, "_foo", Optional.empty());
destinationHandler.execute(sql);

verifyRecords(
@@ -420,6 +421,140 @@ public void allTypes() throws Exception {
dumpFinalTableRecords(streamId, "_foo"));
}

/**
* Run through some plausible T+D scenarios to verify that we correctly identify the min raw
* timestamp.
*/
@Test
public void minTimestampBehavesCorrectly() throws Exception {
// When the raw table doesn't exist, there is no timestamp
assertEquals(Optional.empty(), destinationHandler.getMinTimestampForSync(streamId));

// When the raw table is empty, there is no timestamp
createRawTable(streamId);
assertEquals(Optional.empty(), destinationHandler.getMinTimestampForSync(streamId));

// If we insert some raw records with null loaded_at, we should get the min extracted_at
insertRawTableRecords(
streamId,
List.of(
Jsons.deserialize(
"""
{
"_airbyte_raw_id": "899d3bc3-7921-44f0-8517-c748a28fe338",
"_airbyte_extracted_at": "2023-01-01T00:00:00Z",
"_airbyte_data": {}
}
"""),
Jsons.deserialize(
"""
{
"_airbyte_raw_id": "47f46eb6-fcae-469c-a7fc-31d4b9ce7474",
"_airbyte_extracted_at": "2023-01-02T00:00:00Z",
"_airbyte_data": {}
}
""")));
Instant actualTimestamp = destinationHandler.getMinTimestampForSync(streamId).get();
assertTrue(
actualTimestamp.isBefore(Instant.parse("2023-01-01T00:00:00Z")),
"When all raw records have null loaded_at, the min timestamp should be earlier than all of their extracted_at values (2023-01-01). Was actually "
+ actualTimestamp);

// Execute T+D to set loaded_at on the records
createFinalTable(incrementalAppendStream, "");
final String sql = generator.updateTable(incrementalAppendStream, "", Optional.empty());
destinationHandler.execute(sql);

assertEquals(
destinationHandler.getMinTimestampForSync(streamId).get(),
Instant.parse("2023-01-02T00:00:00Z"),
"When all raw records have non-null loaded_at, the min timestamp should be equal to the latest extracted_at");

// If we insert another raw record with older extracted_at than the typed records, we should fetch a
// timestamp earlier than this new record.
// This emulates a sync inserting some records out of order, running T+D on newer records, inserting
// an older record, and then crashing before it can execute T+D. The next sync should recognize
// that older record as still needing to be processed.
insertRawTableRecords(
streamId,
List.of(Jsons.deserialize(
"""
{
"_airbyte_raw_id": "899d3bc3-7921-44f0-8517-c748a28fe338",
"_airbyte_extracted_at": "2023-01-01T12:00:00Z",
"_airbyte_data": {}
}
""")));
actualTimestamp = destinationHandler.getMinTimestampForSync(streamId).get();
// this is a pretty confusing pair of assertions. To explain them in more detail: There are three
// records in the raw table:
// * loaded_at not null, extracted_at = 2023-01-01 00:00Z
// * loaded_at is null, extracted_at = 2023-01-01 12:00Z
// * loaded_at not null, extracted_at = 2023-01-02 00:00Z
// We should have a timestamp which is older than the second record, but newer than or equal to
// (i.e. not before) the first record. This allows us to query the raw table using
// `_airbyte_extracted_at > ?`, which will include the second record and exclude the first record.
assertTrue(
actualTimestamp.isBefore(Instant.parse("2023-01-01T12:00:00Z")),
"When some raw records have null loaded_at, the min timestamp should be earlier than the oldest unloaded record (2023-01-01 12:00Z). Was actually "
+ actualTimestamp);
assertFalse(
actualTimestamp.isBefore(Instant.parse("2023-01-01T00:00:00Z")),
"When some raw records have null loaded_at, the min timestamp should be later than the newest loaded record older than the oldest unloaded record (2023-01-01 00:00Z). Was actually "
+ actualTimestamp);
}
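
Aside: the assertions above pin down the contract for getMinTimestampForSync without prescribing an implementation. A minimal JDBC-flavored sketch that satisfies them — the SQL, the one-second back-off, and the method shape are assumptions for illustration, not any connector's actual code:

    import java.sql.Connection;
    import java.sql.ResultSet;
    import java.sql.SQLException;
    import java.sql.Statement;
    import java.sql.Timestamp;
    import java.time.Duration;
    import java.time.Instant;
    import java.util.Optional;

    // Sketch only. Real implementations first check that the raw table exists
    // and return Optional.empty() if it does not; omitted here for brevity.
    static Optional<Instant> getMinTimestampForSync(final Connection conn, final String rawTable) throws SQLException {
      try (Statement stmt = conn.createStatement()) {
        // Oldest record that still needs T+D.
        try (ResultSet rs = stmt.executeQuery(
            "SELECT MIN(_airbyte_extracted_at) FROM " + rawTable + " WHERE _airbyte_loaded_at IS NULL")) {
          if (rs.next()) {
            final Timestamp minUnloaded = rs.getTimestamp(1);
            if (minUnloaded != null) {
              // Back off slightly so a strict `_airbyte_extracted_at > ?` filter
              // still includes the oldest unloaded record.
              return Optional.of(minUnloaded.toInstant().minus(Duration.ofSeconds(1)));
            }
          }
        }
        // Everything is loaded: the newest extracted_at, if any, is a safe
        // lower bound for the next sync.
        try (ResultSet rs = stmt.executeQuery(
            "SELECT MAX(_airbyte_extracted_at) FROM " + rawTable)) {
          if (rs.next()) {
            final Timestamp maxLoaded = rs.getTimestamp(1);
            if (maxLoaded != null) {
              return Optional.of(maxLoaded.toInstant());
            }
          }
        }
        return Optional.empty(); // empty raw table
      }
    }

This reproduces the behavior asserted above: all-null loaded_at yields a timestamp just before the oldest extracted_at; a fully typed table yields the newest extracted_at; a mixed table yields a value in the half-open window the comments describe.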

/**
* Identical to {@link #allTypes()}, but queries for the min raw timestamp first. This verifies that
* if a previous sync doesn't fully type-and-dedupe a table, we still get those records on the next
* sync.
*/
@Test
public void handlePreexistingRecords() throws Exception {
createRawTable(streamId);
createFinalTable(incrementalDedupStream, "");
insertRawTableRecords(
streamId,
BaseTypingDedupingTest.readRecords("sqlgenerator/alltypes_inputrecords.jsonl"));

final Optional<Instant> minTimestampForSync = destinationHandler.getMinTimestampForSync(streamId);
assertTrue(minTimestampForSync.isPresent(), "After writing some raw records, the min timestamp should be present.");

final String sql = generator.updateTable(incrementalDedupStream, "", minTimestampForSync);
destinationHandler.execute(sql);

verifyRecords(
"sqlgenerator/alltypes_expectedrecords_raw.jsonl",
dumpRawTableRecords(streamId),
"sqlgenerator/alltypes_expectedrecords_final.jsonl",
dumpFinalTableRecords(streamId, ""));
}

/**
* Identical to {@link #handlePreexistingRecords()}, but queries for the min timestamp before
* inserting any raw records. This emulates a sync starting with an empty table.
*/
@Test
public void handleNoPreexistingRecords() throws Exception {
createRawTable(streamId);
final Optional<Instant> minTimestampForSync = destinationHandler.getMinTimestampForSync(streamId);
assertEquals(Optional.empty(), minTimestampForSync);

createFinalTable(incrementalDedupStream, "");
insertRawTableRecords(
streamId,
BaseTypingDedupingTest.readRecords("sqlgenerator/alltypes_inputrecords.jsonl"));

final String sql = generator.updateTable(incrementalDedupStream, "", minTimestampForSync);
destinationHandler.execute(sql);

verifyRecords(
"sqlgenerator/alltypes_expectedrecords_raw.jsonl",
dumpRawTableRecords(streamId),
"sqlgenerator/alltypes_expectedrecords_final.jsonl",
dumpFinalTableRecords(streamId, ""));
}

/**
* Test JSON types encountered for a String type field.
*
@@ -432,7 +567,7 @@ public void jsonStringifyTypes() throws Exception {
insertRawTableRecords(
streamId,
BaseTypingDedupingTest.readRecords("sqlgenerator/json_types_in_string_inputrecords.jsonl"));
final String sql = generator.updateTable(incrementalDedupStream, "_foo");
final String sql = generator.updateTable(incrementalDedupStream, "_foo", Optional.empty());
destinationHandler.execute(sql);
verifyRecords(
"sqlgenerator/json_types_in_string_expectedrecords_raw.jsonl",
@@ -449,7 +584,7 @@ public void timestampFormats() throws Exception {
streamId,
BaseTypingDedupingTest.readRecords("sqlgenerator/timestampformats_inputrecords.jsonl"));

final String sql = generator.updateTable(incrementalAppendStream, "");
final String sql = generator.updateTable(incrementalAppendStream, "", Optional.empty());
destinationHandler.execute(sql);

DIFFER.diffFinalTableRecords(
@@ -465,7 +600,7 @@ public void incrementalDedup() throws Exception {
streamId,
BaseTypingDedupingTest.readRecords("sqlgenerator/incrementaldedup_inputrecords.jsonl"));

final String sql = generator.updateTable(incrementalDedupStream, "");
final String sql = generator.updateTable(incrementalDedupStream, "", Optional.empty());
destinationHandler.execute(sql);

verifyRecords(
@@ -518,7 +653,7 @@ public void incrementalDedupNoCursor() throws Exception {
}
""")));

final String sql = generator.updateTable(streamConfig, "");
final String sql = generator.updateTable(streamConfig, "", Optional.empty());
destinationHandler.execute(sql);

final List<JsonNode> actualRawRecords = dumpRawTableRecords(streamId);
@@ -541,7 +676,7 @@ public void incrementalAppend() throws Exception {
streamId,
BaseTypingDedupingTest.readRecords("sqlgenerator/incrementaldedup_inputrecords.jsonl"));

final String sql = generator.updateTable(incrementalAppendStream, "");
final String sql = generator.updateTable(incrementalAppendStream, "", Optional.empty());
destinationHandler.execute(sql);

verifyRecordCounts(
@@ -598,7 +733,7 @@ public void cdcImmediateDeletion() throws Exception {
}
""")));

final String sql = generator.updateTable(cdcIncrementalDedupStream, "");
final String sql = generator.updateTable(cdcIncrementalDedupStream, "", Optional.empty());
destinationHandler.execute(sql);

verifyRecordCounts(
@@ -632,7 +767,7 @@ public void cdcIdempotent() throws Exception {
}
""")));

final String sql = generator.updateTable(cdcIncrementalAppendStream, "");
final String sql = generator.updateTable(cdcIncrementalAppendStream, "", Optional.empty());
// Execute T+D twice
destinationHandler.execute(sql);
destinationHandler.execute(sql);
@@ -657,7 +792,7 @@ public void cdcComplexUpdate() throws Exception {
"",
BaseTypingDedupingTest.readRecords("sqlgenerator/cdcupdate_inputrecords_final.jsonl"));

final String sql = generator.updateTable(cdcIncrementalDedupStream, "");
final String sql = generator.updateTable(cdcIncrementalDedupStream, "", Optional.empty());
destinationHandler.execute(sql);

verifyRecordCounts(
@@ -689,7 +824,7 @@ public void testCdcOrdering_updateAfterDelete() throws Exception {
streamId,
BaseTypingDedupingTest.readRecords("sqlgenerator/cdcordering_updateafterdelete_inputrecords.jsonl"));

final String sql = generator.updateTable(cdcIncrementalDedupStream, "");
final String sql = generator.updateTable(cdcIncrementalDedupStream, "", Optional.empty());
destinationHandler.execute(sql);

verifyRecordCounts(
@@ -726,7 +861,7 @@ public void testCdcOrdering_insertAfterDelete() throws Exception {
"",
BaseTypingDedupingTest.readRecords("sqlgenerator/cdcordering_insertafterdelete_inputrecords_final.jsonl"));

final String sql = generator.updateTable(cdcIncrementalDedupStream, "");
final String sql = generator.updateTable(cdcIncrementalDedupStream, "", Optional.empty());
destinationHandler.execute(sql);

verifyRecordCounts(
@@ -819,7 +954,7 @@ public void weirdColumnNames() throws Exception {

final String createTable = generator.createTable(stream, "", false);
destinationHandler.execute(createTable);
final String updateTable = generator.updateTable(stream, "");
final String updateTable = generator.updateTable(stream, "", Optional.empty());
destinationHandler.execute(updateTable);

verifyRecords(
@@ -868,7 +1003,7 @@ public void noCrashOnSpecialCharacters(final String specialChars) throws Exception {

final String createTable = generator.createTable(stream, "", false);
destinationHandler.execute(createTable);
final String updateTable = generator.updateTable(stream, "");
final String updateTable = generator.updateTable(stream, "", Optional.empty());
// Not verifying anything about the data; let's just make sure we don't crash.
destinationHandler.execute(updateTable);
} finally {
@@ -903,7 +1038,7 @@ public void testReservedKeywords() throws Exception {

final String createTable = generator.createTable(stream, "", false);
destinationHandler.execute(createTable);
final String updateTable = generator.updateTable(stream, "");
final String updateTable = generator.updateTable(stream, "", Optional.empty());
destinationHandler.execute(updateTable);

DIFFER.diffFinalTableRecords(
@@ -978,7 +1113,7 @@ public void noColumns() throws Exception {

final String createTable = generator.createTable(stream, "", false);
destinationHandler.execute(createTable);
final String updateTable = generator.updateTable(stream, "");
final String updateTable = generator.updateTable(stream, "", Optional.empty());
destinationHandler.execute(updateTable);

verifyRecords(
DefaultTyperDeduper.java
@@ -11,6 +11,7 @@

import autovalue.shaded.kotlin.Pair;
import io.airbyte.protocol.models.v0.DestinationSyncMode;
import java.time.Instant;
import java.util.HashSet;
import java.util.Map;
import java.util.Optional;
@@ -57,6 +58,7 @@ public class DefaultTyperDeduper<DialectTableDefinition> implements TyperDeduper {
private final ParsedCatalog parsedCatalog;
private Set<StreamId> overwriteStreamsWithTmpTable;
private final Set<Pair<String, String>> streamsWithSuccessfulSetup;
private final Map<StreamId, Optional<Instant>> minExtractedAtByStream;
// We only want to run a single instance of T+D per stream at a time. These objects are used for
// synchronization per stream.
// Use a read-write lock because we need the same semantics:
@@ -81,6 +83,7 @@ public DefaultTyperDeduper(final SqlGenerator<DialectTableDefinition> sqlGenerator,
this.parsedCatalog = parsedCatalog;
this.v1V2Migrator = v1V2Migrator;
this.v2TableMigrator = v2TableMigrator;
this.minExtractedAtByStream = new ConcurrentHashMap<>();
this.streamsWithSuccessfulSetup = ConcurrentHashMap.newKeySet(parsedCatalog.streams().size());
this.tdLocks = new ConcurrentHashMap<>();
this.internalTdLocks = new ConcurrentHashMap<>();
@@ -148,6 +151,9 @@ private CompletableFuture<Optional<Exception>> prepareTablesFuture(final StreamConfig stream) {
// The table doesn't exist. Create it. Don't force.
destinationHandler.execute(sqlGenerator.createTable(stream, NO_SUFFIX, false));
}
final Optional<Instant> minTimestampForSync = destinationHandler.getMinTimestampForSync(stream.id());
minExtractedAtByStream.put(stream.id(), minTimestampForSync);

streamsWithSuccessfulSetup.add(new Pair<>(stream.id().originalNamespace(), stream.id().originalName()));

// Use fair locking. This slows down lock operations, but that performance hit is by far dwarfed
@@ -214,7 +220,7 @@ public CompletableFuture<Optional<Exception>> typeAndDedupeTask(final StreamConfig streamConfig) {
try {
LOGGER.info("Attempting typing and deduping for {}.{}", originalNamespace, originalName);
final String suffix = getFinalTableSuffix(streamConfig.id());
final String sql = sqlGenerator.updateTable(streamConfig, suffix);
final String sql = sqlGenerator.updateTable(streamConfig, suffix, minExtractedAtByStream.get(streamConfig.id()));
destinationHandler.execute(sql);
} finally {
LOGGER.info("Allowing other threads to proceed for {}.{}", originalNamespace, originalName);
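
Design note on the two hunks above: getMinTimestampForSync is called once per stream while tables are prepared, and the result is cached in minExtractedAtByStream for every T+D pass in the rest of the sync. A per-sync snapshot is sufficient because the filter only needs to exclude records that were fully typed by previous syncs; re-querying on each typeAndDedupeTask call would add a warehouse round-trip without improving correctness. The resulting flow, condensed from the diff (names are from the surrounding class):

    // During prepareTablesFuture(), once per stream:
    minExtractedAtByStream.put(stream.id(), destinationHandler.getMinTimestampForSync(stream.id()));

    // During every typeAndDedupeTask() for that stream:
    destinationHandler.execute(
        sqlGenerator.updateTable(streamConfig, suffix, minExtractedAtByStream.get(streamConfig.id())));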
DestinationHandler.java
@@ -4,6 +4,7 @@

package io.airbyte.integrations.base.destination.typing_deduping;

import java.time.Instant;
import java.util.Optional;

public interface DestinationHandler<DialectTableDefinition> {
@@ -12,6 +13,14 @@ public interface DestinationHandler<DialectTableDefinition> {

boolean isFinalTableEmpty(StreamId id) throws Exception;

/**
* Returns the highest timestamp such that all records with _airbyte_extracted_at equal to or earlier
* than that timestamp have non-null _airbyte_loaded_at.
* <p>
* If the raw table is empty or does not exist, return an empty optional.
*/
Optional<Instant> getMinTimestampForSync(StreamId id) throws Exception;

void execute(final String sql) throws Exception;

}
SqlGenerator.java
@@ -4,6 +4,9 @@

package io.airbyte.integrations.base.destination.typing_deduping;

import java.time.Instant;
import java.util.Optional;

public interface SqlGenerator<DialectTableDefinition> {

String SOFT_RESET_SUFFIX = "_ab_soft_reset";
@@ -68,8 +71,12 @@ default ColumnId buildColumnId(final String name) {
* @param finalSuffix the suffix of the final table to write to. If empty string, writes to the
* final table directly. Useful for full refresh overwrite syncs, where we write the entire
* sync to a temp table and then swap it into the final table at the end.
* @param minRawTimestamp The latest _airbyte_extracted_at for which all raw records with that
* timestamp have already been typed+deduped. Implementations MAY use this value in a
* {@code _airbyte_extracted_at > minRawTimestamp} filter on the raw table to improve query
* performance.
*/
String updateTable(final StreamConfig stream, String finalSuffix);
String updateTable(final StreamConfig stream, String finalSuffix, Optional<Instant> minRawTimestamp);

/**
* Drop the previous final table, and rename the new final table to match the old final table.
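
To make the parameter concrete, here is one way an implementation might render it — a minimal sketch, assuming a hypothetical buildExtractedAtCondition helper and an inlined ISO-8601 literal; a real connector would format, quote, or bind the timestamp according to its SQL dialect:

    // Hypothetical helper, not part of this interface.
    private static String buildExtractedAtCondition(final Optional<Instant> minRawTimestamp) {
      // Empty Optional means no safe lower bound is known: fall back to
      // scanning every raw record with a null _airbyte_loaded_at.
      return minRawTimestamp
          .map(ts -> " AND _airbyte_extracted_at > '" + ts + "'")
          .orElse("");
    }

Because the condition is a strict greater-than, it composes with the getMinTimestampForSync contract above: the returned timestamp is always strictly earlier than every record that still needs to be typed and deduped.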