Skip to content

Commit

Permalink
feat(dbt): support column lineage for self-referential incremental mo…
Browse files Browse the repository at this point in the history
…dels (#23024)

## Summary & Motivation
When writing an incremental model, you can use `{{ this }}` as a Jinja
variable to self-reference the current model when generating the next
batch of incremental rows. This made building column lineage fail, as we
did not also pass the schema of the current model to the `sqlglot`
optimization function.

This fixes that.

## How I Tested These Changes
pytest -- updated incremental test would fail if current model's schema
was not passed into `sqlglot` optimization.
  • Loading branch information
rexledesma authored Jul 16, 2024
1 parent 5e0e3fd commit c76651b
Show file tree
Hide file tree
Showing 3 changed files with 30 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -829,7 +829,16 @@ def _build_column_lineage_metadata(
# 1. Retrieve the current node's SQL file and its parents' column schemas.
sql_dialect = manifest["metadata"]["adapter_type"]
sqlglot_mapping_schema = MappingSchema(dialect=sql_dialect)
for parent_relation_name, parent_relation_metadata in event_history_metadata.parents.items():

parent_relation_metadata_by_relation_name = {
**event_history_metadata.parents,
# Include the current node's column schema to optimize self-referential models.
dbt_resource_props["relation_name"]: event_history_metadata.columns,
}
for (
parent_relation_name,
parent_relation_metadata,
) in parent_relation_metadata_by_relation_name.items():
sqlglot_mapping_schema.add_table(
table=to_table(parent_relation_name, dialect=sql_dialect),
column_mapping={
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -325,6 +325,9 @@ def test_metadata_manifest_bigquery_fixture() -> Dict[str, Any]:
AssetKey(["incremental_orders"]): TableColumnLineage(
deps_by_column={
"order_id": [TableColumnDep(asset_key=AssetKey(["orders"]), column_name="order_id")],
"order_date": [
TableColumnDep(asset_key=AssetKey(["orders"]), column_name="order_date")
],
}
),
AssetKey(["customers"]): TableColumnLineage(
Expand Down
Original file line number Diff line number Diff line change
@@ -1,9 +1,22 @@
{{
config(
materialized='incremental',
unique_key='order_id',
incremental_strategy='append',
materialized='incremental'
)
}}

select order_id from {{ ref('orders') }}
with max_order_date as (
select
max(order_date) as max_order_date
from {{ this }}
)

select
order_id,
order_date
from {{ ref('orders') }}

{% if is_incremental() %}

where order_date >= (select max_order_date from max_order_date)

{% endif %}

0 comments on commit c76651b

Please sign in to comment.