Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Destinations v2: Do not dedup raw table #31520

Merged
merged 15 commits into from
Oct 25, 2023
Original file line number Diff line number Diff line change
Expand Up @@ -659,13 +659,11 @@ public void incrementalDedupNoCursor() throws Exception {
final List<JsonNode> actualRawRecords = dumpRawTableRecords(streamId);
final List<JsonNode> actualFinalRecords = dumpFinalTableRecords(streamId, "");
verifyRecordCounts(
1,
2,
actualRawRecords,
1,
actualFinalRecords);
assertAll(
() -> assertEquals("bar", actualRawRecords.get(0).get("_airbyte_data").get("string").asText()),
() -> assertEquals("bar", actualFinalRecords.get(0).get(generator.buildColumnId("string").name()).asText()));
assertEquals("bar", actualFinalRecords.get(0).get(generator.buildColumnId("string").name()).asText());
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

there's no longer any reason to assert the raw records, so only assert the final record.

}

@Test
Expand Down Expand Up @@ -796,10 +794,9 @@ public void cdcComplexUpdate() throws Exception {
destinationHandler.execute(sql);

verifyRecordCounts(
// We keep the newest raw record per PK
7,
11,
dumpRawTableRecords(streamId),
5,
6,
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

dumpFinalTableRecords(streamId, ""));
}

