Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion airflow-core/docs/migrations-ref.rst
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,9 @@ Here's the list of all the Database Migrations that are executed via when you ru
+-------------------------+------------------+-------------------+--------------------------------------------------------------+
| Revision ID | Revises ID | Airflow Version | Description |
+=========================+==================+===================+==============================================================+
| ``a7f3b2c1d4e5`` (head) | ``b8f3e4a1d2c9`` | ``3.3.0`` | Add allow_producer_teams column to |
| ``c20871fbf23a`` (head) | ``a7f3b2c1d4e5`` | ``3.3.0`` | Add partition_mapper_info to DagModel. |
+-------------------------+------------------+-------------------+--------------------------------------------------------------+
| ``a7f3b2c1d4e5`` | ``b8f3e4a1d2c9`` | ``3.3.0`` | Add allow_producer_teams column to |
| | | | dag_schedule_asset_reference table. |
+-------------------------+------------------+-------------------+--------------------------------------------------------------+
| ``b8f3e4a1d2c9`` | ``fde9ed84d07b`` | ``3.3.0`` | Add retry_delay_override and retry_reason to task_instance. |
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
from __future__ import annotations

from datetime import datetime

from pydantic import Field

from airflow.api_fastapi.core_api.base import BaseModel


class NextRunAssetEventResponse(BaseModel):
    """One asset event in the ``next_run_assets`` payload.

    Serialized per upstream asset of a Dag; the partition-related fields
    (``received_count``, ``required_count``, ``received_keys``,
    ``required_keys``, ``is_rollup``) keep their defaults for
    non-partitioned Dags and are only populated by the partitioned branch
    of the ``next_run_assets`` endpoint.
    """

    # Asset primary key (AssetModel.id).
    id: int
    # Asset name; required in the payload but may be null.
    name: str | None
    # Asset URI; always present.
    uri: str
    # Timestamp of the newest relevant asset event; the endpoint nulls this
    # out when the asset is not queued (or, for partitioned Dags, until all
    # required upstream keys have arrived).
    last_update: datetime | None = None
    # Number of distinct upstream partition keys received so far (0 when
    # not partitioned).
    received_count: int = 0
    # Number of upstream partition keys required before the run can start.
    required_count: int = 1
    # Sorted, deduplicated upstream partition keys received for the
    # pending partition run.
    received_keys: list[str] = Field(default_factory=list)
    # Upstream partition keys that must arrive; for rollup mappers this is
    # the expansion of the pending partition key.
    required_keys: list[str] = Field(default_factory=list)
    # True when the asset is covered by a rollup partition mapper.
    is_rollup: bool = False


class NextRunAssetsResponse(BaseModel):
    """Response for the ``next_run_assets`` endpoint."""

    # The Dag's asset condition expression (as stored on DagModel), if any.
    asset_expression: dict | None = None
    # One entry per upstream asset referenced by the Dag's schedule.
    events: list[NextRunAssetEventResponse]
    # Count of partition runs not yet turned into Dag runs; only set for
    # Dags on a partitioned timetable, otherwise null.
    pending_partition_count: int | None = None
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,11 @@ class PartitionedDagRunAssetResponse(BaseModel):
asset_name: str
asset_uri: str
received: bool
received_count: int
required_count: int
received_keys: list[str]
required_keys: list[str]
is_rollup: bool = False


