Destination bigquery: Use merge statement to do T+D #31529

Merged: 17 commits from edgao/td_via_merge into master on Oct 27, 2023

Conversation

edgao (Contributor) commented Oct 18, 2023

part of #30764

The merge statement replaces three statements in the previous version (roughly sketched below):

  • Insert new raw records into the final table
  • Dedup the final table
  • Execute CDC deletions
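
For context, here is a minimal sketch of the previous three-statement flow, reusing the table and column names from the MERGE example below (dataset qualifiers omitted). This is illustrative only, not the exact SQL the old connector version generated:

-- 1. Insert new raw records into the final table (simplified; the real SQL
--    also populated _airbyte_meta and handled cast errors).
INSERT INTO `users_final` (`id1`, `id2`, `updated_at`, `_ab_cdc_deleted_at`, _airbyte_raw_id, _airbyte_extracted_at)
SELECT
  CAST(JSON_VALUE(`_airbyte_data`, '$."id1"') as INT64),
  CAST(JSON_VALUE(`_airbyte_data`, '$."id2"') as INT64),
  CAST(JSON_VALUE(`_airbyte_data`, '$."updated_at"') as TIMESTAMP),
  CAST(JSON_VALUE(`_airbyte_data`, '$."_ab_cdc_deleted_at"') as TIMESTAMP),
  _airbyte_raw_id,
  _airbyte_extracted_at
FROM `users_raw`
WHERE _airbyte_loaded_at IS NULL;

-- 2. Dedup the final table: delete every row that isn't the newest for its PK.
DELETE FROM `users_final`
WHERE _airbyte_raw_id IN (
  SELECT _airbyte_raw_id
  FROM (
    SELECT _airbyte_raw_id, row_number() OVER (
      PARTITION BY `id1`, `id2`
      ORDER BY `updated_at` DESC NULLS LAST, `_airbyte_extracted_at` DESC
    ) AS row_number
    FROM `users_final`
  )
  WHERE row_number != 1
);

-- 3. Execute CDC deletions.
DELETE FROM `users_final` WHERE `_ab_cdc_deleted_at` IS NOT NULL;
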
Example BigQuery MERGE SQL:
MERGE `dataline-integration-testing`.`sql_generator_test_wtnwk`.`users_final` target_table
USING (
  -- First, fetch the new records from the raw table. We fetch any new records with null loaded_at, OR non-null cdc_deleted_at.
  -- This select query also dedups the new records, because our MERGE logic only works if we have at most one new row per PK.
  WITH intermediate_data AS (
    SELECT
      CAST(JSON_VALUE(`_airbyte_data`, '$."id1"') as INT64) as `id1`,
      CAST(JSON_VALUE(`_airbyte_data`, '$."id2"') as INT64) as `id2`,
      CAST(JSON_VALUE(`_airbyte_data`, '$."updated_at"') as TIMESTAMP) as `updated_at`,
      CAST(JSON_VALUE(`_airbyte_data`, '$."_ab_cdc_deleted_at"') as TIMESTAMP) as `_ab_cdc_deleted_at`,
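      -- column_errors collects per-column cast errors (empty in this example); it feeds the _airbyte_meta construction in new_records below.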
      [] AS column_errors,
      _airbyte_raw_id,
      _airbyte_extracted_at
    FROM `dataline-integration-testing`.`sql_generator_test_wtnwk`.`users_raw`
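    -- The _airbyte_extracted_at cutoff below limits T+D to recently-extracted records (presumably the previous run's high-water mark) so the whole raw table isn't rescanned.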
    WHERE (
        _airbyte_loaded_at IS NULL
        OR (
          _airbyte_loaded_at IS NOT NULL
          AND JSON_VALUE(`_airbyte_data`, '$._ab_cdc_deleted_at') IS NOT NULL
        )
    ) AND _airbyte_extracted_at > '2023-10-19T20:04:49.148Z'
  ), new_records AS (
    SELECT
      `id1`,
      `id2`,
      `updated_at`,
      `_ab_cdc_deleted_at`,
      to_json(struct(COALESCE((SELECT ARRAY_AGG(unnested_column_errors IGNORE NULLS) FROM UNNEST(column_errors) unnested_column_errors), []) AS errors)) AS _airbyte_meta,
      _airbyte_raw_id,
      _airbyte_extracted_at
    FROM intermediate_data
  ), numbered_rows AS (
    SELECT *, row_number() OVER (
      PARTITION BY `id1`,`id2` ORDER BY `updated_at` DESC NULLS LAST, `_airbyte_extracted_at` DESC
    ) AS row_number
    FROM new_records
  )
  SELECT
    `id1`,
    `id2`,
    `updated_at`,
    `_ab_cdc_deleted_at`,
    _airbyte_meta,
    _airbyte_raw_id,
    _airbyte_extracted_at
  FROM numbered_rows
  WHERE row_number = 1
) new_record
-- PK equivalency: For each column in the PK, check that they're equal or both null.
-- This maintains existing behavior of deduping NULL values.
ON
  (target_table.`id1` = new_record.`id1` OR (target_table.`id1` IS NULL AND new_record.`id1` IS NULL))
  AND (target_table.`id2` = new_record.`id2` OR (target_table.`id2` IS NULL AND new_record.`id2` IS NULL))
-- First, check if we need to do a CDC deletion.
WHEN MATCHED
  -- If our new record is a deletion record
  AND new_record._ab_cdc_deleted_at IS NOT NULL
  -- And our new record is actually newer than the existing record.
  AND (
    -- This is the obvious case - the cursors are both non-null and the new record has a higher cursor
    target_table.`updated_at` < new_record.`updated_at`
    -- This is the tiebreaker case - the cursors are equal, so we compare extracted_at
    OR (target_table.`updated_at` = new_record.`updated_at` AND target_table._airbyte_extracted_at < new_record._airbyte_extracted_at)
    -- This is a weird tiebreaker case - both cursors are null; we break the tie using extracted_at
    OR (target_table.`updated_at` IS NULL AND new_record.`updated_at` IS NULL AND target_table._airbyte_extracted_at < new_record._airbyte_extracted_at)
    -- This is the cursor-change case. The final table's record has a null cursor, but the new record has non-null cursor.
    -- In this case, we assume that the incoming record is newer.
    -- This is useful for if a user e.g. renames their cursor: Old records will not have the new cursor field,
    -- but new records _will_ have the new cursor field.
    OR (target_table.`updated_at` IS NULL AND new_record.`updated_at` IS NOT NULL)
  )
  THEN DELETE
-- Otherwise, check if we can do an update.
WHEN MATCHED
  -- This is the same cursor comparison as the CDC branch.
  AND (
    target_table.`updated_at` < new_record.`updated_at`
    OR (target_table.`updated_at` = new_record.`updated_at` AND target_table._airbyte_extracted_at < new_record._airbyte_extracted_at)
    OR (target_table.`updated_at` IS NULL AND new_record.`updated_at` IS NULL AND target_table._airbyte_extracted_at < new_record._airbyte_extracted_at)
    OR (target_table.`updated_at` IS NULL AND new_record.`updated_at` IS NOT NULL)
  )
  THEN UPDATE SET
    `id1` = new_record.`id1`,
    `id2` = new_record.`id2`,
    `updated_at` = new_record.`updated_at`,
    `_ab_cdc_deleted_at` = new_record.`_ab_cdc_deleted_at`,
    _airbyte_meta = new_record._airbyte_meta,
    _airbyte_raw_id = new_record._airbyte_raw_id,
    _airbyte_extracted_at = new_record._airbyte_extracted_at
-- If the new row doesn't match an existing row, then we can just insert it to the final table.
WHEN NOT MATCHED AND new_record._ab_cdc_deleted_at IS NULL
  THEN INSERT (
      `id1`,
      `id2`,
      `updated_at`,
      `_ab_cdc_deleted_at`,
      _airbyte_meta,
      _airbyte_raw_id,
      _airbyte_extracted_at
    ) VALUES (
      new_record.`id1`,
      new_record.`id2`,
      new_record.`updated_at`,
      new_record.`_ab_cdc_deleted_at`,
      new_record._airbyte_meta,
      new_record._airbyte_raw_id,
      new_record._airbyte_extracted_at
    );
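
Note that the three WHEN branches map one-to-one onto the three statements being replaced: WHEN MATCHED ... THEN DELETE executes the CDC deletions, WHEN MATCHED ... THEN UPDATE keeps each PK's newest record (the dedup), and WHEN NOT MATCHED ... THEN INSERT handles genuinely new records.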

Performance/cost notes

I ran a source-faker sync with 10M records to populate a dataset, and then ran some 1M-record syncs to simulate incremental updates. tl;dr: this is a significant improvement in most cases.

Performance on master: (i.e. still deduping raw tables AND still using insert+delete)

  • initial sync: 51.199s, 17.5GiB
  • incrementals:
    • 39.659s, 4GiB
    • 43.146s, 4GiB

Performance from this branch: (i.e. not deduping raw tables AND using merge)

  • initial sync: 46.639s, 10.8GiB
  • incrementals:
    • 33.541s, 1.5GiB
    • 31.207s, 1.6GiB

However, running with source-faker emitting 10M records (i.e. rewriting the entire table) in incremental mode resulted in longer T+D times (but still lower costs). IMO that's fine; typical incremental syncs aren't rewriting the entire dataset (and in that case they should probably just use full refresh/overwrite anyway).

vercel bot commented Oct 18, 2023

The latest updates on your projects:

airbyte-docs: ❌ Failed, updated Oct 27, 2023 3:38pm (UTC)

octavia-squidington-iii added the area/connectors (Connector related issues) label on Oct 18, 2023
github-actions (Contributor) commented:

Before Merging a Connector Pull Request

Wow! What a great pull request you have here! 🎉

To merge this PR, ensure the following has been done/considered for each connector added or updated:

  • PR name follows PR naming conventions
  • Breaking changes are considered. If a Breaking Change is being introduced, ensure an Airbyte engineer has created a Breaking Change Plan.
  • Connector version has been incremented in the Dockerfile and metadata.yaml according to our Semantic Versioning for Connectors guidelines
  • You've updated the connector's metadata.yaml file with any other relevant changes, including a breakingChanges entry for major version bumps. See metadata.yaml docs
  • Secrets in the connector's spec are annotated with airbyte_secret
  • All documentation files are up to date. (README.md, bootstrap.md, docs.md, etc...)
  • Changelog updated in docs/integrations/<source or destination>/<name>.md with an entry for the new version. See changelog example
  • Migration guide updated in docs/integrations/<source or destination>/<name>-migrations.md with an entry for the new version, if the version is a breaking change. See migration guide example
  • If set, you've ensured the icon is present in the platform-internal repo. (Docs)

If the checklist is complete, but the CI check is failing,

  1. Check for hidden checklists in your PR description

  2. Toggle the github label checklist-action-run on/off to re-run the checklist CI.

edgao force-pushed the edgao/td_via_merge branch from 60a309a to d6b581a on October 18, 2023 21:41
edgao changed the title from "Destinations v2: Use merge statements to do T+D" to "Destination bigquery: Use merge statement to do T+D" on Oct 19, 2023
edgao changed the title from "Destination bigquery: Use merge statement to do T+D" to "[wip] Destination bigquery: Use merge statement to do T+D" on Oct 19, 2023
edgao marked this pull request as ready for review on October 19, 2023 23:11
edgao requested a review from a team as a code owner on October 19, 2023 23:11
airbyte-oss-build-runner commented (marked as outdated)

edgao changed the title from "[wip] Destination bigquery: Use merge statement to do T+D" to "Destination bigquery: Use merge statement to do T+D" on Oct 19, 2023
edgao changed the title from "Destination bigquery: Use merge statement to do T+D" to "[wip] Destination bigquery: Use merge statement to do T+D" on Oct 19, 2023
edgao marked this pull request as draft on October 19, 2023 23:57
edgao changed the title from "[wip] Destination bigquery: Use merge statement to do T+D" to "Destination bigquery: Use merge statement to do T+D" on Oct 20, 2023
edgao marked this pull request as ready for review on October 20, 2023 21:37
airbyte-oss-build-runner commented (marked as outdated)

cynthiaxyin (Contributor) left a comment:

IANA SQL expert but overall LGTM

cdcDeleteClause = "WHEN MATCHED AND new_record._ab_cdc_deleted_at IS NOT NULL AND " + cursorComparison + " THEN DELETE";
// And skip insertion entirely if there's no matching record.
// (This is possible if a single T+D batch contains both an insertion and deletion for the same PK)
cdcSkipInsertClause = "AND new_record._ab_cdc_deleted_at IS NULL";
cynthiaxyin (Contributor) commented:

To confirm my understanding, in the scenario of WHEN NOT MATCHED AND new_record._ab_cdc_deleted_at IS NOT NULL, we don't insert (or do anything else) bc the record has been deleted?

I'm also confused by the naming cdcSkipInsertClause which is then immediately followed by an INSERT on L549.

edgao (Contributor, Author) replied:

> confirm my understanding

correct. E.g. if a single T+D batch includes a record for INSERT INTO users ... and also DELETE FROM users ..., then new_record will have a non-null _ab_cdc_deleted_at, and we don't want to insert it.
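
To make the scenario concrete, here is a sketch with hypothetical data (illustrative only, not from this PR):

-- One T+D batch contains two raw records for the same PK (id1 = 1, id2 = 10):
--   record A (the insert): _ab_cdc_deleted_at = NULL,                 _airbyte_extracted_at = 10:00
--   record B (the delete): _ab_cdc_deleted_at = 2023-10-20T10:05:00Z, _airbyte_extracted_at = 10:05
-- The dedup CTE keeps only the newer record B, so new_record carries a non-null
-- _ab_cdc_deleted_at. Since this PK never reached the final table, the merge falls
-- through to WHEN NOT MATCHED, where "AND new_record._ab_cdc_deleted_at IS NULL"
-- stops us from inserting a row that was already deleted upstream.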

edgao (Contributor, Author) replied:

> cdcSkipInsertClause

yeah I'm not great at naming :/ how about... cdcDeletionClause? idk

octavia-squidington-iv requested a review from a team on October 25, 2023 04:23
Base automatically changed from edgao/skip_raw_dedup to master on October 25, 2023 16:20
edgao (Contributor, Author) commented Oct 25, 2023

prerelease publish running https://github.com/airbytehq/airbyte/actions/runs/6643634205

airbyte-oss-build-runner (Collaborator) commented:

destination-bigquery test report (commit 8230aa9d08) - ❌

⏲️ Total pipeline duration: 08mn13s

Steps:
  • Build connector tar
  • Java Connector Unit Tests
  • Build destination-bigquery docker image for platform(s) linux/amd64
  • Java Connector Integration Tests
  • Validate metadata for destination-bigquery
  • Connector version semver check
  • Connector version increment check
  • QA checks

🔗 View the logs here

☁️ View runs for commit in Dagger Cloud

Please note that tests are only run on PRs that are ready for review. Set your PR to draft mode to avoid flooding the CI engine and upstream services on subsequent commits.
You can run the same pipeline locally on this branch with the airbyte-ci tool, using the following command:

airbyte-ci connectors --name=destination-bigquery test

edgao enabled auto-merge (squash) on October 27, 2023 15:35
octavia-squidington-iii added the area/documentation (Improvements or additions to documentation) label on Oct 27, 2023
edgao merged commit 45505dd into master on Oct 27, 2023
11 of 13 checks passed
edgao deleted the edgao/td_via_merge branch on October 27, 2023 15:55