
AIP-76: Hold Dag run until all upstream partitions arrive#64571

Open
Lee-W wants to merge 16 commits into apache:main from astronomer:asset-partition-window

Conversation

Member

@Lee-W Lee-W commented Apr 1, 2026

Closes: #59294

Why

Asset-partitioned Dags that aggregate many upstream slices into one downstream period (e.g., 60 minute-level events rolling up into one hourly Dag run) had no way to express that requirement — the scheduler would fire the downstream run as soon as any single upstream partition arrived.

This PR implements the rollup building block from AIP-76: a Window type that enumerates the full set of upstream partitions required for a downstream period, a RollupMapper that wires a source mapper to a window, and the scheduler logic to gate Dag runs until every required upstream key is present.

What

Partition mappers/windows

  • Add Window ABC and six concrete implementations (HourWindow, DayWindow, WeekWindow, MonthWindow, QuarterWindow, YearWindow) to both airflow-core and the Task SDK
  • Add RollupMapper that composes a source_mapper with a Window and exposes to_upstream(downstream_key) → frozenset[str]
  • Add decode_downstream / encode_upstream hooks to PartitionMapper and implement them in _BaseTemporalMapper; StartOfWeekMapper gets a regex-based override because %V is ambiguous with strptime.
  • Add week_start parameter to StartOfWeekMapper for non-Monday week starts
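The composition the bullets above describe can be sketched as follows. Class and method names (`DayWindow`, `StartOfDayMapper`, `RollupMapper`, `decode_downstream`, `encode_upstream`, `to_upstream`) follow the PR description, but the bodies are illustrative assumptions, not the actual Airflow implementation:

```python
from datetime import datetime, timedelta


class DayWindow:
    """Enumerates the 24 hourly partition timestamps within one day."""

    def keys_for(self, day_start: datetime) -> list[datetime]:
        return [day_start + timedelta(hours=h) for h in range(24)]


class StartOfDayMapper:
    """Decode/encode hooks in the spirit of _BaseTemporalMapper (sketch)."""

    def decode_downstream(self, key: str) -> datetime:
        return datetime.strptime(key, "%Y-%m-%d")

    def encode_upstream(self, ts: datetime) -> str:
        return ts.strftime("%Y-%m-%dT%H:%M:%S")


class RollupMapper:
    """Composes a source mapper with a window to enumerate required keys."""

    def __init__(self, source_mapper, window):
        self.source_mapper = source_mapper
        self.window = window

    def to_upstream(self, downstream_key: str) -> frozenset[str]:
        # Decode the downstream key, expand it via the window, then
        # re-encode every member as an upstream partition key.
        start = self.source_mapper.decode_downstream(downstream_key)
        return frozenset(
            self.source_mapper.encode_upstream(ts)
            for ts in self.window.keys_for(start)
        )
```

With these stubs, `RollupMapper(StartOfDayMapper(), DayWindow()).to_upstream("2024-01-15")` yields the 24 hourly keys the downstream daily run must wait for.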

Scheduler

  • Rewrite _create_dagruns_for_partitioned_asset_dags to bulk-fetch serialized Dags and partition-key logs, removing N+1 queries, and cap per-tick work at MAX_PARTITION_DAG_RUNS_PER_TICK
  • Add _resolve_asset_partition_status / _check_rollup_asset_status to evaluate rollup satisfaction; non-rollup assets continue to satisfy immediately

Serialization

  • Add encode_window / decode_window and extend mapper encoder/decoder to round-trip RollupMapper and all Window subclasses
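A name-based round-trip in the spirit of `encode_window` / `decode_window` might look like the sketch below; the registry and the dict shape are assumptions for illustration, not Airflow's actual wire format:

```python
# Registry mapping window class names to classes, populated via a decorator.
WINDOW_REGISTRY: dict[str, type] = {}


def register_window(cls: type) -> type:
    WINDOW_REGISTRY[cls.__name__] = cls
    return cls


@register_window
class DayWindow:
    def __init__(self, hours_per_day: int = 24):
        self.hours_per_day = hours_per_day


def encode_window(window: object) -> dict:
    # Store the class name plus constructor parameters.
    return {"__type": type(window).__name__, "params": vars(window)}


def decode_window(data: dict) -> object:
    # Rebuild the window by looking the class up in the registry.
    return WINDOW_REGISTRY[data["__type"]](**data["params"])
```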

UI / API

  • Enrich next_run_assets endpoint with per-asset received_count, required_count, received_keys, required_keys, and is_rollup for partitioned Dags
  • Update AssetNode and AssetSchedule components to surface rollup progress (e.g. "12 / 24 received")
  • Add AssetProgressCell for inline progress in the Dags list
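A "12 / 24 received" label can be derived from fields like the ones above. The field names mirror the enriched endpoint described in the bullets; the helper itself is hypothetical:

```python
def asset_progress(required_keys: frozenset[str], received_keys: set[str], is_rollup: bool) -> dict:
    # Only count received keys that are actually required, so stale or
    # out-of-window keys never inflate the progress figure.
    received = sorted(received_keys & required_keys)
    return {
        "is_rollup": is_rollup,
        "required_count": len(required_keys),
        "received_count": len(received),
        "required_keys": sorted(required_keys),
        "received_keys": received,
    }
```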

Was generative AI tooling used to co-author this PR?
  • Yes — Claude Sonnet 4.6

Generated-by: Claude Sonnet 4.6 following the guidelines

with DAG(
    dag_id="daily_team_a_rollup",
    schedule=PartitionedAssetTimetable(
        assets=team_a_player_stats,
        default_partition_mapper=RollupMapper(
            source_mapper=StartOfDayMapper(),
            window=DayWindow(),
        ),
    ),
    catchup=False,
    tags=["player-stats", "rollup"],
):
    """
    First rollup level: 24 hourly partitions of ``team_a_player_stats`` → one daily summary.

    ``StartOfDayMapper`` normalizes each upstream hourly timestamp (``%Y-%m-%dT%H:%M:%S``)
    to its day-start (``%Y-%m-%d``); ``DayWindow`` declares the downstream run needs
    all 24 hourly partitions before firing. Publishes ``daily_team_a`` so the
    monthly rollup below can consume it.
    """

    @task(outlets=[daily_team_a])
    def summarise_team_a_day(dag_run=None):
        """Produce the full-day rollup once every hour has arrived."""
        if TYPE_CHECKING:
            assert dag_run
        print(f"All 24 hourly partitions received. Day: {dag_run.partition_key}")

    summarise_team_a_day()
  • Read the Pull Request Guidelines for more information. Note: commit author/co-author name and email in commits become permanently public when merged.
  • For fundamental code changes, an Airflow Improvement Proposal (AIP) is needed.
  • When adding dependency, check compliance with the ASF 3rd Party License Policy.
  • For significant user-facing changes create newsfragment: {pr_number}.significant.rst, in airflow-core/newsfragments. You can add this file in a follow-up commit after the PR is created so you know the PR number.

@boring-cyborg boring-cyborg Bot added area:Scheduler including HA (high availability) scheduler area:task-sdk labels Apr 1, 2026
@Lee-W Lee-W changed the title feat(AIP-76): window feat(AIP-76): implement to_upstream Apr 1, 2026
@kaxil kaxil requested a review from Copilot April 2, 2026 00:41
Contributor

Copilot AI left a comment


Pull request overview

Implements “rollup” support for partition mappers (AIP-76) by introducing a RollupMapper interface with to_upstream() and using it in the scheduler to wait for a complete set of upstream partition keys before creating partitioned asset-triggered DAG runs.

Changes:

  • Add RollupMapper base class (core + task SDK) with an abstract to_upstream() contract.
  • Implement to_upstream() for weekly and monthly temporal mappers (core + task SDK).
  • Update the scheduler’s partitioned-asset DAG-run creation logic to enforce rollup completeness when a mapper supports it.

Reviewed changes

Copilot reviewed 6 out of 6 changed files in this pull request and generated 11 comments.

Show a summary per file
File Description
task-sdk/src/airflow/sdk/definitions/partition_mappers/base.py Introduces SDK-side RollupMapper abstraction.
task-sdk/src/airflow/sdk/definitions/partition_mappers/temporal.py Adds SDK to_upstream() for week/month temporal rollups.
airflow-core/src/airflow/timetables/base.py Adds get_partition_mapper() hook to the Timetable protocol.
airflow-core/src/airflow/partition_mappers/base.py Introduces core-side RollupMapper abstraction.
airflow-core/src/airflow/partition_mappers/temporal.py Adds core to_upstream() for week/month temporal rollups.
airflow-core/src/airflow/jobs/scheduler_job_runner.py Uses rollup mapper behavior to decide when partitioned asset-triggered DAG runs are ready.

@Lee-W Lee-W force-pushed the asset-partition-window branch 2 times, most recently from e72dfa6 to e6d53f2 Compare April 7, 2026 09:57
Member Author

Lee-W commented Apr 7, 2026

from __future__ import annotations

from airflow.sdk import (
    DAG,
    Asset,
    CronPartitionTimetable,
    PartitionedAssetTimetable,
    WeeklyRollupMapper,
    task,
)

daily_sales = Asset(uri="file://incoming/sales/daily.csv", name="daily_sales")

# Upstream Dag: produces one partition per day (key format: "2024-01-15T00:00:00")
with DAG(
    dag_id="ingest_daily_sales",
    schedule=CronPartitionTimetable("0 0 * * *", timezone="UTC"),
):

    @task(outlets=[daily_sales])
    def ingest():
        pass

    ingest()


# Downstream Dag: runs once all 7 daily partitions for a week have arrived
with DAG(
    dag_id="weekly_sales_report",
    schedule=PartitionedAssetTimetable(
        assets=daily_sales,
        default_partition_mapper=WeeklyRollupMapper(),
    ),
    catchup=False,
):

    @task
    def generate_report(dag_run=None):
        # dag_run.partition_key will be the week key, e.g. "2024-01-15 (W03)"
        print(dag_run.partition_key)

    generate_report()

@Lee-W Lee-W force-pushed the asset-partition-window branch from 91c4ac3 to 93e82cb Compare April 7, 2026 11:48
Member Author

Lee-W commented Apr 7, 2026

The backend part is basically wrapped up, but the frontend and API side still need some work. The UI is quite weird for these cases now.

@Lee-W Lee-W force-pushed the asset-partition-window branch from 9934a73 to f52823c Compare April 10, 2026 09:09
@kaxil kaxil requested a review from Copilot April 10, 2026 19:55
Contributor

Copilot AI left a comment


Pull request overview

Copilot reviewed 21 out of 21 changed files in this pull request and generated 13 comments.

Comments suppressed due to low confidence (1)

airflow-core/src/airflow/api_fastapi/core_api/routes/ui/partitioned_dag_runs.py:1

  • Counting PartitionedAssetKeyLog.id can over-count when duplicate log rows exist for the same upstream partition key (e.g. retries/dup inserts), inflating total_received and potentially showing the run as satisfiable earlier than it should be. Consider counting distinct PartitionedAssetKeyLog.source_partition_key (and/or a distinct composite of (asset_id, source_partition_key)) to match the scheduler’s set-based satisfaction semantics.
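The over-counting Copilot describes is easy to reproduce. In the sqlite sketch below, the table and column names mirror the comment (`partitioned_asset_key_log`, `source_partition_key`) but the schema itself is illustrative: a duplicate log row makes `COUNT(id)` report 3 received keys where the set-based semantics say 2.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE partitioned_asset_key_log "
    "(id INTEGER PRIMARY KEY, asset_id INT, source_partition_key TEXT)"
)
# Two rows for the same upstream key simulate a retry / duplicate insert.
conn.executemany(
    "INSERT INTO partitioned_asset_key_log (asset_id, source_partition_key) VALUES (?, ?)",
    [
        (1, "2024-01-15T00:00:00"),
        (1, "2024-01-15T00:00:00"),  # duplicate
        (1, "2024-01-15T01:00:00"),
    ],
)
naive = conn.execute(
    "SELECT COUNT(id) FROM partitioned_asset_key_log WHERE asset_id = 1"
).fetchone()[0]
distinct = conn.execute(
    "SELECT COUNT(DISTINCT source_partition_key) FROM partitioned_asset_key_log WHERE asset_id = 1"
).fetchone()[0]
print(naive, distinct)  # naive over-counts; distinct matches set semantics
```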

@Lee-W Lee-W force-pushed the asset-partition-window branch 9 times, most recently from a64d06a to 9064515 Compare April 17, 2026 12:12
@Lee-W Lee-W marked this pull request as ready for review April 20, 2026 06:32
@Lee-W Lee-W force-pushed the asset-partition-window branch 4 times, most recently from c7a53ce to 6da11ca Compare May 6, 2026 00:57
@Lee-W Lee-W requested a review from uranusjr May 6, 2026 04:19
@Lee-W Lee-W force-pushed the asset-partition-window branch from 6da11ca to dba6e01 Compare May 6, 2026 04:20
return NextRunAssetsResponse(asset_expression=dag_model.asset_expression, events=events)

# Partitioned Dags: enrich with per-asset received/required counts and rollup flag.
timetable = _load_timetable(dag_id, session)
Member


I don't like that we need to load the timetable here… Is there a way to pre-calculate this somewhere?

Member Author


what do you think if we save this to DagModel.partition_mapper_info?

 {
    "name, uri": {
      "is_rollup": true,
      "runtime_partitioned": false,
    },
    "name, uri": {
      "is_rollup": false,
      "runtime_partitioned": false,
    }
  }

something like this?
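Building that cache at Dag-sync time could be as simple as the hypothetical helper below; all names here are assumptions drawn from the suggestion above, not code from the PR:

```python
def build_partition_mapper_info(mappers_by_asset: dict[str, object]) -> dict[str, dict]:
    """Precompute per-asset mapper metadata so UI routes can skip the timetable."""
    return {
        asset_key: {
            # Rollup mappers are the ones exposing to_upstream().
            "is_rollup": hasattr(mapper, "to_upstream"),
            "runtime_partitioned": bool(getattr(mapper, "runtime_partitioned", False)),
        }
        for asset_key, mapper in mappers_by_asset.items()
    }
```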

Comment on lines 217 to 220
dag_timetables_assets: dict[
str, tuple[PartitionedAssetTimetable | None, list[AssetNameUri], dict[int, AssetNameUri]]
] = {did: _load_timetable_and_assets(did, session) for did in unique_dag_ids}

Member


Same problem

TASK_STUCK_IN_QUEUED_RESCHEDULE_EVENT = "stuck in queued reschedule"
""":meta private:"""

MAX_PARTITION_DAG_RUNS_PER_TICK = 500
Member


What's the worst that can happen if this is not the best value? (In other words, should we provide a way to change this?)

Member Author


What's the worst that can happen if this is not the best value

The DagRun that waits for 501 Assets will never be triggered.

should we provide a way to change this?

I think we should, but I'm thinking of doing it in another PR.

@Lee-W Lee-W force-pushed the asset-partition-window branch 2 times, most recently from 2447637 to 93b3bdf Compare May 6, 2026 12:22
@Lee-W Lee-W requested a review from jedcunningham as a code owner May 6, 2026 13:24
@Lee-W Lee-W force-pushed the asset-partition-window branch from a6a5bc4 to 2955f55 Compare May 6, 2026 14:26
Lee-W added 16 commits May 7, 2026 23:50
this is still not ideal, but at least it's not super wrong now
…t ordering

StartOfWeekMapper and StartOfQuarterMapper now derive their decode_downstream
regex from output_format itself, so users can re-order strftime directives
and {name} placeholders (e.g. "Q{quarter}/%Y") without having to override
decode_downstream. Malformed output_format — empty {}, non-identifier
placeholder names, duplicate %X directives, duplicate {name} placeholders —
raises ValueError at mapper construction instead of an opaque re.error from
deep inside a scheduler tick or UI route.
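The derivation the commit describes can be sketched roughly as below. This is a partial illustration under assumptions (only `%Y` is mapped, the duplicate-directive checks are omitted), not the mapper's actual code:

```python
import re

# Assumed strftime-directive patterns; the real mapper would cover more.
DIRECTIVE_PATTERNS = {"%Y": r"(?P<year>\d{4})", "%m": r"(?P<month>\d{2})"}


def format_to_regex(output_format: str) -> re.Pattern:
    """Derive a decode_downstream regex from an output_format string."""
    parts = []
    i = 0
    while i < len(output_format):
        two = output_format[i : i + 2]
        if output_format[i] == "%" and two in DIRECTIVE_PATTERNS:
            parts.append(DIRECTIVE_PATTERNS[two])
            i += 2
        elif output_format[i] == "{":
            end = output_format.index("}", i)
            name = output_format[i + 1 : end]
            if not name.isidentifier():
                # Fail loudly at construction instead of an opaque re.error later.
                raise ValueError(f"bad placeholder: {name!r}")
            parts.append(rf"(?P<{name}>\d+)")
            i = end + 1
        else:
            parts.append(re.escape(output_format[i]))
            i += 1
    return re.compile("".join(parts) + "$")
```

With this sketch, a re-ordered format such as `"Q{quarter}/%Y"` parses without any override, and a malformed `"{}"` raises `ValueError` up front.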
…ag_runs list

Drop the SQL "count distinct assets with any log" subquery and always
compute total_received via the Python rollup-aware helper. The list
endpoint previously returned different numbers for the same APDR
depending on whether the caller filtered by dag_id (rollup-aware,
counts upstream window keys) or queried globally (SQL approximation,
counts assets with any log) — same field, different semantics, very
confusing for any UI consumer.

The N+1 cost of per-Dag timetable loads was already paid in the
global branch for total_required, so adding a single batched log
fetch keeps the existing query budget while making the contract
identical across both views. _compute_received_count now skips
asset_ids that are no longer required (active=False) so the relaxed
log query doesn't over-count.
StartOfWeekMapper now always uses ISO weeks (Monday) and
StartOfMonthMapper always emits the 1st of the month. Custom
fiscal boundaries can still be expressed by pairing a user-defined
source mapper with the existing windows.
The next_run_assets and partitioned_dag_runs endpoints used to load
and deserialize the full timetable on every request just to read
mapper attributes (is_rollup) and required-key counts. Cache mapper
metadata per asset on DagModel during Dag sync via a new
``partition_mapper_info`` JSON column, so the UI resolves mapper
attributes from the cache and only loads the timetable when
``to_upstream`` evaluation for rollup mappers is actually needed.

Labels

area:Scheduler including HA (high availability) scheduler area:task-sdk

Projects

Status: In Review

Development

Successfully merging this pull request may close these issues.

Implement rollup (many-to-one partition mapper)

3 participants