class PartitionedDagRunDetailResponse(BaseModel):
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -91,9 +91,7 @@ paths:
content:
application/json:
schema:
type: object
additionalProperties: true
title: Response Next Run Assets
$ref: '#/components/schemas/NextRunAssetsResponse'
'422':
description: Validation Error
content:
Expand Down Expand Up @@ -3069,6 +3067,77 @@ components:
- extra_menu_items
title: MenuItemCollectionResponse
description: Menu Item Collection serializer for responses.
NextRunAssetEventResponse:
properties:
id:
type: integer
title: Id
name:
anyOf:
- type: string
- type: 'null'
title: Name
uri:
type: string
title: Uri
last_update:
anyOf:
- type: string
format: date-time
- type: 'null'
title: Last Update
received_count:
type: integer
title: Received Count
default: 0
required_count:
type: integer
title: Required Count
default: 1
received_keys:
items:
type: string
type: array
title: Received Keys
required_keys:
items:
type: string
type: array
title: Required Keys
is_rollup:
type: boolean
title: Is Rollup
default: false
type: object
required:
- id
- name
- uri
title: NextRunAssetEventResponse
description: One asset event in the ``next_run_assets`` payload.
NextRunAssetsResponse:
properties:
asset_expression:
anyOf:
- additionalProperties: true
type: object
- type: 'null'
title: Asset Expression
events:
items:
$ref: '#/components/schemas/NextRunAssetEventResponse'
type: array
title: Events
pending_partition_count:
anyOf:
- type: integer
- type: 'null'
title: Pending Partition Count
type: object
required:
- events
title: NextRunAssetsResponse
description: Response for the ``next_run_assets`` endpoint.
NodeResponse:
properties:
id:
Expand Down Expand Up @@ -3152,12 +3221,36 @@ components:
received:
type: boolean
title: Received
received_count:
type: integer
title: Received Count
required_count:
type: integer
title: Required Count
received_keys:
items:
type: string
type: array
title: Received Keys
required_keys:
items:
type: string
type: array
title: Required Keys
is_rollup:
type: boolean
title: Is Rollup
default: false
type: object
required:
- asset_id
- asset_name
- asset_uri
- received
- received_count
- required_count
- received_keys
- required_keys
title: PartitionedDagRunAssetResponse
description: Asset info within a partitioned Dag run detail.
PartitionedDagRunCollectionResponse:
Expand Down
118 changes: 107 additions & 11 deletions airflow-core/src/airflow/api_fastapi/core_api/routes/ui/assets.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,19 @@
# under the License.
from __future__ import annotations

from contextlib import suppress
from typing import TYPE_CHECKING, cast

from fastapi import Depends, HTTPException, status
from sqlalchemy import ColumnElement, and_, case, exists, func, select, true

from airflow.api_fastapi.common.db.common import SessionDep
from airflow.api_fastapi.common.router import AirflowRouter
from airflow.api_fastapi.core_api.datamodels.ui.assets import (
NextRunAssetEventResponse,
NextRunAssetsResponse,
)
from airflow.api_fastapi.core_api.routes.ui.partitioned_dag_runs import _load_timetable
from airflow.api_fastapi.core_api.security import requires_access_asset, requires_access_dag
from airflow.models import DagModel
from airflow.models.asset import (
Expand All @@ -32,6 +40,9 @@
PartitionedAssetKeyLog,
)

if TYPE_CHECKING:
from airflow.partition_mappers.base import RollupMapper

assets_router = AirflowRouter(tags=["Asset"])