Expand Down Expand Up @@ -828,7 +825,7 @@ public void testCdcOrdering_updateAfterDelete() throws Exception {
destinationHandler.execute(sql);

verifyRecordCounts(
1,
2,
dumpRawTableRecords(streamId),
0,
dumpFinalTableRecords(streamId, ""));
Expand Down Expand Up @@ -865,7 +862,7 @@ public void testCdcOrdering_insertAfterDelete() throws Exception {
destinationHandler.execute(sql);

verifyRecordCounts(
1,
2,
dumpRawTableRecords(streamId),
1,
dumpFinalTableRecords(streamId, ""));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@

package io.airbyte.integrations.base.destination.typing_deduping;

import static org.junit.jupiter.api.Assertions.assertAll;

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.node.ObjectNode;
import com.google.common.collect.ImmutableMap;
Expand Down Expand Up @@ -270,7 +272,7 @@ public void fullRefreshAppend() throws Exception {

runSync(catalog, messages2);

final List<JsonNode> expectedRawRecords2 = readRecords("dat/sync2_expectedrecords_fullrefresh_append_raw.jsonl");
final List<JsonNode> expectedRawRecords2 = readRecords("dat/sync2_expectedrecords_append_raw.jsonl");
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

there's no longer any difference between append and dedup raw records, so merge their expectedrecords files.

final List<JsonNode> expectedFinalRecords2 = readRecords("dat/sync2_expectedrecords_fullrefresh_append_final.jsonl");
verifySyncResult(expectedRawRecords2, expectedFinalRecords2);
}
Expand Down Expand Up @@ -309,7 +311,7 @@ public void incrementalAppend() throws Exception {

runSync(catalog, messages2);

final List<JsonNode> expectedRawRecords2 = readRecords("dat/sync2_expectedrecords_fullrefresh_append_raw.jsonl");
final List<JsonNode> expectedRawRecords2 = readRecords("dat/sync2_expectedrecords_append_raw.jsonl");
final List<JsonNode> expectedFinalRecords2 = readRecords("dat/sync2_expectedrecords_fullrefresh_append_final.jsonl");
verifySyncResult(expectedRawRecords2, expectedFinalRecords2);
}
Expand Down Expand Up @@ -346,7 +348,7 @@ public void incrementalDedup() throws Exception {

runSync(catalog, messages2);

final List<JsonNode> expectedRawRecords2 = readRecords("dat/sync2_expectedrecords_incremental_dedup_raw.jsonl");
final List<JsonNode> expectedRawRecords2 = readRecords("dat/sync2_expectedrecords_append_raw.jsonl");
final List<JsonNode> expectedFinalRecords2 = readRecords("dat/sync2_expectedrecords_incremental_dedup_final.jsonl");
verifySyncResult(expectedRawRecords2, expectedFinalRecords2);
}
Expand Down Expand Up @@ -381,7 +383,7 @@ public void incrementalDedupDefaultNamespace() throws Exception {

runSync(catalog, messages2);

final List<JsonNode> expectedRawRecords2 = readRecords("dat/sync2_expectedrecords_incremental_dedup_raw.jsonl");
final List<JsonNode> expectedRawRecords2 = readRecords("dat/sync2_expectedrecords_append_raw.jsonl");
final List<JsonNode> expectedFinalRecords2 = readRecords("dat/sync2_expectedrecords_incremental_dedup_final.jsonl");
verifySyncResult(expectedRawRecords2, expectedFinalRecords2, null, streamName);
}
Expand Down Expand Up @@ -437,7 +439,7 @@ public void testIncrementalSyncDropOneColumn() throws Exception {
runSync(catalog, messages2);

// The raw data is unaffected by the schema, but the final table should not have a `name` column.
final List<JsonNode> expectedRawRecords2 = readRecords("dat/sync2_expectedrecords_fullrefresh_append_raw.jsonl");
final List<JsonNode> expectedRawRecords2 = readRecords("dat/sync2_expectedrecords_append_raw.jsonl");
final List<JsonNode> expectedFinalRecords2 = readRecords("dat/sync2_expectedrecords_fullrefresh_append_final.jsonl").stream()
.peek(record -> ((ObjectNode) record).remove(getSqlGenerator().buildColumnId("name").name()))
.toList();
Expand Down Expand Up @@ -518,12 +520,12 @@ public void incrementalDedupIdenticalName() throws Exception {
runSync(catalog, messages2);

verifySyncResult(
readRecords("dat/sync2_expectedrecords_incremental_dedup_raw.jsonl"),
readRecords("dat/sync2_expectedrecords_append_raw.jsonl"),
readRecords("dat/sync2_expectedrecords_incremental_dedup_final.jsonl"),
namespace1,
streamName);
verifySyncResult(
readRecords("dat/sync2_expectedrecords_incremental_dedup_raw2.jsonl"),
readRecords("dat/sync2_expectedrecords_append_raw2.jsonl"),
readRecords("dat/sync2_expectedrecords_incremental_dedup_final2.jsonl"),
namespace2,
streamName);
Expand Down Expand Up @@ -585,16 +587,15 @@ public void identicalNameSimultaneousSync() throws Exception {
// And this will dump sync2's entire stdout to our stdout
endSync(sync2);

verifySyncResult(
readRecords("dat/sync1_expectedrecords_dedup_raw.jsonl"),
readRecords("dat/sync1_expectedrecords_dedup_final.jsonl"),
namespace1,
streamName);
verifySyncResult(
readRecords("dat/sync1_expectedrecords_dedup_raw2.jsonl"),
readRecords("dat/sync1_expectedrecords_dedup_final2.jsonl"),
namespace2,
streamName);
// For simplicity, don't verify the raw table. Assume that if the final table is correct, then
// the raw data is correct. This is generally a safe assumption.
assertAll(
() -> DIFFER.diffFinalTableRecords(
readRecords("dat/sync1_expectedrecords_dedup_final.jsonl"),
dumpFinalTableRecords(namespace1, streamName)),
() -> DIFFER.diffFinalTableRecords(
readRecords("dat/sync1_expectedrecords_dedup_final2.jsonl"),
dumpFinalTableRecords(namespace2, streamName)));
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,5 +12,5 @@
{"_airbyte_raw_id": "4d8674a5-eb6e-41ca-a310-69c64c88d101", "_airbyte_extracted_at": "2023-01-01T00:00:00Z", "_airbyte_data": {"id1": 0, "id2": 100, "updated_at": "2023-01-01T05:00:00Z", "_ab_cdc_deleted_at": null, "string": "zombie_returned"}}
// CDC generally outputs an explicit null for deleted_at, but verify that we can also handle the case where deleted_at is unset.
{"_airbyte_raw_id": "f0b59e49-8c74-4101-9f14-cb4d1193fd5a", "_airbyte_extracted_at": "2023-01-01T00:00:00Z", "_airbyte_data": {"id1": 4, "id2": 100, "updated_at": "2023-01-01T06:00:00Z", "string": "charlie"}}
// Verify that we can handle weird values in deleted_at
// Invalid values in _ab_cdc_deleted_at result in the record NOT being deleted. This behavior is up for debate, but it's an extreme edge case so not a high priority.
{"_airbyte_raw_id": "d4e1d989-c115-403c-9e68-5d320e6376bb", "_airbyte_extracted_at": "2023-01-01T00:00:00Z", "_airbyte_data": {"id1": 5, "id2": 100, "updated_at": "2023-01-01T07:00:00Z", "_ab_cdc_deleted_at": {}, "string": "david1"}}
Loading