
AIP-76: Consume task-emitted partition keys on asset events #66782

Open

anishgirianish wants to merge 1 commit into apache:main from anishgirianish:aip-76-partition-at-runtime-server-consumer

Conversation

Contributor

anishgirianish commented May 12, 2026


Was generative AI tooling used to co-author this PR?
  • Yes (please specify the tool below)

Tasks can record per-emission partition keys via outlet_events[asset].add_partitions(...) (shipped in #65447).
Persist each key on the matching AssetEvent row, fanning multi-key runtime emissions out into one event per key, and
back-fill DagRun.partition_key when the task emitted exactly one key on a run that had none.
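
For context, a minimal task-side sketch of the API this PR consumes (the asset name, key value, and import paths are illustrative, and the exact add_partitions signature follows #65447 so it may differ):

from airflow.sdk import Asset, task

orders = Asset("example://orders")  # illustrative asset

@task(outlets=[orders])
def produce(*, outlet_events):
    # Record a per-emission partition key; with this PR the server persists
    # one AssetEvent row per recorded key.
    outlet_events[orders].add_partitions(["2026-05-12"])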

closes: #58474
related: #44146 #65300

cc @Lee-W


  • Read the Pull Request Guidelines for more information. Note: commit author/co-author name and email in commits become permanently public when merged.
  • For fundamental code changes, an Airflow Improvement Proposal (AIP) is needed.
  • When adding a dependency, check compliance with the ASF 3rd Party License Policy.
  • For significant user-facing changes, create a newsfragment {pr_number}.significant.rst in airflow-core/newsfragments. You can add this file in a follow-up commit after the PR is created, once you know the PR number.

# tell the truth if for some reason it touches a different partition?
# https://github.com/apache/airflow/issues/58474
partition_key = ti.dag_run.partition_key
events_by_asset: dict[SerializedAssetUniqueKey, list[tuple[dict, str | None]]] = defaultdict(list)
Member


it would be easier to read this way

Suggested change
events_by_asset: dict[SerializedAssetUniqueKey, list[tuple[dict, str | None]]] = defaultdict(list)
payloads_by_asset: dict[SerializedAssetUniqueKey, list[OutletEventPayload]] = defaultdict(list)

class OutletEventPayload(NamedTuple):
    extra: dict
    partition_key: str | None

partition_key = ti.dag_run.partition_key
events_by_asset: dict[SerializedAssetUniqueKey, list[tuple[dict, str | None]]] = defaultdict(list)
for outlet_event in outlet_events:
    if "source_alias_name" in outlet_event:
Member


Suggested change
if "source_alias_name" in outlet_event:
# Alias-emitted events are handled separately further down via
# register_asset_change_for_alias, which uses the DagRun-level
# partition_key. Per-emission partition keys do not fan out through
# the alias path — emission via an alias produces one event per
# resolved asset, all carrying the same dag_run_partition_key.
if "source_alias_name" in outlet_event:

runtime_pks: set[str] = {
    pk for events in events_by_asset.values() for _, pk in events if pk is not None
}
if len(runtime_pks) == 1 and ti.dag_run.partition_key is None:
Member


let's also add a comment here to explain what we're doing
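
One possible shape for that comment, paraphrasing the back-fill behavior described in the PR body (wording is only a suggestion, not taken from the diff):

runtime_pks: set[str] = {
    pk for events in events_by_asset.values() for _, pk in events if pk is not None
}
# The task emitted exactly one runtime partition key and the run was created
# without one, so back-fill DagRun.partition_key from that emission instead of
# leaving the run unpartitioned.
if len(runtime_pks) == 1 and ti.dag_run.partition_key is None:
    ...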



Development

Successfully merging this pull request may close these issues.

Manipulate partitioned asset events from task context

2 participants