AIP-76: Consume task-emitted partition keys on asset events #66782

Draft
anishgirianish wants to merge 4 commits into apache:main from anishgirianish:aip-76-partition-at-runtime-server-consumer

@anishgirianish (Contributor) commented May 12, 2026


Was generative AI tooling used to co-author this PR?
  • Yes (please specify the tool below)

Tasks can record per-emission partition keys via outlet_events[asset].add_partitions(...) (shipped in #65447).
This PR persists each key on the matching AssetEvent row, expanding a runtime fan-out into one event per key, and
back-fills DagRun.partition_key when the task emitted exactly one key on a run that had none.
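
A minimal sketch of the behaviour described above, for illustration only. It is not the code in this PR: `AssetEvent` and `DagRun` here are hypothetical dataclass stand-ins for the ORM models, `register_events` is a made-up helper name, and the fallback used when a task emits no keys (one event carrying the run's own key) is an assumption.

```python
from dataclasses import dataclass
from typing import List, Optional


@dataclass
class AssetEvent:
    """Hypothetical stand-in for a persisted asset event row."""

    asset_uri: str
    partition_key: Optional[str] = None


@dataclass
class DagRun:
    """Hypothetical stand-in for the DagRun that owns the task instance."""

    run_id: str
    partition_key: Optional[str] = None


def register_events(dag_run: DagRun, asset_uri: str, runtime_pks: List[str]) -> List[AssetEvent]:
    """Fan task-emitted partition keys out into one event per key."""
    if runtime_pks:
        # Runtime fan-out: one event per emitted key.
        events = [AssetEvent(asset_uri, key) for key in runtime_pks]
    else:
        # Assumption: with no runtime keys, emit one event with the run's key.
        events = [AssetEvent(asset_uri, dag_run.partition_key)]

    # Back-fill only when exactly one key was emitted and the run had none.
    if len(runtime_pks) == 1 and dag_run.partition_key is None:
        dag_run.partition_key = runtime_pks[0]

    return events
```

With this sketch, a run that starts without a key and emits a single key ends up with that key on both the event and the run; a run that emits several keys gets one event per key and its own `partition_key` is left untouched.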

closes: #58474
related: #44146 #65300

cc @Lee-W


  • Read the Pull Request Guidelines for more information. Note: commit author/co-author name and email in commits become permanently public when merged.
  • For fundamental code changes, an Airflow Improvement Proposal (AIP) is needed.
  • When adding dependency, check compliance with the ASF 3rd Party License Policy.
  • For significant user-facing changes create newsfragment: {pr_number}.significant.rst, in airflow-core/newsfragments. You can add this file in a follow-up commit after the PR is created so you know the PR number.

Comment thread airflow-core/src/airflow/models/taskinstance.py Outdated
Comment thread airflow-core/src/airflow/models/taskinstance.py
Comment thread airflow-core/src/airflow/models/taskinstance.py
@anishgirianish force-pushed the aip-76-partition-at-runtime-server-consumer branch from 40b2e65 to 5d12baa on May 13, 2026 18:14
@anishgirianish anishgirianish requested a review from Lee-W May 13, 2026 18:17
@anishgirianish
Contributor Author

Hi @Lee-W, thank you so much for the review. I have addressed the feedback in the latest push. Could you please re-review when you get a chance? Thank you.

Comment thread airflow-core/src/airflow/models/taskinstance.py Outdated
Comment thread airflow-core/src/airflow/models/taskinstance.py Outdated
@anishgirianish force-pushed the aip-76-partition-at-runtime-server-consumer branch from 5d12baa to e8cdd96 on May 15, 2026 05:03
@anishgirianish force-pushed the aip-76-partition-at-runtime-server-consumer branch from e8cdd96 to 07b810b on May 15, 2026 07:18
Member

@Lee-W Lee-W left a comment

Mostly good, but I'll change it to draft since this is kind of blocked by the other PR.

# and the DagRun did not already have one set. This lets a task that
# discovers the partition at runtime (rather than via params) act as
# the source of truth for the DagRun-level key.
if len(runtime_pks) == 1 and ti.dag_run.partition_key is None:
Member

I'm still thinking about whether this is the best approach 🤔, but we can keep it for now. I'll open an issue in the next few days and we can discuss it there.

Contributor Author

Sounds good, thank you! Happy to discuss on the issue whenever you open it.

@Lee-W Lee-W requested a review from uranusjr May 15, 2026 08:13
@Lee-W Lee-W moved this to In Review in AIP-76 Asset Partitioning May 15, 2026
@anishgirianish anishgirianish marked this pull request as draft May 15, 2026 15:09
Labels

None yet

Projects

Status: In Review

Development

Successfully merging this pull request may close these issues.

Manipulate partitioned asset events from task context

2 participants