Skip to content

AIP-76: Add PartitionAtRuntime authoring API to Task SDK#65447

Open
anishgirianish wants to merge 13 commits into
apache:mainfrom
anishgirianish:aip-76-partition-at-runtime-authoring-api
Open

AIP-76: Add PartitionAtRuntime authoring API to Task SDK#65447
anishgirianish wants to merge 13 commits into
apache:mainfrom
anishgirianish:aip-76-partition-at-runtime-authoring-api

Conversation

@anishgirianish
Copy link
Copy Markdown
Contributor

@anishgirianish anishgirianish commented Apr 17, 2026


Was generative AI tooling used to co-author this PR?
  • Yes (please specify the tool below)

First of three PRs decomposing draft #65300 for AIP-76, per the design agreed in #44146.

Adds the Task SDK producer surface for runtime partition keys:

  • PartitionAtRuntime() marker timetable, with a new partitioned_at_runtime: bool flag on BaseTimetable so downstream code can detect the marker without isinstance checks.
  • outlet_events kwarg injection inside @asset / @asset.multi — the parameter is populated with the runtime OutletEventAccessors, and self,context, and outlet_events are reserved parameter names that are not treated as inlet asset references.
  • OutletEventAccessor.partition_keys: set[str] plus accessor.add_partitions(key_or_keys) for recording one or more partition keys against an emitted asset event. Inside an @asset function the idiomatic call is outlet_events[self].add_partitions("us") (or outlet_events[self].partition_keys = {"us", "eu"}); self here is a plain Asset.
  • Worker-side wire format: _serialize_outlet_events now fans the recorded partition_keys out into one asset-event payload per partition_key (singular) on the existing column. Duplicate keys recorded within the same task instance collapse to a single event.

The server-side join, partition-aware consumption / fan-in by the scheduler — lands in PR 2, along with the example DAG and end-user docs in PR 3.

cc: @Lee-W

related: #44146
related: #65300


  • Read the Pull Request Guidelines for more information. Note: commit author/co-author name and email in commits become permanently public when merged.
  • For fundamental code changes, an Airflow Improvement Proposal (AIP) is needed.
  • When adding dependency, check compliance with the ASF 3rd Party License Policy.
  • For significant user-facing changes create newsfragment: {pr_number}.significant.rst, in airflow-core/newsfragments. You can add this file in a follow-up commit after the PR is created so you know the PR number.

@anishgirianish anishgirianish force-pushed the aip-76-partition-at-runtime-authoring-api branch from 38335fb to e71d7e3 Compare April 17, 2026 22:21
@anishgirianish anishgirianish marked this pull request as draft April 17, 2026 23:26
@anishgirianish anishgirianish force-pushed the aip-76-partition-at-runtime-authoring-api branch from 939a95e to 08d1a66 Compare April 17, 2026 23:34
@anishgirianish anishgirianish marked this pull request as ready for review April 18, 2026 01:39
@anishgirianish anishgirianish force-pushed the aip-76-partition-at-runtime-authoring-api branch from 08d1a66 to ee3ee08 Compare April 18, 2026 01:40
Copy link
Copy Markdown
Member

@Lee-W Lee-W left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we also need airflow-core counter part for PartitionAtRuntime

Comment thread task-sdk/src/airflow/sdk/execution_time/task_runner.py
Comment thread task-sdk/src/airflow/sdk/__init__.py Outdated
@potiuk potiuk marked this pull request as draft April 22, 2026 19:35
@potiuk
Copy link
Copy Markdown
Member

potiuk commented Apr 22, 2026

@anishgirianish Converting to draft — this PR doesn't yet meet our Pull Request quality criteria.

  • Unresolved review comments (2 threads from maintainers): please walk through each unresolved review thread. Even if a suggestion looks incorrect or irrelevant — and some of them will be, especially any comments left by automated reviewers like GitHub Copilot — it is still the author's responsibility to respond: apply the fix, reply in-thread with a brief explanation of why the suggestion does not apply, or resolve the thread if the feedback is no longer relevant. Leaving threads unaddressed for weeks blocks the PR from moving forward.

See the linked criteria for how to fix each item, then mark the PR "Ready for review". This is not a rejection — just an invitation to bring the PR up to standard. No rush.


Note: This comment was drafted by an AI-assisted triage tool and may contain mistakes. Once you have addressed the points above, an Apache Airflow maintainer — a real person — will take the next look at your PR. We use this two-stage triage process so that our maintainers' limited time is spent where it matters most: the conversation with you.

@anishgirianish anishgirianish force-pushed the aip-76-partition-at-runtime-authoring-api branch 3 times, most recently from 2a7a684 to 5685717 Compare April 22, 2026 20:27
@potiuk
Copy link
Copy Markdown
Member

potiuk commented Apr 22, 2026

Quick follow-up to the triage comment above — one clarification on the "Unresolved review comments" item:

Once you believe a thread has been addressed — whether by pushing a fix, or by replying in-thread with an explanation of why the suggestion doesn't apply — please mark the thread as resolved yourself by clicking the "Resolve conversation" button at the bottom of each thread. Reviewers don't auto-close their own threads, so an addressed-but-unresolved thread reads as "still waiting on the author" and keeps the PR from moving forward. The author doing the resolve-click is the expected convention on this project.


