From c76651baad2f24a8f7ba2bcdbfa465bab59af8a2 Mon Sep 17 00:00:00 2001 From: Rex Ledesma Date: Tue, 16 Jul 2024 11:20:27 -0400 Subject: [PATCH] feat(dbt): support column lineage for self-referential incremental models (#23024) ## Summary & Motivation When writing an incremental model, you can use `{{ this }}` as a Jinja variable to self-reference the current model when generating the next batch of incremental rows. This made building column lineage fail, as we did not also pass the schema of the current model to the `sqlglot` optimization function. This fixes that. ## How I Tested These Changes pytest -- updated incremental test would fail if current model's schema was not passed into `sqlglot` optimization. --- .../dagster_dbt/core/resources_v2.py | 11 +++++++++- .../dbt_packages/test_columns_metadata.py | 3 +++ .../models/incremental_orders.sql | 21 +++++++++++++++---- 3 files changed, 30 insertions(+), 5 deletions(-) diff --git a/python_modules/libraries/dagster-dbt/dagster_dbt/core/resources_v2.py b/python_modules/libraries/dagster-dbt/dagster_dbt/core/resources_v2.py index 00125729a9eb4..9a0f59989f95f 100644 --- a/python_modules/libraries/dagster-dbt/dagster_dbt/core/resources_v2.py +++ b/python_modules/libraries/dagster-dbt/dagster_dbt/core/resources_v2.py @@ -829,7 +829,16 @@ def _build_column_lineage_metadata( # 1. Retrieve the current node's SQL file and its parents' column schemas. sql_dialect = manifest["metadata"]["adapter_type"] sqlglot_mapping_schema = MappingSchema(dialect=sql_dialect) - for parent_relation_name, parent_relation_metadata in event_history_metadata.parents.items(): + + parent_relation_metadata_by_relation_name = { + **event_history_metadata.parents, + # Include the current node's column schema to optimize self-referential models. + dbt_resource_props["relation_name"]: event_history_metadata.columns, + } + for ( + parent_relation_name, + parent_relation_metadata, + ) in parent_relation_metadata_by_relation_name.items(): sqlglot_mapping_schema.add_table( table=to_table(parent_relation_name, dialect=sql_dialect), column_mapping={ diff --git a/python_modules/libraries/dagster-dbt/dagster_dbt_tests/core/dbt_packages/test_columns_metadata.py b/python_modules/libraries/dagster-dbt/dagster_dbt_tests/core/dbt_packages/test_columns_metadata.py index b479d4cd6562e..eab17db0fbedf 100644 --- a/python_modules/libraries/dagster-dbt/dagster_dbt_tests/core/dbt_packages/test_columns_metadata.py +++ b/python_modules/libraries/dagster-dbt/dagster_dbt_tests/core/dbt_packages/test_columns_metadata.py @@ -325,6 +325,9 @@ def test_metadata_manifest_bigquery_fixture() -> Dict[str, Any]: AssetKey(["incremental_orders"]): TableColumnLineage( deps_by_column={ "order_id": [TableColumnDep(asset_key=AssetKey(["orders"]), column_name="order_id")], + "order_date": [ + TableColumnDep(asset_key=AssetKey(["orders"]), column_name="order_date") + ], } ), AssetKey(["customers"]): TableColumnLineage( diff --git a/python_modules/libraries/dagster-dbt/dagster_dbt_tests/dbt_projects/test_dagster_metadata/models/incremental_orders.sql b/python_modules/libraries/dagster-dbt/dagster_dbt_tests/dbt_projects/test_dagster_metadata/models/incremental_orders.sql index 4a187dc9778a3..c4e59e24cdfd8 100644 --- a/python_modules/libraries/dagster-dbt/dagster_dbt_tests/dbt_projects/test_dagster_metadata/models/incremental_orders.sql +++ b/python_modules/libraries/dagster-dbt/dagster_dbt_tests/dbt_projects/test_dagster_metadata/models/incremental_orders.sql @@ -1,9 +1,22 @@ {{ config( - materialized='incremental', - unique_key='order_id', - incremental_strategy='append', + materialized='incremental' ) }} -select order_id from {{ ref('orders') }} +with max_order_date as ( + select + max(order_date) as max_order_date + from {{ this }} +) + +select + order_id, + order_date +from {{ ref('orders') }} + +{% if is_incremental() %} + +where order_date >= (select max_order_date from max_order_date) + +{% endif %}