T&D: Reconsider Merge to replace insert and delete #30764

Closed
evantahler opened this issue Sep 26, 2023 · 6 comments
Labels
team/destinations Destinations team's backlog

Comments

@evantahler
Contributor

evantahler commented Sep 26, 2023

MERGE is Snowflake's version of an "upsert"
... and maybe we can use it for CDC deletes as well?!

Steps:

  • try it out and measure. This might be faster... we need to check
    • Use source-faker with a lot of rows.
    • Maybe try it with our own BQ?
  • if it is faster, do it.
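The idea behind MERGE is that one statement routes each incoming row to a branch depending on whether it matches an existing destination row on the primary key. A conditional `WHEN MATCHED ... THEN DELETE` clause is what could fold CDC deletes into the same statement. Below is a minimal sketch of those semantics only, not of how either warehouse executes MERGE. It assumes a primary key `id`, an `updated_at` cursor, and a CDC delete marker named `_ab_cdc_deleted_at`. That column name is an illustrative assumption; this thread does not name it.

```python
from typing import Any, Dict, List

Row = Dict[str, Any]


def merge(dest: Dict[int, Row], source: List[Row]) -> Dict[int, Row]:
    """Model of MERGE INTO dest USING source ON dest.id = source.id.

    Assumes `source` holds at most one row per id and that every row has an
    `updated_at` cursor. Deletes are applied without a cursor check here.
    """
    result = dict(dest)  # shallow copy so the input is left untouched
    for row in source:
        key = row["id"]
        matched = key in result
        # Hypothetical CDC marker: a non-null value means "this record was deleted".
        is_delete = row.get("_ab_cdc_deleted_at") is not None
        if matched and is_delete:
            # WHEN MATCHED AND <deleted> THEN DELETE
            del result[key]
        elif matched:
            # WHEN MATCHED AND dest.updated_at < source.updated_at THEN UPDATE
            if result[key]["updated_at"] < row["updated_at"]:
                result[key] = row
        elif not is_delete:
            # WHEN NOT MATCHED AND NOT <deleted> THEN INSERT
            result[key] = row
        # A delete for an id that is not in dest matches nothing: no-op.
    return result


dest = {
    1: {"id": 1, "updated_at": 1, "name": "a"},
    2: {"id": 2, "updated_at": 1, "name": "b"},
}
source = [
    {"id": 1, "updated_at": 2, "name": "a2"},             # newer version: update
    {"id": 2, "updated_at": 3, "_ab_cdc_deleted_at": 3},  # CDC delete
    {"id": 3, "updated_at": 1, "name": "c"},              # unseen id: insert
]
merged = merge(dest, source)
print(sorted(merged))  # [1, 3]
```

The appeal for T&D is that the insert, update and delete branches all run in a single pass over the new records instead of as separate statements.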
@evantahler evantahler added the team/destinations Destinations team's backlog label Sep 26, 2023
@aaronsteers
Collaborator

aaronsteers commented Sep 26, 2023

Here is an example MERGE statement. Note: I haven't tried to compile this; it may have bugs or syntax errors.

This is the 'deduping' version of the query, where we need to replace old record versions rather than just append the new ones.

MERGE INTO "V2-INTERNAL-STAGING"."USERS" AS DEST
  USING (
    SELECT
      CASE
        WHEN TYPEOF("_airbyte_data":"address") != 'OBJECT'
          THEN NULL
        ELSE "_airbyte_data":"address"
      END
      as "ADDRESS",
      (("_airbyte_data":"occupation")::text) as "OCCUPATION",
      (("_airbyte_data":"gender")::text) as "GENDER",
      (("_airbyte_data":"academic_degree")::text) as "ACADEMIC_DEGREE",
      TRY_CAST(("_airbyte_data":"weight")::text as NUMBER) as "WEIGHT",
      CASE
        WHEN ("_airbyte_data":"created_at")::TEXT REGEXP '\\d{4}-\\d{2}-\\d{2}T(\\d{2}:){2}\\d{2}(\\+|-)\\d{4}'
          THEN TO_TIMESTAMP_TZ(("_airbyte_data":"created_at")::TEXT, 'YYYY-MM-DDTHH24:MI:SSTZHTZM')
        WHEN ("_airbyte_data":"created_at")::TEXT REGEXP '\\d{4}-\\d{2}-\\d{2}T(\\d{2}:){2}\\d{2}(\\+|-)\\d{2}'
          THEN TO_TIMESTAMP_TZ(("_airbyte_data":"created_at")::TEXT, 'YYYY-MM-DDTHH24:MI:SSTZH')
        WHEN ("_airbyte_data":"created_at")::TEXT REGEXP '\\d{4}-\\d{2}-\\d{2}T(\\d{2}:){2}\\d{2}\\.\\d{1,7}(\\+|-)\\d{4}'
          THEN TO_TIMESTAMP_TZ(("_airbyte_data":"created_at")::TEXT, 'YYYY-MM-DDTHH24:MI:SS.FFTZHTZM')
        WHEN ("_airbyte_data":"created_at")::TEXT REGEXP '\\d{4}-\\d{2}-\\d{2}T(\\d{2}:){2}\\d{2}\\.\\d{1,7}(\\+|-)\\d{2}'
          THEN TO_TIMESTAMP_TZ(("_airbyte_data":"created_at")::TEXT, 'YYYY-MM-DDTHH24:MI:SS.FFTZH')
        ELSE TRY_CAST(("_airbyte_data":"created_at")::TEXT AS TIMESTAMP_TZ)
      END
      as "CREATED_AT",
      (("_airbyte_data":"language")::text) as "LANGUAGE",
      (("_airbyte_data":"telephone")::text) as "TELEPHONE",
      (("_airbyte_data":"title")::text) as "TITLE",
      CASE
        WHEN ("_airbyte_data":"updated_at")::TEXT REGEXP '\\d{4}-\\d{2}-\\d{2}T(\\d{2}:){2}\\d{2}(\\+|-)\\d{4}'
          THEN TO_TIMESTAMP_TZ(("_airbyte_data":"updated_at")::TEXT, 'YYYY-MM-DDTHH24:MI:SSTZHTZM')
        WHEN ("_airbyte_data":"updated_at")::TEXT REGEXP '\\d{4}-\\d{2}-\\d{2}T(\\d{2}:){2}\\d{2}(\\+|-)\\d{2}'
          THEN TO_TIMESTAMP_TZ(("_airbyte_data":"updated_at")::TEXT, 'YYYY-MM-DDTHH24:MI:SSTZH')
        WHEN ("_airbyte_data":"updated_at")::TEXT REGEXP '\\d{4}-\\d{2}-\\d{2}T(\\d{2}:){2}\\d{2}\\.\\d{1,7}(\\+|-)\\d{4}'
          THEN TO_TIMESTAMP_TZ(("_airbyte_data":"updated_at")::TEXT, 'YYYY-MM-DDTHH24:MI:SS.FFTZHTZM')
        WHEN ("_airbyte_data":"updated_at")::TEXT REGEXP '\\d{4}-\\d{2}-\\d{2}T(\\d{2}:){2}\\d{2}\\.\\d{1,7}(\\+|-)\\d{2}'
          THEN TO_TIMESTAMP_TZ(("_airbyte_data":"updated_at")::TEXT, 'YYYY-MM-DDTHH24:MI:SS.FFTZH')
        ELSE TRY_CAST(("_airbyte_data":"updated_at")::TEXT AS TIMESTAMP_TZ)
      END
      as "UPDATED_AT",
      (("_airbyte_data":"nationality")::text) as "NATIONALITY",
      (("_airbyte_data":"blood_type")::text) as "BLOOD_TYPE",
      (("_airbyte_data":"name")::text) as "NAME",
      TRY_CAST(("_airbyte_data":"id")::text as NUMBER) as "ID",
      TRY_CAST(("_airbyte_data":"age")::text as NUMBER) as "AGE",
      (("_airbyte_data":"email")::text) as "EMAIL",
      (("_airbyte_data":"height")::text) as "HEIGHT",
      ARRAY_CONSTRUCT_COMPACT(CASE
        WHEN (TYPEOF("_airbyte_data":"address") NOT IN ('NULL', 'NULL_VALUE'))
          AND (CASE
        WHEN TYPEOF("_airbyte_data":"address") != 'OBJECT'
          THEN NULL
        ELSE "_airbyte_data":"address"
      END
      IS NULL)
          THEN 'Problem with `address`'
        ELSE NULL
      END,
      CASE
        WHEN (TYPEOF("_airbyte_data":"occupation") NOT IN ('NULL', 'NULL_VALUE'))
          AND ((("_airbyte_data":"occupation")::text) IS NULL)
          THEN 'Problem with `occupation`'
        ELSE NULL
      END,
      CASE
        WHEN (TYPEOF("_airbyte_data":"gender") NOT IN ('NULL', 'NULL_VALUE'))
          AND ((("_airbyte_data":"gender")::text) IS NULL)
          THEN 'Problem with `gender`'
        ELSE NULL
      END,
      CASE
        WHEN (TYPEOF("_airbyte_data":"academic_degree") NOT IN ('NULL', 'NULL_VALUE'))
          AND ((("_airbyte_data":"academic_degree")::text) IS NULL)
          THEN 'Problem with `academic_degree`'
        ELSE NULL
      END,
      CASE
        WHEN (TYPEOF("_airbyte_data":"weight") NOT IN ('NULL', 'NULL_VALUE'))
          AND (TRY_CAST(("_airbyte_data":"weight")::text as NUMBER) IS NULL)
          THEN 'Problem with `weight`'
        ELSE NULL
      END,
      CASE
        WHEN (TYPEOF("_airbyte_data":"created_at") NOT IN ('NULL', 'NULL_VALUE'))
          AND (CASE
        WHEN ("_airbyte_data":"created_at")::TEXT REGEXP '\\d{4}-\\d{2}-\\d{2}T(\\d{2}:){2}\\d{2}(\\+|-)\\d{4}'
          THEN TO_TIMESTAMP_TZ(("_airbyte_data":"created_at")::TEXT, 'YYYY-MM-DDTHH24:MI:SSTZHTZM')
        WHEN ("_airbyte_data":"created_at")::TEXT REGEXP '\\d{4}-\\d{2}-\\d{2}T(\\d{2}:){2}\\d{2}(\\+|-)\\d{2}'
          THEN TO_TIMESTAMP_TZ(("_airbyte_data":"created_at")::TEXT, 'YYYY-MM-DDTHH24:MI:SSTZH')
        WHEN ("_airbyte_data":"created_at")::TEXT REGEXP '\\d{4}-\\d{2}-\\d{2}T(\\d{2}:){2}\\d{2}\\.\\d{1,7}(\\+|-)\\d{4}'
          THEN TO_TIMESTAMP_TZ(("_airbyte_data":"created_at")::TEXT, 'YYYY-MM-DDTHH24:MI:SS.FFTZHTZM')
        WHEN ("_airbyte_data":"created_at")::TEXT REGEXP '\\d{4}-\\d{2}-\\d{2}T(\\d{2}:){2}\\d{2}\\.\\d{1,7}(\\+|-)\\d{2}'
          THEN TO_TIMESTAMP_TZ(("_airbyte_data":"created_at")::TEXT, 'YYYY-MM-DDTHH24:MI:SS.FFTZH')
        ELSE TRY_CAST(("_airbyte_data":"created_at")::TEXT AS TIMESTAMP_TZ)
      END
      IS NULL)
          THEN 'Problem with `created_at`'
        ELSE NULL
      END,
      CASE
        WHEN (TYPEOF("_airbyte_data":"language") NOT IN ('NULL', 'NULL_VALUE'))
          AND ((("_airbyte_data":"language")::text) IS NULL)
          THEN 'Problem with `language`'
        ELSE NULL
      END,
      CASE
        WHEN (TYPEOF("_airbyte_data":"telephone") NOT IN ('NULL', 'NULL_VALUE'))
          AND ((("_airbyte_data":"telephone")::text) IS NULL)
          THEN 'Problem with `telephone`'
        ELSE NULL
      END,
      CASE
        WHEN (TYPEOF("_airbyte_data":"title") NOT IN ('NULL', 'NULL_VALUE'))
          AND ((("_airbyte_data":"title")::text) IS NULL)
          THEN 'Problem with `title`'
        ELSE NULL
      END,
      CASE
        WHEN (TYPEOF("_airbyte_data":"updated_at") NOT IN ('NULL', 'NULL_VALUE'))
          AND (CASE
        WHEN ("_airbyte_data":"updated_at")::TEXT REGEXP '\\d{4}-\\d{2}-\\d{2}T(\\d{2}:){2}\\d{2}(\\+|-)\\d{4}'
          THEN TO_TIMESTAMP_TZ(("_airbyte_data":"updated_at")::TEXT, 'YYYY-MM-DDTHH24:MI:SSTZHTZM')
        WHEN ("_airbyte_data":"updated_at")::TEXT REGEXP '\\d{4}-\\d{2}-\\d{2}T(\\d{2}:){2}\\d{2}(\\+|-)\\d{2}'
          THEN TO_TIMESTAMP_TZ(("_airbyte_data":"updated_at")::TEXT, 'YYYY-MM-DDTHH24:MI:SSTZH')
        WHEN ("_airbyte_data":"updated_at")::TEXT REGEXP '\\d{4}-\\d{2}-\\d{2}T(\\d{2}:){2}\\d{2}\\.\\d{1,7}(\\+|-)\\d{4}'
          THEN TO_TIMESTAMP_TZ(("_airbyte_data":"updated_at")::TEXT, 'YYYY-MM-DDTHH24:MI:SS.FFTZHTZM')
        WHEN ("_airbyte_data":"updated_at")::TEXT REGEXP '\\d{4}-\\d{2}-\\d{2}T(\\d{2}:){2}\\d{2}\\.\\d{1,7}(\\+|-)\\d{2}'
          THEN TO_TIMESTAMP_TZ(("_airbyte_data":"updated_at")::TEXT, 'YYYY-MM-DDTHH24:MI:SS.FFTZH')
        ELSE TRY_CAST(("_airbyte_data":"updated_at")::TEXT AS TIMESTAMP_TZ)
      END
      IS NULL)
          THEN 'Problem with `updated_at`'
        ELSE NULL
      END,
      CASE
        WHEN (TYPEOF("_airbyte_data":"nationality") NOT IN ('NULL', 'NULL_VALUE'))
          AND ((("_airbyte_data":"nationality")::text) IS NULL)
          THEN 'Problem with `nationality`'
        ELSE NULL
      END,
      CASE
        WHEN (TYPEOF("_airbyte_data":"blood_type") NOT IN ('NULL', 'NULL_VALUE'))
          AND ((("_airbyte_data":"blood_type")::text) IS NULL)
          THEN 'Problem with `blood_type`'
        ELSE NULL
      END,
      CASE
        WHEN (TYPEOF("_airbyte_data":"name") NOT IN ('NULL', 'NULL_VALUE'))
          AND ((("_airbyte_data":"name")::text) IS NULL)
          THEN 'Problem with `name`'
        ELSE NULL
      END,
      CASE
        WHEN (TYPEOF("_airbyte_data":"id") NOT IN ('NULL', 'NULL_VALUE'))
          AND (TRY_CAST(("_airbyte_data":"id")::text as NUMBER) IS NULL)
          THEN 'Problem with `id`'
        ELSE NULL
      END,
      CASE
        WHEN (TYPEOF("_airbyte_data":"age") NOT IN ('NULL', 'NULL_VALUE'))
          AND (TRY_CAST(("_airbyte_data":"age")::text as NUMBER) IS NULL)
          THEN 'Problem with `age`'
        ELSE NULL
      END,
      CASE
        WHEN (TYPEOF("_airbyte_data":"email") NOT IN ('NULL', 'NULL_VALUE'))
          AND ((("_airbyte_data":"email")::text) IS NULL)
          THEN 'Problem with `email`'
        ELSE NULL
      END,
      CASE
        WHEN (TYPEOF("_airbyte_data":"height") NOT IN ('NULL', 'NULL_VALUE'))
          AND ((("_airbyte_data":"height")::text) IS NULL)
          THEN 'Problem with `height`'
        ELSE NULL
      END) as "_airbyte_cast_errors",
      "_airbyte_raw_id" as "_AIRBYTE_RAW_ID",
      "_airbyte_extracted_at" as "_AIRBYTE_EXTRACTED_AT",
      CURRENT_TIMESTAMP() as "_AIRBYTE_LOADED_AT"
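    FROM "airbyte_internal"."v2-internal-staging_raw__stream_users"
    -- keep only the newest version of each ID: with the default
    -- ERROR_ON_NONDETERMINISTIC_MERGE = TRUE, Snowflake errors when one
    -- target row matches several source rows
    QUALIFY ROW_NUMBER() OVER (
      PARTITION BY "ID"
      ORDER BY "UPDATED_AT" DESC NULLS LAST, "_airbyte_extracted_at" DESC
    ) = 1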
  ) STAGING
    ON STAGING.ID = DEST.ID
  WHEN NOT MATCHED THEN INSERT (
    "ADDRESS",
    "OCCUPATION",
    "GENDER",
    "ACADEMIC_DEGREE",
    "WEIGHT",
    "CREATED_AT",
    "LANGUAGE",
    "TELEPHONE",
    "TITLE",
    "UPDATED_AT",
    "NATIONALITY",
    "BLOOD_TYPE",
    "NAME",
    "ID",
    "AGE",
    "EMAIL",
    "HEIGHT",
    "_AIRBYTE_META",
    "_AIRBYTE_RAW_ID",
    "_AIRBYTE_EXTRACTED_AT",
    "_AIRBYTE_LOADED_AT"
  ) VALUES (
    STAGING."ADDRESS",
    STAGING."OCCUPATION",
    STAGING."GENDER",
    STAGING."ACADEMIC_DEGREE",
    STAGING."WEIGHT",
    STAGING."CREATED_AT",
    STAGING."LANGUAGE",
    STAGING."TELEPHONE",
    STAGING."TITLE",
    STAGING."UPDATED_AT",
    STAGING."NATIONALITY",
    STAGING."BLOOD_TYPE",
    STAGING."NAME",
    STAGING."ID",
    STAGING."AGE",
    STAGING."EMAIL",
    STAGING."HEIGHT",
    OBJECT_CONSTRUCT('errors', STAGING."_airbyte_cast_errors"),
    STAGING."_AIRBYTE_RAW_ID",
    STAGING."_AIRBYTE_EXTRACTED_AT",
    STAGING."_AIRBYTE_LOADED_AT"
  )
  WHEN MATCHED THEN
    UPDATE SET
      "ADDRESS" = STAGING."ADDRESS",
      "OCCUPATION" = STAGING."OCCUPATION",
      "GENDER" = STAGING."GENDER",
      "ACADEMIC_DEGREE" = STAGING."ACADEMIC_DEGREE",
      "WEIGHT" = STAGING."WEIGHT",
      "CREATED_AT" = STAGING."CREATED_AT",
      "LANGUAGE" = STAGING."LANGUAGE",
      "TELEPHONE" = STAGING."TELEPHONE",
      "TITLE" = STAGING."TITLE",
      "UPDATED_AT" = STAGING."UPDATED_AT",
      "NATIONALITY" = STAGING."NATIONALITY",
      "BLOOD_TYPE" = STAGING."BLOOD_TYPE",
      "NAME" = STAGING."NAME",
      "ID" = STAGING."ID",
      "AGE" = STAGING."AGE",
      "EMAIL" = STAGING."EMAIL",
      "HEIGHT" = STAGING."HEIGHT",
      "_AIRBYTE_META" = OBJECT_CONSTRUCT('errors', STAGING."_airbyte_cast_errors"),
      "_AIRBYTE_RAW_ID" = STAGING."_AIRBYTE_RAW_ID",
      "_AIRBYTE_EXTRACTED_AT" = STAGING."_AIRBYTE_EXTRACTED_AT",
      "_AIRBYTE_LOADED_AT" = STAGING."_AIRBYTE_LOADED_AT"
;
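Most of the statement above is one repeated typing pattern. Each field is cast with a non-throwing expression: `TRY_CAST`, or the regex-guarded `TO_TIMESTAMP_TZ` chains for timestamps. A column is flagged only when the raw value was present but the cast still produced NULL, and `ARRAY_CONSTRUCT_COMPACT` drops the NULL entries so only real problems remain. Below is a rough Python model of that pattern. `try_number` and `as_text` are hypothetical simplifications of Snowflake's casts, not exact equivalents.

```python
from decimal import Decimal, InvalidOperation
from typing import Any, Callable, Dict, List, Optional, Tuple


def try_number(value: Any) -> Optional[Decimal]:
    """Rough stand-in for TRY_CAST(x::text AS NUMBER): None instead of an error."""
    try:
        return Decimal(str(value))
    except InvalidOperation:
        return None


def as_text(value: Any) -> str:
    """Rough stand-in for ::text; in this sketch every non-null value converts."""
    return str(value)


def type_record(
    data: Dict[str, Any], casters: Dict[str, Callable[[Any], Any]]
) -> Tuple[Dict[str, Any], List[str]]:
    """Cast each declared column and collect one message per failed cast."""
    typed: Dict[str, Any] = {}
    errors: List[str] = []
    for column, cast in casters.items():
        raw = data.get(column)  # a missing key and a JSON null both arrive as None
        value = None if raw is None else cast(raw)
        typed[column] = value
        # Mirrors: TYPEOF(raw) NOT IN ('NULL', 'NULL_VALUE') AND <cast> IS NULL
        if raw is not None and value is None:
            errors.append(f"Problem with `{column}`")
    # Like ARRAY_CONSTRUCT_COMPACT, only the failures end up in the list.
    return typed, errors


casters = {"id": try_number, "weight": try_number, "name": as_text}
typed, errors = type_record({"id": "42", "weight": "heavy", "name": None}, casters)
print(errors)  # ['Problem with `weight`']
```

A null `name` is not an error: only a value that was present but could not be typed produces a message.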

@evantahler
Contributor Author

Previous work #24973

@edgao edgao self-assigned this Oct 13, 2023
@edgao
Contributor

edgao commented Oct 13, 2023

initial experiment - manually ran a full refresh T+D query. First, current behavior:
(about 5.2GB total)

And comparing against a handwritten merge statement:
(4.3GB total)

up next: performance comparison for incremental load

the merge statement
MERGE edgao_test.purchases_final
USING (
  WITH intermediate_data AS (
    SELECT
      CAST(JSON_VALUE(`_airbyte_data`, '$."updated_at"') as TIMESTAMP) as `updated_at`,
      CAST(JSON_VALUE(`_airbyte_data`, '$."purchased_at"') as TIMESTAMP) as `purchased_at`,
      CAST(JSON_VALUE(`_airbyte_data`, '$."user_id"') as INT64) as `user_id`,
      CAST(JSON_VALUE(`_airbyte_data`, '$."returned_at"') as TIMESTAMP) as `returned_at`,
      CAST(JSON_VALUE(`_airbyte_data`, '$."product_id"') as INT64) as `product_id`,
      CAST(JSON_VALUE(`_airbyte_data`, '$."created_at"') as TIMESTAMP) as `created_at`,
      CAST(JSON_VALUE(`_airbyte_data`, '$."id"') as INT64) as `id`,
      CAST(JSON_VALUE(`_airbyte_data`, '$."added_to_cart_at"') as TIMESTAMP) as `added_to_cart_at`,
      [] AS column_errors,
      _airbyte_raw_id,
      _airbyte_extracted_at
    FROM `destinationsv2-perf-testing`.`edgao_test`.`purchases_raw`
    WHERE
      _airbyte_loaded_at IS NULL
  )
  SELECT
    `updated_at`,
    `purchased_at`,
    `user_id`,
    `returned_at`,
    `product_id`,
    `created_at`,
    `id`,
    `added_to_cart_at`,
    to_json(struct(COALESCE((SELECT ARRAY_AGG(unnested_column_errors IGNORE NULLS) FROM UNNEST(column_errors) unnested_column_errors), []) AS errors)) AS _airbyte_meta,
    _airbyte_raw_id,
    _airbyte_extracted_at
  FROM intermediate_data
) new_records
ON purchases_final.id = new_records.id
WHEN MATCHED AND purchases_final.updated_at < new_records.updated_at THEN
  UPDATE SET
    updated_at = new_records.`updated_at`,
    purchased_at = new_records.`purchased_at`,
    user_id = new_records.`user_id`,
    returned_at = new_records.`returned_at`,
    product_id = new_records.`product_id`,
    created_at = new_records.`created_at`,
    id = new_records.`id`,
    added_to_cart_at = new_records.`added_to_cart_at`,
    _airbyte_meta = new_records._airbyte_meta,
    _airbyte_raw_id = new_records._airbyte_raw_id,
    _airbyte_extracted_at = new_records._airbyte_extracted_at
WHEN NOT MATCHED THEN
  INSERT (
    `updated_at`,
    `purchased_at`,
    `user_id`,
    `returned_at`,
    `product_id`,
    `created_at`,
    `id`,
    `added_to_cart_at`,
    _airbyte_meta,
    _airbyte_raw_id,
    _airbyte_extracted_at
  ) VALUES (
    new_records.`updated_at`,
    new_records.`purchased_at`,
    new_records.`user_id`,
    new_records.`returned_at`,
    new_records.`product_id`,
    new_records.`created_at`,
    new_records.`id`,
    new_records.`added_to_cart_at`,
    new_records._airbyte_meta,
    new_records._airbyte_raw_id,
    new_records._airbyte_extracted_at
  );
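Two details matter if this statement is productionized. First, the USING subquery can contain several raw versions of the same `id`, and BigQuery rejects a MERGE in which one target row matches more than one source row. Second, the `WHEN MATCHED AND ... <` guard keeps an older version from overwriting a newer one. SQLite has no MERGE, so the sketch below swaps in SQLite's `INSERT ... ON CONFLICT DO UPDATE ... WHERE` upsert to play the role of the NOT MATCHED and guarded MATCHED clauses, and dedupes the batch with `ROW_NUMBER()` first (window functions need SQLite 3.25 or newer). The `raw` and `final` tables are simplified, hypothetical stand-ins for `purchases_raw` and `purchases_final`.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.executescript(
    """
    CREATE TABLE final (id INTEGER PRIMARY KEY, updated_at INTEGER, status TEXT);
    CREATE TABLE raw (raw_id TEXT, id INTEGER, updated_at INTEGER, status TEXT,
                      extracted_at INTEGER);
    INSERT INTO final VALUES (1, 10, 'cart'), (2, 50, 'returned');
    INSERT INTO raw VALUES
      ('a', 1, 20, 'purchased', 100),  -- two versions of id 1 in one batch
      ('b', 1, 30, 'shipped',   101),
      ('c', 2, 40, 'purchased', 102),  -- older than what final already has
      ('d', 3, 15, 'cart',      103);  -- brand-new id
    """
)

conn.execute(
    """
    INSERT INTO final (id, updated_at, status)
    SELECT id, updated_at, status FROM (
      SELECT *, ROW_NUMBER() OVER (
        PARTITION BY id ORDER BY updated_at DESC, extracted_at DESC
      ) AS rn
      FROM raw
    )
    WHERE rn = 1  -- one source row per id; a WHERE here also avoids SQLite's upsert parse ambiguity
    ON CONFLICT (id) DO UPDATE SET
      updated_at = excluded.updated_at,
      status = excluded.status
    WHERE final.updated_at < excluded.updated_at  -- the "WHEN MATCHED AND ... <" guard
    """
)
rows = conn.execute("SELECT id, updated_at, status FROM final ORDER BY id").fetchall()
print(rows)  # [(1, 30, 'shipped'), (2, 50, 'returned'), (3, 15, 'cart')]
```

Row 2 keeps its newer destination value because the guard rejects the older raw version. Without the dedupe step, a warehouse MERGE would see two candidate source rows for id 1.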

@edgao
Contributor

edgao commented Oct 16, 2023

incremental setup

update edgao_test.purchases_raw
set _airbyte_loaded_at = null, _airbyte_extracted_at = current_timestamp
where starts_with(_airbyte_raw_id, '1');
delete from edgao_test.purchases_final
where starts_with(_airbyte_raw_id, '1');

(both queries measured below include the _airbyte_extracted_at > condition from #31191)
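This setup assumes `_airbyte_raw_id` holds random (version 4) UUIDs, which this thread does not state. Under that assumption, `starts_with(_airbyte_raw_id, '1')` selects the rows whose first hex digit is `1`, about 1/16 (6.25%) of the table, so the setup simulates an incremental sync that touches roughly one record in sixteen. A quick check of that fraction:

```python
import uuid

SAMPLES = 100_000
# The first hex digit of a version-4 UUID is random, so each of 0-f
# should appear about 1/16 of the time.
hits = sum(str(uuid.uuid4()).startswith("1") for _ in range(SAMPLES))
fraction = hits / SAMPLES
print(f"{fraction:.4f} of ids start with '1' (expected {1 / 16:.4f})")
```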

incremental old behavior:
(2.4GB)

incremental using merge:
(1.6GB)

@edgao
Contributor

edgao commented Oct 16, 2023

tl;dr we're using one fewer statement (2 statements in CDC mode), so total time should be lower, and we're saving a decent amount of bytes processed. Will productionize these queries for bigquery.

... less clear how to do the same for snowflake, that's a problem for later in the week :P
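Here are the bytes-processed figures from the two experiments above side by side. The GB values are the ones reported in this thread; the percentages are simple arithmetic on them.

```python
# Bytes processed (GB) reported in this thread: (current T+D, MERGE)
measurements = {
    "full refresh": (5.2, 4.3),
    "incremental": (2.4, 1.6),
}

savings = {}
for mode, (before, after) in measurements.items():
    # Relative reduction in bytes processed, rounded to one decimal place.
    savings[mode] = round((before - after) / before * 100, 1)
    print(f"{mode}: {before} GB -> {after} GB, about {savings[mode]}% less")
```

That works out to roughly 17% less for the full refresh and a third less for the incremental run.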

@edgao
Contributor

edgao commented Oct 27, 2023

done in #31529 (bigquery) and #31683 (snowflake)

@edgao edgao closed this as completed Oct 27, 2023
3 participants