Expand All @@ -42,7 +53,7 @@
def next_run_assets(
dag_id: str,
session: SessionDep,
) -> dict:
) -> NextRunAssetsResponse:
dag_model = DagModel.get_dagmodel(dag_id, session=session)
if dag_model is None:
raise HTTPException(status.HTTP_404_NOT_FOUND, f"Dag with id {dag_id} was not found")
Expand All @@ -55,7 +66,7 @@ def next_run_assets(
pending_partition_count: int | None = None

queued_expr: ColumnElement[int]
if is_partitioned := dag_model.timetable_summary == "Partitioned Asset":
if is_partitioned := dag_model.timetable_partitioned:
pending_partition_count = session.scalar(
select(func.count())
.select_from(AssetPartitionDagRun)
Expand Down Expand Up @@ -90,7 +101,7 @@ def next_run_assets(
AssetModel.id,
AssetModel.uri,
AssetModel.name,
func.max(AssetEvent.timestamp).label("lastUpdate"),
func.max(AssetEvent.timestamp).label("last_update"),
queued_expr.label("queued"),
)
.join(DagScheduleAssetReference, DagScheduleAssetReference.asset_id == AssetModel.id)
Expand All @@ -110,12 +121,97 @@ def next_run_assets(
isouter=True,
)

events = [dict(info._mapping) for info in session.execute(query)]
for event in events:
if not event.pop("queued", None):
event["lastUpdate"] = None
raw_rows = list(session.execute(query))

if not is_partitioned:
events = [
NextRunAssetEventResponse(
id=row.id,
name=row.name,
uri=row.uri,
last_update=row.last_update if row.queued else None,
)
for row in raw_rows
]
return NextRunAssetsResponse(asset_expression=dag_model.asset_expression, events=events)

# Partitioned Dags: enrich with per-asset received/required counts and rollup flag.
pending_apdr = session.execute(
select(AssetPartitionDagRun.id, AssetPartitionDagRun.partition_key)
.where(
AssetPartitionDagRun.target_dag_id == dag_id,
AssetPartitionDagRun.created_dag_run_id.is_(None),
)
.order_by(AssetPartitionDagRun.created_at.desc())
.limit(1)
).one_or_none()

has_rollup_mappers = dag_model.has_rollup_mappers

data: dict = {"asset_expression": dag_model.asset_expression, "events": events}
if pending_partition_count is not None:
data["pending_partition_count"] = pending_partition_count
return data
if pending_apdr is None:
# No pending APDR yet — mark rollup assets so the UI can handle them
# correctly (e.g. skip "Asset Triggered" in favour of the asset name view).
# Reads from the cached partition_mapper_info so no timetable load is needed.
events = [
NextRunAssetEventResponse(
id=row.id,
name=row.name,
uri=row.uri,
last_update=row.last_update if row.queued else None,
is_rollup=has_rollup_mappers and dag_model.is_rollup_asset(name=row.name, uri=row.uri),
)
for row in raw_rows
]
return NextRunAssetsResponse(
asset_expression=dag_model.asset_expression,
events=events,
pending_partition_count=pending_partition_count,
)

# Collect received upstream partition keys per asset for this partition run.
# Use a set to deduplicate: multiple events for the same key count as one.
received_keys_by_asset: dict[int, set[str]] = {}
for log_row in session.execute(
select(
PartitionedAssetKeyLog.asset_id,
PartitionedAssetKeyLog.source_partition_key,
).where(PartitionedAssetKeyLog.asset_partition_dag_run_id == pending_apdr.id)
):
received_keys_by_asset.setdefault(log_row.asset_id, set()).add(log_row.source_partition_key or "")

# The timetable is only needed to call ``to_upstream`` for rollup mappers.
# When the cached info shows no rollup mappers, skip loading it entirely.
rollup_timetable = _load_timetable(dag_id, session) if has_rollup_mappers else None

events = []
for row in raw_rows:
received_keys = sorted(received_keys_by_asset.get(row.id, set()))
required_keys: list[str] = [pending_apdr.partition_key]
is_rollup = has_rollup_mappers and dag_model.is_rollup_asset(name=row.name, uri=row.uri)
if is_rollup and rollup_timetable is not None:
with suppress(Exception):
mapper = rollup_timetable.get_partition_mapper(name=row.name, uri=row.uri)
required_keys = sorted(cast("RollupMapper", mapper).to_upstream(pending_apdr.partition_key))
received_count = len(received_keys)
required_count = len(required_keys)
# Only surface last_update once all required upstream keys have arrived.
last_update = row.last_update if row.queued and received_count >= required_count else None
events.append(
NextRunAssetEventResponse(
id=row.id,
name=row.name,
uri=row.uri,
last_update=last_update,
received_count=received_count,
required_count=required_count,
received_keys=received_keys,
required_keys=required_keys,
is_rollup=is_rollup,
)
)

return NextRunAssetsResponse(
asset_expression=dag_model.asset_expression,
events=events,
pending_partition_count=pending_partition_count,
)
Loading
Loading