Destinations snowflake+bigquery: Improve performance by filtering raw table on extracted_at #31191

Merged · 25 commits · Oct 17, 2023
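In brief, condensed from the diff below: the destination handler now reports, per stream, the highest _airbyte_extracted_at watermark that has already been fully typed and deduped, and SqlGenerator.updateTable() receives that watermark so implementations can skip already-processed raw records. A minimal sketch of the new flow, using only names that appear in this PR (not a verbatim excerpt):

import java.time.Instant;
import java.util.Optional;

// Sketch: fetch the watermark once per stream during table setup, then thread it
// into the generated type-and-dedupe SQL.
void typeAndDedupeWithWatermark(final SqlGenerator<?> sqlGenerator,
                                final DestinationHandler<?> destinationHandler,
                                final StreamConfig streamConfig,
                                final String suffix) throws Exception {
  // Empty if the raw table is missing or empty; otherwise the latest fully-loaded timestamp.
  final Optional<Instant> minTimestampForSync =
      destinationHandler.getMinTimestampForSync(streamConfig.id());
  // Implementations MAY render this as an `_airbyte_extracted_at > watermark` filter.
  final String sql = sqlGenerator.updateTable(streamConfig, suffix, minTimestampForSync);
  destinationHandler.execute(sql);
}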
@@ -20,6 +20,7 @@
import io.airbyte.commons.string.Strings;
import io.airbyte.protocol.models.v0.DestinationSyncMode;
import io.airbyte.protocol.models.v0.SyncMode;
import java.time.Instant;
import java.util.Arrays;
import java.util.HashMap;
import java.util.LinkedHashMap;
@@ -381,7 +382,7 @@ public void incrementalDedupSameNameNamespace() throws Exception {
}
""")));

final String sql = generator.updateTable(stream, "");
final String sql = generator.updateTable(stream, "", Optional.empty());
destinationHandler.execute(sql);

final List<JsonNode> rawRecords = dumpRawTableRecords(streamId);
@@ -410,7 +411,7 @@ public void allTypes() throws Exception {
streamId,
BaseTypingDedupingTest.readRecords("sqlgenerator/alltypes_inputrecords.jsonl"));

final String sql = generator.updateTable(incrementalDedupStream, "_foo");
final String sql = generator.updateTable(incrementalDedupStream, "_foo", Optional.empty());
destinationHandler.execute(sql);

verifyRecords(
@@ -420,6 +421,62 @@ public void allTypes() throws Exception {
dumpFinalTableRecords(streamId, "_foo"));
}

@Test
public void minTimestampInNonexistentTable() throws Exception {
assertEquals(Optional.empty(), destinationHandler.getMinTimestampForSync(streamId));
}

/**
* Identical to {@link #allTypes()}, but queries for the min raw timestamp first. This verifies that
* if a previous sync doesn't fully type-and-dedupe a table, we still get those records on the next
* sync.
*/
@Test
public void handlePreexistingRecords() throws Exception {
createRawTable(streamId);
createFinalTable(incrementalDedupStream, "");
insertRawTableRecords(
streamId,
BaseTypingDedupingTest.readRecords("sqlgenerator/alltypes_inputrecords.jsonl"));

final Optional<Instant> minTimestampForSync = destinationHandler.getMinTimestampForSync(streamId);
assertTrue(minTimestampForSync.isPresent(), "After writing some raw records, the min timestamp should be present.");

final String sql = generator.updateTable(incrementalDedupStream, "", minTimestampForSync);
destinationHandler.execute(sql);

verifyRecords(
"sqlgenerator/alltypes_expectedrecords_raw.jsonl",
dumpRawTableRecords(streamId),
"sqlgenerator/alltypes_expectedrecords_final.jsonl",
dumpFinalTableRecords(streamId, ""));
}

/**
* Identical to {@link #handlePreexistingRecords()}, but queries for the min timestamp before
* inserting any raw records. This emulates a sync starting with an empty table.
*/
@Test
public void handleNoPreexistingRecords() throws Exception {
createRawTable(streamId);
final Optional<Instant> minTimestampForSync = destinationHandler.getMinTimestampForSync(streamId);
assertEquals(Optional.empty(), minTimestampForSync);

createFinalTable(incrementalDedupStream, "");
insertRawTableRecords(
streamId,
BaseTypingDedupingTest.readRecords("sqlgenerator/alltypes_inputrecords.jsonl"));

final String sql = generator.updateTable(incrementalDedupStream, "", minTimestampForSync);
destinationHandler.execute(sql);

verifyRecords(
"sqlgenerator/alltypes_expectedrecords_raw.jsonl",
dumpRawTableRecords(streamId),
"sqlgenerator/alltypes_expectedrecords_final.jsonl",
dumpFinalTableRecords(streamId, ""));
}

@Test
public void timestampFormats() throws Exception {
createRawTable(streamId);
@@ -428,7 +485,7 @@ public void timestampFormats() throws Exception {
streamId,
BaseTypingDedupingTest.readRecords("sqlgenerator/timestampformats_inputrecords.jsonl"));

final String sql = generator.updateTable(incrementalAppendStream, "");
final String sql = generator.updateTable(incrementalAppendStream, "", Optional.empty());
destinationHandler.execute(sql);

DIFFER.diffFinalTableRecords(
@@ -444,7 +501,7 @@ public void incrementalDedup() throws Exception {
streamId,
BaseTypingDedupingTest.readRecords("sqlgenerator/incrementaldedup_inputrecords.jsonl"));

final String sql = generator.updateTable(incrementalDedupStream, "");
final String sql = generator.updateTable(incrementalDedupStream, "", Optional.empty());
destinationHandler.execute(sql);

verifyRecords(
@@ -497,7 +554,7 @@ public void incrementalDedupNoCursor() throws Exception {
}
""")));

final String sql = generator.updateTable(streamConfig, "");
final String sql = generator.updateTable(streamConfig, "", Optional.empty());
destinationHandler.execute(sql);

final List<JsonNode> actualRawRecords = dumpRawTableRecords(streamId);
@@ -520,7 +577,7 @@ public void incrementalAppend() throws Exception {
streamId,
BaseTypingDedupingTest.readRecords("sqlgenerator/incrementaldedup_inputrecords.jsonl"));

final String sql = generator.updateTable(incrementalAppendStream, "");
final String sql = generator.updateTable(incrementalAppendStream, "", Optional.empty());
destinationHandler.execute(sql);

verifyRecordCounts(
@@ -577,7 +634,7 @@ public void cdcImmediateDeletion() throws Exception {
}
""")));

final String sql = generator.updateTable(cdcIncrementalDedupStream, "");
final String sql = generator.updateTable(cdcIncrementalDedupStream, "", Optional.empty());
destinationHandler.execute(sql);

verifyRecordCounts(
@@ -611,7 +668,7 @@ public void cdcIdempotent() throws Exception {
}
""")));

final String sql = generator.updateTable(cdcIncrementalAppendStream, "");
final String sql = generator.updateTable(cdcIncrementalAppendStream, "", Optional.empty());
// Execute T+D twice
destinationHandler.execute(sql);
destinationHandler.execute(sql);
@@ -636,7 +693,7 @@ public void cdcComplexUpdate() throws Exception {
"",
BaseTypingDedupingTest.readRecords("sqlgenerator/cdcupdate_inputrecords_final.jsonl"));

final String sql = generator.updateTable(cdcIncrementalDedupStream, "");
final String sql = generator.updateTable(cdcIncrementalDedupStream, "", Optional.empty());
destinationHandler.execute(sql);

verifyRecordCounts(
@@ -668,7 +725,7 @@ public void testCdcOrdering_updateAfterDelete() throws Exception {
streamId,
BaseTypingDedupingTest.readRecords("sqlgenerator/cdcordering_updateafterdelete_inputrecords.jsonl"));

final String sql = generator.updateTable(cdcIncrementalDedupStream, "");
final String sql = generator.updateTable(cdcIncrementalDedupStream, "", Optional.empty());
destinationHandler.execute(sql);

verifyRecordCounts(
@@ -705,7 +762,7 @@ public void testCdcOrdering_insertAfterDelete() throws Exception {
"",
BaseTypingDedupingTest.readRecords("sqlgenerator/cdcordering_insertafterdelete_inputrecords_final.jsonl"));

final String sql = generator.updateTable(cdcIncrementalDedupStream, "");
final String sql = generator.updateTable(cdcIncrementalDedupStream, "", Optional.empty());
destinationHandler.execute(sql);

verifyRecordCounts(
@@ -798,7 +855,7 @@ public void weirdColumnNames() throws Exception {

final String createTable = generator.createTable(stream, "", false);
destinationHandler.execute(createTable);
final String updateTable = generator.updateTable(stream, "");
final String updateTable = generator.updateTable(stream, "", Optional.empty());
destinationHandler.execute(updateTable);

verifyRecords(
@@ -847,7 +904,7 @@ public void noCrashOnSpecialCharacters(final String specialChars) throws Exception {

final String createTable = generator.createTable(stream, "", false);
destinationHandler.execute(createTable);
final String updateTable = generator.updateTable(stream, "");
final String updateTable = generator.updateTable(stream, "", Optional.empty());
// Not verifying anything about the data; let's just make sure we don't crash.
destinationHandler.execute(updateTable);
} finally {
@@ -882,7 +939,7 @@ public void testReservedKeywords() throws Exception {

final String createTable = generator.createTable(stream, "", false);
destinationHandler.execute(createTable);
final String updateTable = generator.updateTable(stream, "");
final String updateTable = generator.updateTable(stream, "", Optional.empty());
destinationHandler.execute(updateTable);

DIFFER.diffFinalTableRecords(
@@ -957,7 +1014,7 @@ public void noColumns() throws Exception {

final String createTable = generator.createTable(stream, "", false);
destinationHandler.execute(createTable);
final String updateTable = generator.updateTable(stream, "");
final String updateTable = generator.updateTable(stream, "", Optional.empty());
destinationHandler.execute(updateTable);

verifyRecords(
@@ -11,6 +11,7 @@

import autovalue.shaded.kotlin.Pair;
import io.airbyte.protocol.models.v0.DestinationSyncMode;
import java.time.Instant;
import java.util.HashSet;
import java.util.Map;
import java.util.Optional;
@@ -57,6 +58,7 @@ public class DefaultTyperDeduper<DialectTableDefinition> implements TyperDeduper
private final ParsedCatalog parsedCatalog;
private Set<StreamId> overwriteStreamsWithTmpTable;
private final Set<Pair<String, String>> streamsWithSuccessfulSetup;
private final Map<StreamId, Optional<Instant>> minExtractedAtByStream;
Comment from the PR author on minExtractedAtByStream:

there are so many parallel maps of stream -> (something)... I'll take some time tomorrow to shove those into a struct or something.
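A hypothetical shape for that cleanup (illustrative only; the record and field names are invented, not from this PR):

import java.time.Instant;
import java.util.Optional;
import java.util.concurrent.locks.ReadWriteLock;

// One state object per stream, so a single Map<StreamId, StreamState> could replace
// minExtractedAtByStream, overwriteStreamsWithTmpTable membership, and the lock maps.
record StreamState(Optional<Instant> minExtractedAt, // watermark passed to updateTable
                   boolean overwriteViaTmpTable,     // was: overwriteStreamsWithTmpTable
                   ReadWriteLock tdLock) {}          // was: tdLocks entry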

// We only want to run a single instance of T+D per stream at a time. These objects are used for
// synchronization per stream.
// Use a read-write lock because we need the same semantics:
@@ -81,6 +83,7 @@ public DefaultTyperDeduper(final SqlGenerator<DialectTableDefinition> sqlGenerator,
this.parsedCatalog = parsedCatalog;
this.v1V2Migrator = v1V2Migrator;
this.v2TableMigrator = v2TableMigrator;
this.minExtractedAtByStream = new ConcurrentHashMap<>();
this.streamsWithSuccessfulSetup = ConcurrentHashMap.newKeySet(parsedCatalog.streams().size());
this.tdLocks = new ConcurrentHashMap<>();
this.internalTdLocks = new ConcurrentHashMap<>();
@@ -148,6 +151,9 @@ private CompletableFuture<Optional<Exception>> prepareTablesFuture(final StreamConfig stream) {
// The table doesn't exist. Create it. Don't force.
destinationHandler.execute(sqlGenerator.createTable(stream, NO_SUFFIX, false));
}
final Optional<Instant> minTimestampForSync = destinationHandler.getMinTimestampForSync(stream.id());
minExtractedAtByStream.put(stream.id(), minTimestampForSync);

streamsWithSuccessfulSetup.add(new Pair<>(stream.id().originalNamespace(), stream.id().originalName()));

// Use fair locking. This slows down lock operations, but that performance hit is by far dwarfed
@@ -214,7 +220,7 @@ public CompletableFuture<Optional<Exception>> typeAndDedupeTask(final StreamConfig streamConfig) {
try {
LOGGER.info("Attempting typing and deduping for {}.{}", originalNamespace, originalName);
final String suffix = getFinalTableSuffix(streamConfig.id());
final String sql = sqlGenerator.updateTable(streamConfig, suffix);
final String sql = sqlGenerator.updateTable(streamConfig, suffix, minExtractedAtByStream.get(streamConfig.id()));
destinationHandler.execute(sql);
} finally {
LOGGER.info("Allowing other threads to proceed for {}.{}", originalNamespace, originalName);
@@ -4,6 +4,7 @@

package io.airbyte.integrations.base.destination.typing_deduping;

import java.time.Instant;
import java.util.Optional;

public interface DestinationHandler<DialectTableDefinition> {
@@ -12,6 +13,14 @@ public interface DestinationHandler<DialectTableDefinition> {

boolean isFinalTableEmpty(StreamId id) throws Exception;

/**
* Returns the highest timestamp such that all records with _airbyte_extracted_at equal to or earlier
* than that timestamp have non-null _airbyte_loaded_at.
* <p>
* If the raw table is empty or does not exist, return an empty optional.
*/
Optional<Instant> getMinTimestampForSync(StreamId id) throws Exception;

void execute(final String sql) throws Exception;

}
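The interface leaves the watermark computation to each destination. A hypothetical plain-JDBC sketch of one way to satisfy the contract above (not this PR's Snowflake/BigQuery code; it assumes the standard _airbyte_extracted_at/_airbyte_loaded_at raw columns, microsecond timestamp granularity, and that the caller has already checked that the table exists):

import java.sql.Connection;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
import java.sql.Timestamp;
import java.time.Instant;
import java.time.temporal.ChronoUnit;
import java.util.Optional;

static Optional<Instant> getMinTimestampForSync(final Connection conn, final String rawTable) throws SQLException {
  try (Statement stmt = conn.createStatement()) {
    // Earliest record that was never typed+deduped (loaded_at IS NULL).
    try (ResultSet rs = stmt.executeQuery(
        "SELECT MIN(_airbyte_extracted_at) FROM " + rawTable + " WHERE _airbyte_loaded_at IS NULL")) {
      rs.next();
      final Timestamp minUnloaded = rs.getTimestamp(1);
      if (minUnloaded != null) {
        // Everything strictly before the earliest unloaded timestamp is fully loaded.
        // Back off one microsecond so a strict `extracted_at > ?` filter still
        // re-selects the unloaded record itself.
        return Optional.of(minUnloaded.toInstant().minus(1, ChronoUnit.MICROS));
      }
    }
    // No unloaded records: every record is typed+deduped, so return the max.
    // NULL here means the raw table is empty -> empty Optional.
    try (ResultSet rs = stmt.executeQuery("SELECT MAX(_airbyte_extracted_at) FROM " + rawTable)) {
      rs.next();
      return Optional.ofNullable(rs.getTimestamp(1)).map(Timestamp::toInstant);
    }
  }
}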
@@ -4,6 +4,9 @@

package io.airbyte.integrations.base.destination.typing_deduping;

import java.time.Instant;
import java.util.Optional;

public interface SqlGenerator<DialectTableDefinition> {

String SOFT_RESET_SUFFIX = "_ab_soft_reset";
@@ -68,8 +71,12 @@ default ColumnId buildColumnId(final String name) {
* @param finalSuffix the suffix of the final table to write to. If empty string, writes to the
* final table directly. Useful for full refresh overwrite syncs, where we write the entire
* sync to a temp table and then swap it into the final table at the end.
* @param minRawTimestamp The latest _airbyte_extracted_at for which all raw records with that
* timestamp have already been typed+deduped. Implementations MAY use this value in a
* {@code _airbyte_extracted_at > minRawTimestamp} filter on the raw table to improve query
* performance.
*/
String updateTable(final StreamConfig stream, String finalSuffix);
String updateTable(final StreamConfig stream, String finalSuffix, Optional<Instant> minRawTimestamp);

/**
* Drop the previous final table, and rename the new final table to match the old final table.
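The minRawTimestamp javadoc above deliberately leaves the filter to each implementation. A hypothetical rendering (not this PR's actual Snowflake/BigQuery SQL; real generators must quote identifiers and format or bind the timestamp literal for their dialect):

import java.time.Instant;
import java.util.Optional;

// Fragment appended to the raw-table WHERE clause when a watermark is present.
static String rawTableFilter(final Optional<Instant> minRawTimestamp) {
  return minRawTimestamp
      // Strict `>` matches the watermark contract: rows at or before the watermark
      // are already typed+deduped, so only newer rows need to be scanned.
      .map(ts -> " AND _airbyte_extracted_at > TIMESTAMP '" + ts + "'")
      .orElse("");
}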
@@ -18,6 +18,7 @@
import static org.mockito.Mockito.when;

import io.airbyte.protocol.models.v0.DestinationSyncMode;
import java.time.Instant;
import java.util.List;
import java.util.Optional;
import org.junit.jupiter.api.BeforeEach;
@@ -32,9 +33,10 @@ public class DefaultTyperDeduperTest {
private TyperDeduper typerDeduper;

@BeforeEach
void setup() {
void setup() throws Exception {
sqlGenerator = spy(new MockSqlGenerator());
destinationHandler = mock(DestinationHandler.class);
when(destinationHandler.getMinTimestampForSync(any())).thenReturn(Optional.empty());
migrator = new NoOpDestinationV1V2Migrator<>();

final ParsedCatalog parsedCatalog = new ParsedCatalog(List.of(
@@ -141,6 +143,7 @@ void existingEmptyTableMatchingSchema() {
*/
@Test
void existingNonemptyTable() throws Exception {
when(destinationHandler.getMinTimestampForSync(any())).thenReturn(Optional.of(Instant.parse("2023-01-01T12:34:56Z")));
when(destinationHandler.findExistingTable(any())).thenReturn(Optional.of("foo"));
when(destinationHandler.isFinalTableEmpty(any())).thenReturn(false);

@@ -155,11 +158,11 @@

typerDeduper.typeAndDedupe("overwrite_ns", "overwrite_stream", false);
// NB: no airbyte_tmp suffix on the non-overwrite streams
verify(destinationHandler).execute("UPDATE TABLE overwrite_ns.overwrite_stream_airbyte_tmp");
verify(destinationHandler).execute("UPDATE TABLE overwrite_ns.overwrite_stream_airbyte_tmp WITH extracted_at > 2023-01-01T12:34:56Z");
typerDeduper.typeAndDedupe("append_ns", "append_stream", false);
verify(destinationHandler).execute("UPDATE TABLE append_ns.append_stream");
verify(destinationHandler).execute("UPDATE TABLE append_ns.append_stream WITH extracted_at > 2023-01-01T12:34:56Z");
typerDeduper.typeAndDedupe("dedup_ns", "dedup_stream", false);
verify(destinationHandler).execute("UPDATE TABLE dedup_ns.dedup_stream");
verify(destinationHandler).execute("UPDATE TABLE dedup_ns.dedup_stream WITH extracted_at > 2023-01-01T12:34:56Z");
verifyNoMoreInteractions(ignoreStubs(destinationHandler));
clearInvocations(destinationHandler);

@@ -174,6 +177,7 @@
*/
@Test
void existingNonemptyTableMatchingSchema() throws Exception {
when(destinationHandler.getMinTimestampForSync(any())).thenReturn(Optional.of(Instant.now()));
when(destinationHandler.findExistingTable(any())).thenReturn(Optional.of("foo"));
when(destinationHandler.isFinalTableEmpty(any())).thenReturn(false);
when(sqlGenerator.existingSchemaMatchesStreamConfig(any(), any())).thenReturn(true);
@@ -4,6 +4,9 @@

package io.airbyte.integrations.base.destination.typing_deduping;

import java.time.Instant;
import java.util.Optional;

/**
* Basic SqlGenerator mock. See {@link DefaultTyperDeduperTest} for example usage.
*/
@@ -35,8 +38,11 @@ public String softReset(final StreamConfig stream) {
}

@Override
public String updateTable(final StreamConfig stream, final String finalSuffix) {
return "UPDATE TABLE " + stream.id().finalTableId("", finalSuffix);
public String updateTable(final StreamConfig stream, final String finalSuffix, final Optional<Instant> minRawTimestamp) {
final String timestampFilter = minRawTimestamp
.map(timestamp -> " WITH extracted_at > " + timestamp)
.orElse("");
return "UPDATE TABLE " + stream.id().finalTableId("", finalSuffix) + timestampFilter;
}
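With a present watermark this mock renders, e.g., UPDATE TABLE overwrite_ns.overwrite_stream_airbyte_tmp WITH extracted_at > 2023-01-01T12:34:56Z, which is exactly the string asserted in DefaultTyperDeduperTest.existingNonemptyTable() above.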
