Skip to content

Commit

Permalink
Automated Commit - Formatting Changes
Browse files Browse the repository at this point in the history
  • Loading branch information
edgao committed Oct 18, 2023
1 parent c7f5e2f commit d6b581a
Showing 1 changed file with 36 additions and 33 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,6 @@
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
Expand Down Expand Up @@ -442,9 +441,9 @@ private String updateTableQueryBuilder(final StreamConfig stream,
}

private String insertNewRecords(final StreamConfig stream,
final String finalSuffix,
final boolean forceSafeCasting,
final Optional<Instant> minRawTimestamp) {
final String finalSuffix,
final boolean forceSafeCasting,
final Optional<Instant> minRawTimestamp) {
final String columnList = stream.columns().keySet().stream().map(quotedColumnId -> quotedColumnId.name(QUOTE) + ",").collect(joining("\n"));
final String extractNewRawRecords = extractNewRawRecords(stream, forceSafeCasting, minRawTimestamp);

Expand Down Expand Up @@ -490,12 +489,16 @@ private String upsertNewRecords(final StreamConfig stream,
cursorComparison =
// First, compare the cursors.
"(target_table." + cursor + " < new_record." + cursor
// Then, break ties with extracted_at. (also explicitly check for both new_record and final table having null cursor
// because NULL != NULL in SQL)
+ " OR (target_table." + cursor + " = new_record." + cursor + " AND target_table._airbyte_extracted_at < new_record._airbyte_extracted_at)"
+ " OR (target_table." + cursor + " IS NULL AND new_record." + cursor + " IS NULL AND target_table._airbyte_extracted_at < new_record._airbyte_extracted_at)"
// Or, if the final table has null cursor but new_record has non-null cursor, then take the new record.
+ " OR (target_table." + cursor + " IS NULL AND new_record." + cursor + " IS NOT NULL))";
// Then, break ties with extracted_at. (also explicitly check for both new_record and final table
// having null cursor
// because NULL != NULL in SQL)
+ " OR (target_table." + cursor + " = new_record." + cursor
+ " AND target_table._airbyte_extracted_at < new_record._airbyte_extracted_at)"
+ " OR (target_table." + cursor + " IS NULL AND new_record." + cursor
+ " IS NULL AND target_table._airbyte_extracted_at < new_record._airbyte_extracted_at)"
// Or, if the final table has null cursor but new_record has non-null cursor, then take the new
// record.
+ " OR (target_table." + cursor + " IS NULL AND new_record." + cursor + " IS NOT NULL))";
} else {
// If there's no cursor, then we just take the most-recently-emitted record
cursorComparison = "target_table._airbyte_extracted_at < new_record._airbyte_extracted_at";
Expand Down Expand Up @@ -531,29 +534,29 @@ private String upsertNewRecords(final StreamConfig stream,
"cdcSkipInsertClause", cdcSkipInsertClause,
"column_list", columnList,
"newRecordColumnList", newRecordColumnList)).replace(
"""
MERGE ${project_id}.${final_table_id} target_table
USING (
${extractNewRawRecords}
) new_record
ON ${pkEquivalent}
${cdcDeleteClause}
WHEN MATCHED AND ${cursorComparison} THEN UPDATE SET
${columnAssignments}
_airbyte_meta = new_record._airbyte_meta,
_airbyte_raw_id = new_record._airbyte_raw_id,
_airbyte_extracted_at = new_record._airbyte_extracted_at
WHEN NOT MATCHED ${cdcSkipInsertClause} THEN INSERT (
${column_list}
_airbyte_meta,
_airbyte_raw_id,
_airbyte_extracted_at
) VALUES (
${newRecordColumnList}
new_record._airbyte_meta,
new_record._airbyte_raw_id,
new_record._airbyte_extracted_at
);""");
"""
MERGE ${project_id}.${final_table_id} target_table
USING (
${extractNewRawRecords}
) new_record
ON ${pkEquivalent}
${cdcDeleteClause}
WHEN MATCHED AND ${cursorComparison} THEN UPDATE SET
${columnAssignments}
_airbyte_meta = new_record._airbyte_meta,
_airbyte_raw_id = new_record._airbyte_raw_id,
_airbyte_extracted_at = new_record._airbyte_extracted_at
WHEN NOT MATCHED ${cdcSkipInsertClause} THEN INSERT (
${column_list}
_airbyte_meta,
_airbyte_raw_id,
_airbyte_extracted_at
) VALUES (
${newRecordColumnList}
new_record._airbyte_meta,
new_record._airbyte_raw_id,
new_record._airbyte_extracted_at
);""");
}

/**
Expand Down

0 comments on commit d6b581a

Please sign in to comment.