Note: This comment was drafted by an AI-assisted triage tool and may contain mistakes. Once you have addressed the points above, an Apache Airflow maintainer — a real person — will take the next look at your PR. We use this two-stage triage process so that our maintainers' limited time is spent where it matters most: the conversation with you.

@anishgirianish
Copy link
Copy Markdown
Contributor Author

Quick follow-up to the triage comment above — one clarification on the "Unresolved review comments" item:

Once you believe a thread has been addressed — whether by pushing a fix, or by replying in-thread with an explanation of why the suggestion doesn't apply — please mark the thread as resolved yourself by clicking the "Resolve conversation" button at the bottom of each thread. Reviewers don't auto-close their own threads, so an addressed-but-unresolved thread reads as "still waiting on the author" and keeps the PR from moving forward. The author doing the resolve-click is the expected convention on this project.

Sure, thank you.

@anishgirianish anishgirianish force-pushed the aip-76-partition-at-runtime-authoring-api branch from 5685717 to a9ceef5 Compare April 22, 2026 23:12
@anishgirianish
Copy link
Copy Markdown
Contributor Author

I think we also need airflow-core counter part for PartitionAtRuntime

Done, added PartitionAtRuntime(NullTimetable) in airflow/timetables/simple.py and registered it in serialization/encoders.py.

Thank you

@anishgirianish anishgirianish marked this pull request as ready for review April 23, 2026 00:30
@anishgirianish
Copy link
Copy Markdown
Contributor Author

@Lee-W Thank you so much for the review. addressed all three comments, ready for another look whenever you get a chance, thanks.

@anishgirianish anishgirianish requested a review from Lee-W April 23, 2026 00:32
@potiuk potiuk added the ready for maintainer review Set after triaging when all criteria pass. label Apr 27, 2026
@Lee-W Lee-W moved this to In Progress in AIP-76 Asset Partitioning Apr 28, 2026
@Lee-W Lee-W moved this from In Progress to In Review in AIP-76 Asset Partitioning Apr 28, 2026
@anishgirianish anishgirianish force-pushed the aip-76-partition-at-runtime-authoring-api branch from 927f68b to 7573eef Compare April 29, 2026 12:12
Copy link
Copy Markdown
Member

@Lee-W Lee-W left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If I'm understanding it correctly, we're only adding the interface but not actually making it work yet? (since we did not touch the code in scheduler and use partitioned_at_runtime)

Comment thread task-sdk/src/airflow/sdk/definitions/asset/__init__.py Outdated
Comment thread airflow-core/src/airflow/timetables/simple.py
Comment thread task-sdk/src/airflow/sdk/execution_time/context.py Outdated
Comment thread task-sdk/tests/task_sdk/definitions/test_dag.py Outdated
Comment thread task-sdk/src/airflow/sdk/definitions/asset/decorators.py Outdated
Copy link
Copy Markdown
Member

@Lee-W Lee-W left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

mostly good from my end and would need @uranusjr 's help for another round.

also please take a look at airflow-core/src/airflow/api_fastapi/execution_api/versions/v2026_04_06.py and contributing-docs/19_execution_api_versioning.rst. it's the last missing piece from me :)

Comment thread task-sdk/src/airflow/sdk/definitions/asset/decorators.py Outdated
Comment thread task-sdk/src/airflow/sdk/definitions/asset/decorators.py Outdated
@anishgirianish anishgirianish force-pushed the aip-76-partition-at-runtime-authoring-api branch 2 times, most recently from 93b5805 to 6e9a635 Compare May 4, 2026 16:40
@anishgirianish anishgirianish requested a review from Lee-W May 5, 2026 05:38
Comment thread airflow-core/src/airflow/api_fastapi/execution_api/datamodels/asset_event.py Outdated
Comment thread task-sdk/src/airflow/sdk/definitions/asset/decorators.py Outdated
Comment thread task-sdk/src/airflow/sdk/execution_time/task_runner.py Outdated
Comment thread task-sdk/src/airflow/sdk/execution_time/context.py Outdated
Comment thread airflow-core/src/airflow/api_fastapi/execution_api/datamodels/asset_event.py Outdated
Comment thread airflow-core/src/airflow/api_fastapi/execution_api/versions/v2026_04_06.py Outdated
Comment thread task-sdk/src/airflow/sdk/definitions/asset/decorators.py Outdated
Comment thread task-sdk/src/airflow/sdk/execution_time/context.py Outdated
Comment thread task-sdk/src/airflow/sdk/execution_time/task_runner.py Outdated
@anishgirianish anishgirianish force-pushed the aip-76-partition-at-runtime-authoring-api branch 2 times, most recently from 30b21ec to 7587a39 Compare May 11, 2026 16:25
@anishgirianish anishgirianish force-pushed the aip-76-partition-at-runtime-authoring-api branch from 7587a39 to 4318045 Compare May 11, 2026 17:55
@anishgirianish anishgirianish requested a review from uranusjr May 11, 2026 23:21
@anishgirianish
Copy link
Copy Markdown
Contributor Author

Hi @uranusjr , @Lee-W thank you so much for the review. I have addressed all the feedback and updated the pr. Would like to request you for your re-reivew whenever you get a chance.

Thank you

Copy link
Copy Markdown
Member

@Lee-W Lee-W left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM. Maybe we can start work on the scheduler side, based on the current PR design, on a separate branch.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

area:task-sdk ready for maintainer review Set after triaging when all criteria pass.

Projects

Status: In Review

Development

Successfully merging this pull request may close these issues.

4 participants