Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Destination BigQuery + Snowflake : Disable type and dedupe step controlled by flag in Connector spec #31686

Merged
merged 25 commits into from
Oct 26, 2023
Merged
Show file tree
Hide file tree
Changes from 24 commits
Commits
Show all changes
25 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.function.Function;
import java.util.stream.Stream;
import org.apache.commons.lang3.RandomStringUtils;
import org.apache.commons.lang3.tuple.Pair;
Expand Down Expand Up @@ -109,6 +110,19 @@ public abstract class BaseTypingDedupingTest {
*/
protected abstract List<JsonNode> dumpRawTableRecords(String streamNamespace, String streamName) throws Exception;

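/**
* Utility method for tests to check whether the table for the given stream exists. The default
* implementation always returns {@code true}; destination-specific tests are expected to override
* it with a real check.
*
* @param streamNamespace namespace of the stream whose table is checked
* @param streamName name of the stream whose table is checked
* @return {@code true} if the table exists
*/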
protected boolean checkTableExists(String streamNamespace, String streamName) {
// Implementation is specific to destination's tests.
return true;
edgao marked this conversation as resolved.
Show resolved Hide resolved
}

/**
* For a given stream, return the records that exist in the destination's final table. Each record
* must be in the format {"_airbyte_raw_id": "...", "_airbyte_extracted_at": "...", "_airbyte_meta":
Expand Down Expand Up @@ -180,6 +194,16 @@ protected JsonNode getConfig() {
return config;
}

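/**
* Override this method to return {@code true} only when typing and deduping (T&D) is skipped, so
* that only the raw tables are compared and the final table comparison is skipped. In every other
* case it must return {@code false}.
*
* @return {@code true} to skip the final table comparison; {@code false} (the default) otherwise
*/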
protected boolean disableFinalTableComparison() {
edgao marked this conversation as resolved.
Show resolved Hide resolved
return false;
}

@BeforeEach
public void setup() throws Exception {
config = generateConfig();
Expand Down Expand Up @@ -230,7 +254,7 @@ public void fullRefreshOverwrite() throws Exception {

final List<JsonNode> expectedRawRecords1 = readRecords("dat/sync1_expectedrecords_raw.jsonl");
final List<JsonNode> expectedFinalRecords1 = readRecords("dat/sync1_expectedrecords_nondedup_final.jsonl");
verifySyncResult(expectedRawRecords1, expectedFinalRecords1);
verifySyncResult(expectedRawRecords1, expectedFinalRecords1, disableFinalTableComparison());

// Second sync
final List<AirbyteMessage> messages2 = readMessages("dat/sync2_messages.jsonl");
Expand All @@ -239,7 +263,7 @@ public void fullRefreshOverwrite() throws Exception {

final List<JsonNode> expectedRawRecords2 = readRecords("dat/sync2_expectedrecords_fullrefresh_overwrite_raw.jsonl");
final List<JsonNode> expectedFinalRecords2 = readRecords("dat/sync2_expectedrecords_fullrefresh_overwrite_final.jsonl");
verifySyncResult(expectedRawRecords2, expectedFinalRecords2);
verifySyncResult(expectedRawRecords2, expectedFinalRecords2, disableFinalTableComparison());
}

/**
Expand All @@ -265,7 +289,7 @@ public void fullRefreshAppend() throws Exception {

final List<JsonNode> expectedRawRecords1 = readRecords("dat/sync1_expectedrecords_raw.jsonl");
final List<JsonNode> expectedFinalRecords1 = readRecords("dat/sync1_expectedrecords_nondedup_final.jsonl");
verifySyncResult(expectedRawRecords1, expectedFinalRecords1);
verifySyncResult(expectedRawRecords1, expectedFinalRecords1, disableFinalTableComparison());

// Second sync
final List<AirbyteMessage> messages2 = readMessages("dat/sync2_messages.jsonl");
Expand All @@ -274,7 +298,7 @@ public void fullRefreshAppend() throws Exception {

final List<JsonNode> expectedRawRecords2 = readRecords("dat/sync2_expectedrecords_raw.jsonl");
final List<JsonNode> expectedFinalRecords2 = readRecords("dat/sync2_expectedrecords_fullrefresh_append_final.jsonl");
verifySyncResult(expectedRawRecords2, expectedFinalRecords2);
verifySyncResult(expectedRawRecords2, expectedFinalRecords2, disableFinalTableComparison());
}

/**
Expand Down Expand Up @@ -304,7 +328,7 @@ public void incrementalAppend() throws Exception {

final List<JsonNode> expectedRawRecords1 = readRecords("dat/sync1_expectedrecords_raw.jsonl");
final List<JsonNode> expectedFinalRecords1 = readRecords("dat/sync1_expectedrecords_nondedup_final.jsonl");
verifySyncResult(expectedRawRecords1, expectedFinalRecords1);
verifySyncResult(expectedRawRecords1, expectedFinalRecords1, disableFinalTableComparison());

// Second sync
final List<AirbyteMessage> messages2 = readMessages("dat/sync2_messages.jsonl");
Expand All @@ -313,7 +337,7 @@ public void incrementalAppend() throws Exception {

final List<JsonNode> expectedRawRecords2 = readRecords("dat/sync2_expectedrecords_raw.jsonl");
final List<JsonNode> expectedFinalRecords2 = readRecords("dat/sync2_expectedrecords_fullrefresh_append_final.jsonl");
verifySyncResult(expectedRawRecords2, expectedFinalRecords2);
verifySyncResult(expectedRawRecords2, expectedFinalRecords2, disableFinalTableComparison());
}

/**
Expand Down Expand Up @@ -341,7 +365,7 @@ public void incrementalDedup() throws Exception {

final List<JsonNode> expectedRawRecords1 = readRecords("dat/sync1_expectedrecords_raw.jsonl");
final List<JsonNode> expectedFinalRecords1 = readRecords("dat/sync1_expectedrecords_dedup_final.jsonl");
verifySyncResult(expectedRawRecords1, expectedFinalRecords1);
verifySyncResult(expectedRawRecords1, expectedFinalRecords1, disableFinalTableComparison());

// Second sync
final List<AirbyteMessage> messages2 = readMessages("dat/sync2_messages.jsonl");
Expand All @@ -350,7 +374,7 @@ public void incrementalDedup() throws Exception {

final List<JsonNode> expectedRawRecords2 = readRecords("dat/sync2_expectedrecords_raw.jsonl");
final List<JsonNode> expectedFinalRecords2 = readRecords("dat/sync2_expectedrecords_incremental_dedup_final.jsonl");
verifySyncResult(expectedRawRecords2, expectedFinalRecords2);
verifySyncResult(expectedRawRecords2, expectedFinalRecords2, disableFinalTableComparison());
}

/**
Expand All @@ -376,7 +400,7 @@ public void incrementalDedupDefaultNamespace() throws Exception {

final List<JsonNode> expectedRawRecords1 = readRecords("dat/sync1_expectedrecords_raw.jsonl");
final List<JsonNode> expectedFinalRecords1 = readRecords("dat/sync1_expectedrecords_dedup_final.jsonl");
verifySyncResult(expectedRawRecords1, expectedFinalRecords1, null, streamName);
verifySyncResult(expectedRawRecords1, expectedFinalRecords1, null, streamName, disableFinalTableComparison());

// Second sync
final List<AirbyteMessage> messages2 = readMessages("dat/sync2_messages.jsonl", null, streamName);
Expand All @@ -385,7 +409,7 @@ public void incrementalDedupDefaultNamespace() throws Exception {

final List<JsonNode> expectedRawRecords2 = readRecords("dat/sync2_expectedrecords_raw.jsonl");
final List<JsonNode> expectedFinalRecords2 = readRecords("dat/sync2_expectedrecords_incremental_dedup_final.jsonl");
verifySyncResult(expectedRawRecords2, expectedFinalRecords2, null, streamName);
verifySyncResult(expectedRawRecords2, expectedFinalRecords2, null, streamName, disableFinalTableComparison());
}

@Test
Expand Down Expand Up @@ -428,7 +452,7 @@ public void testIncrementalSyncDropOneColumn() throws Exception {

final List<JsonNode> expectedRawRecords1 = readRecords("dat/sync1_expectedrecords_raw.jsonl");
final List<JsonNode> expectedFinalRecords1 = readRecords("dat/sync1_expectedrecords_nondedup_final.jsonl");
verifySyncResult(expectedRawRecords1, expectedFinalRecords1);
verifySyncResult(expectedRawRecords1, expectedFinalRecords1, disableFinalTableComparison());

// Second sync
final List<AirbyteMessage> messages2 = readMessages("dat/sync2_messages.jsonl");
Expand All @@ -443,7 +467,7 @@ public void testIncrementalSyncDropOneColumn() throws Exception {
final List<JsonNode> expectedFinalRecords2 = readRecords("dat/sync2_expectedrecords_fullrefresh_append_final.jsonl").stream()
.peek(record -> ((ObjectNode) record).remove(getSqlGenerator().buildColumnId("name").name()))
.toList();
verifySyncResult(expectedRawRecords2, expectedFinalRecords2);
verifySyncResult(expectedRawRecords2, expectedFinalRecords2, disableFinalTableComparison());
}

@Test
Expand Down Expand Up @@ -505,12 +529,12 @@ public void incrementalDedupIdenticalName() throws Exception {
readRecords("dat/sync1_expectedrecords_raw.jsonl"),
readRecords("dat/sync1_expectedrecords_dedup_final.jsonl"),
namespace1,
streamName);
streamName, disableFinalTableComparison());
verifySyncResult(
readRecords("dat/sync1_expectedrecords_raw2.jsonl"),
readRecords("dat/sync1_expectedrecords_dedup_final2.jsonl"),
namespace2,
streamName);
streamName, disableFinalTableComparison());

// Second sync
final List<AirbyteMessage> messages2 = Stream.concat(
Expand All @@ -523,12 +547,12 @@ public void incrementalDedupIdenticalName() throws Exception {
readRecords("dat/sync2_expectedrecords_raw.jsonl"),
readRecords("dat/sync2_expectedrecords_incremental_dedup_final.jsonl"),
namespace1,
streamName);
streamName, disableFinalTableComparison());
verifySyncResult(
readRecords("dat/sync2_expectedrecords_raw2.jsonl"),
readRecords("dat/sync2_expectedrecords_incremental_dedup_final2.jsonl"),
namespace2,
streamName);
streamName, disableFinalTableComparison());
}

/**
Expand Down Expand Up @@ -642,7 +666,7 @@ public void incrementalDedupChangeCursor() throws Exception {

final List<JsonNode> expectedRawRecords1 = readRecords("dat/sync1_cursorchange_expectedrecords_dedup_raw.jsonl");
final List<JsonNode> expectedFinalRecords1 = readRecords("dat/sync1_cursorchange_expectedrecords_dedup_final.jsonl");
verifySyncResult(expectedRawRecords1, expectedFinalRecords1);
verifySyncResult(expectedRawRecords1, expectedFinalRecords1, disableFinalTableComparison());

// Second sync
final List<AirbyteMessage> messages2 = readMessages("dat/sync2_messages.jsonl");
Expand All @@ -653,7 +677,7 @@ public void incrementalDedupChangeCursor() throws Exception {

final List<JsonNode> expectedRawRecords2 = readRecords("dat/sync2_cursorchange_expectedrecords_incremental_dedup_raw.jsonl");
final List<JsonNode> expectedFinalRecords2 = readRecords("dat/sync2_cursorchange_expectedrecords_incremental_dedup_final.jsonl");
verifySyncResult(expectedRawRecords2, expectedFinalRecords2);
verifySyncResult(expectedRawRecords2, expectedFinalRecords2, disableFinalTableComparison());
}

@Test
Expand All @@ -679,18 +703,26 @@ public void testDataTypes() throws Exception {
// this test probably needs some configuration per destination to specify what values are supported?
}

protected void verifySyncResult(final List<JsonNode> expectedRawRecords, final List<JsonNode> expectedFinalRecords) throws Exception {
verifySyncResult(expectedRawRecords, expectedFinalRecords, streamNamespace, streamName);
protected void verifySyncResult(final List<JsonNode> expectedRawRecords,
final List<JsonNode> expectedFinalRecords,
boolean disableFinalTableComparison)
throws Exception {
verifySyncResult(expectedRawRecords, expectedFinalRecords, streamNamespace, streamName, disableFinalTableComparison);
}

private void verifySyncResult(final List<JsonNode> expectedRawRecords,
final List<JsonNode> expectedFinalRecords,
final String streamNamespace,
final String streamName)
final String streamName,
boolean disableFinalTableComparison)
throws Exception {
final List<JsonNode> actualRawRecords = dumpRawTableRecords(streamNamespace, streamName);
final List<JsonNode> actualFinalRecords = dumpFinalTableRecords(streamNamespace, streamName);
DIFFER.verifySyncResult(expectedRawRecords, actualRawRecords, expectedFinalRecords, actualFinalRecords);
if (disableFinalTableComparison) {
DIFFER.diffRawTableRecords(expectedRawRecords, actualRawRecords);
} else {
final List<JsonNode> actualFinalRecords = dumpFinalTableRecords(streamNamespace, streamName);
DIFFER.verifySyncResult(expectedRawRecords, actualRawRecords, expectedFinalRecords, actualFinalRecords);
}
}

public static List<JsonNode> readRecords(final String filename) throws IOException {
Expand Down Expand Up @@ -725,7 +757,15 @@ protected void runSync(final ConfiguredAirbyteCatalog catalog, final List<Airbyt
}

protected void runSync(final ConfiguredAirbyteCatalog catalog, final List<AirbyteMessage> messages, final String imageName) throws Exception {
final AirbyteDestination destination = startSync(catalog, imageName);
runSync(catalog, messages, imageName, Function.identity());
}

/**
* Runs a full sync: starts the destination via {@code startSync}, pushes all of the given messages
* to it via {@code pushMessages}, and then finishes the sync via {@code endSync}.
*
* @param catalog catalog forwarded to {@code startSync}
* @param messages messages pushed to the destination once it has been started
* @param imageName image name forwarded to {@code startSync}
* @param configTransformer config transformer forwarded to {@code startSync}
* @throws Exception if any of the three steps throws
*/
protected void runSync(final ConfiguredAirbyteCatalog catalog,
final List<AirbyteMessage> messages,
final String imageName,
Function<JsonNode, JsonNode> configTransformer)
throws Exception {
final AirbyteDestination destination = startSync(catalog, imageName, configTransformer);
// Order matters: messages can only be pushed to the destination returned by startSync, and the
// sync is ended only after every message has been pushed.
pushMessages(messages, destination);
endSync(destination);
}
Expand All @@ -735,6 +775,22 @@ protected AirbyteDestination startSync(final ConfiguredAirbyteCatalog catalog) t
}

/**
* Convenience overload that starts a sync without altering the config: it delegates to the
* three-argument {@code startSync} with {@link Function#identity()} as the config transformer.
*
* @param catalog catalog forwarded to the three-argument overload
* @param imageName image name forwarded to the three-argument overload
* @return whatever the three-argument overload returns
* @throws Exception if the three-argument overload throws
*/
protected AirbyteDestination startSync(final ConfiguredAirbyteCatalog catalog, final String imageName) throws Exception {
return startSync(catalog, imageName, Function.identity());
}

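/**
* Starts a sync for the given catalog, applying {@code configTransformer} to the test config before
* it is used as the destination connection configuration.
*
* @param catalog the configured catalog for the sync; its streams are also registered for teardown
* @param imageName the image name to start the sync with
* @param configTransformer test-specific config overrides or additions can be performed with this
* function; pass {@link Function#identity()} to leave the config unchanged
* @return the destination that messages for this sync should be pushed to
* @throws Exception if starting the sync fails
*/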
protected AirbyteDestination startSync(final ConfiguredAirbyteCatalog catalog,
final String imageName,
Function<JsonNode, JsonNode> configTransformer)
throws Exception {
synchronized (this) {
catalog.getStreams().forEach(s -> streamsToTearDown.add(AirbyteStreamNameNamespacePair.fromAirbyteStream(s.getStream())));
}
Expand All @@ -750,11 +806,11 @@ protected AirbyteDestination startSync(final ConfiguredAirbyteCatalog catalog, f
localRoot.toString(),
"host",
Collections.emptyMap());

final JsonNode transformedConfig = configTransformer.apply(config);
final WorkerDestinationConfig destinationConfig = new WorkerDestinationConfig()
.withConnectionId(UUID.randomUUID())
.withCatalog(convertProtocolObject(catalog, io.airbyte.protocol.models.ConfiguredAirbyteCatalog.class))
.withDestinationConnectionConfiguration(config);
.withDestinationConnectionConfiguration(transformedConfig);

final AirbyteDestination destination = new DefaultAirbyteDestination(new AirbyteIntegrationLauncher(
"0",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,16 +13,16 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public abstract class BaseDestinationV1V2Migrator<DialectTableDefinition> implements DestinationV1V2Migrator {
public abstract class BaseDestinationV1V2Migrator<DialectTableDefinition> implements DestinationV1V2Migrator<DialectTableDefinition> {

protected static final Logger LOGGER = LoggerFactory.getLogger(BaseDestinationV1V2Migrator.class);

@Override
public void migrateIfNecessary(
final SqlGenerator sqlGenerator,
final DestinationHandler destinationHandler,
final SqlGenerator<DialectTableDefinition> sqlGenerator,
final DestinationHandler<DialectTableDefinition> destinationHandler,
final StreamConfig streamConfig)
throws TableNotMigratedException, UnexpectedSchemaException, Exception {
throws Exception {
LOGGER.info("Assessing whether migration is necessary for stream {}", streamConfig.id().finalName());
if (shouldMigrate(streamConfig)) {
LOGGER.info("Starting v2 Migration for stream {}", streamConfig.id().finalName());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@
* <li>{@link #typeAndDedupe(String, String, boolean)} as needed throughout the sync</li>
* <li>{@link #commitFinalTables()} once at the end of the sync</li>
* </ol>
* Note that createFinalTables initializes some internal state. The other methods will throw an
* Note that {@link #prepareTables()} initializes some internal state. The other methods will throw
* an exception if that method was not called.
*/
public class DefaultTyperDeduper<DialectTableDefinition> implements TyperDeduper {
Expand All @@ -54,7 +54,7 @@ public class DefaultTyperDeduper<DialectTableDefinition> implements TyperDeduper
private final DestinationHandler<DialectTableDefinition> destinationHandler;

private final DestinationV1V2Migrator<DialectTableDefinition> v1V2Migrator;
private final V2TableMigrator<DialectTableDefinition> v2TableMigrator;
private final V2TableMigrator v2TableMigrator;
private final ParsedCatalog parsedCatalog;
private Set<StreamId> overwriteStreamsWithTmpTable;
private final Set<Pair<String, String>> streamsWithSuccessfulSetup;
Expand All @@ -76,7 +76,7 @@ public DefaultTyperDeduper(final SqlGenerator<DialectTableDefinition> sqlGenerat
final DestinationHandler<DialectTableDefinition> destinationHandler,
final ParsedCatalog parsedCatalog,
final DestinationV1V2Migrator<DialectTableDefinition> v1V2Migrator,
final V2TableMigrator<DialectTableDefinition> v2TableMigrator,
final V2TableMigrator v2TableMigrator,
final int defaultThreadCount) {
this.sqlGenerator = sqlGenerator;
this.destinationHandler = destinationHandler;
Expand All @@ -97,7 +97,7 @@ public DefaultTyperDeduper(
final ParsedCatalog parsedCatalog,
final DestinationV1V2Migrator<DialectTableDefinition> v1V2Migrator,
final int defaultThreadCount) {
this(sqlGenerator, destinationHandler, parsedCatalog, v1V2Migrator, new NoopV2TableMigrator<>(), defaultThreadCount);
this(sqlGenerator, destinationHandler, parsedCatalog, v1V2Migrator, new NoopV2TableMigrator(), defaultThreadCount);
}

public void prepareTables() throws Exception {
Expand Down
Loading
Loading