Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions airflow-core/docs/authoring-and-scheduling/assets.rst
Original file line number Diff line number Diff line change
Expand Up @@ -188,6 +188,8 @@ Declaring an ``@asset`` automatically creates:
* A ``DAG`` with *dag_id* set to the function name.
* A task inside the ``DAG`` with *task_id* set to the function name, and *outlet* to the created ``Asset``.

The parameter names ``self``, ``context``, and ``outlet_events`` are **reserved** in an ``@asset`` function: they are populated by Airflow at runtime (with the asset itself, the execution context, and the outlet event accessor respectively) and are never treated as inlet asset references.

Attaching extra information to an emitting asset event
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

from datetime import datetime

from pydantic import Field
from pydantic.types import JsonValue

from airflow.api_fastapi.core_api.base import BaseModel, StrictBaseModel
Expand Down Expand Up @@ -54,6 +55,7 @@ class AssetEventResponse(BaseModel):
source_run_id: str | None = None
source_map_index: int | None = None
partition_key: str | None = None
partition_keys: list[str] = Field(default_factory=list)


class AssetEventsResponse(BaseModel):
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
AddDagEndpoint,
AddDagRunDetailEndpoint,
AddNoteField,
AddOutletPartitionKeysField,
AddPartitionKeyField,
AddRunAfterField,
AddTaskInstanceStartDateField,
Expand All @@ -54,6 +55,7 @@
Version(
"2026-04-06",
AddPartitionKeyField,
AddOutletPartitionKeysField,
MovePreviousRunEndpoint,
AddDagRunDetailEndpoint,
MakeDagRunStartDateNullable,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,23 @@ def remove_partition_key_from_asset_events(response: ResponseInfo) -> None: # t
elem.pop("partition_key", None)


class AddOutletPartitionKeysField(VersionChange):
    """Add the `partition_keys` field to AssetEventResponse for runtime-partitioned outlet events."""

    # Cadwyn surfaces the class docstring as the version-change description.
    description = __doc__

    # Downgrade instruction: in the previous API version the field simply did not exist.
    instructions_to_migrate_to_previous_version = (
        schema(AssetEventResponse).field("partition_keys").didnt_exist,
    )

    @convert_response_to_previous_version_for(AssetEventsResponse)  # type: ignore[arg-type]
    def remove_partition_keys_from_asset_events(response: ResponseInfo) -> None:  # type: ignore[misc]
        """Remove the `partition_keys` field from each asset event when converting to the previous version."""
        # Strip the new field in place from every event in the list payload;
        # pop(..., None) keeps this a no-op for events that never had the key.
        for event in response.body["asset_events"]:
            event.pop("partition_keys", None)


class MovePreviousRunEndpoint(VersionChange):
"""Add new previous-run endpoint and migrate old endpoint."""

Expand Down
7 changes: 6 additions & 1 deletion airflow-core/src/airflow/serialization/encoders.py
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@
from airflow.sdk.definitions.partition_mappers.temporal import StartOfHourMapper
from airflow.sdk.definitions.timetables.assets import (
AssetTriggeredTimetable,
PartitionAtRuntime,
PartitionedAssetTimetable,
)
from airflow.sdk.definitions.timetables.simple import ContinuousTimetable, NullTimetable, OnceTimetable
Expand Down Expand Up @@ -293,6 +294,7 @@ class _Serializer:
MultipleCronTriggerTimetable: "airflow.timetables.trigger.MultipleCronTriggerTimetable",
NullTimetable: "airflow.timetables.simple.NullTimetable",
OnceTimetable: "airflow.timetables.simple.OnceTimetable",
PartitionAtRuntime: "airflow.timetables.simple.PartitionAtRuntime",
PartitionedAssetTimetable: "airflow.timetables.simple.PartitionedAssetTimetable",
}

Expand All @@ -318,7 +320,10 @@ def serialize_timetable(self, timetable: BaseTimetable | CoreTimetable) -> dict[
@serialize_timetable.register(ContinuousTimetable)
@serialize_timetable.register(NullTimetable)
@serialize_timetable.register(OnceTimetable)
def _(self, timetable: ContinuousTimetable | NullTimetable | OnceTimetable) -> dict[str, Any]:
@serialize_timetable.register(PartitionAtRuntime)
def _(
self, timetable: ContinuousTimetable | NullTimetable | OnceTimetable | PartitionAtRuntime
) -> dict[str, Any]:
return {}

@serialize_timetable.register
Expand Down
7 changes: 7 additions & 0 deletions airflow-core/src/airflow/timetables/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -218,6 +218,13 @@ class Timetable(Protocol):
instead of the traditional logic based on logical dates and data intervals.
"""

partitioned_at_runtime: bool = False
"""Whether this timetable defers partition selection to task runtime.

*True* for :class:`~airflow.timetables.simple.PartitionAtRuntime`;
downstream code can branch on this flag instead of using ``isinstance``.
"""

@classmethod
def deserialize(cls, data: dict[str, Any]) -> Timetable:
"""
Expand Down
16 changes: 16 additions & 0 deletions airflow-core/src/airflow/timetables/simple.py
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,7 @@ class NullTimetable(_TrivialTimetable):
"""

can_be_scheduled = False # TODO (GH-52141): Find a way to keep this and one in Core in sync.
partitioned_at_runtime = False
description: str = "Never, external triggers only"

@property
Expand Down Expand Up @@ -183,6 +184,21 @@ def next_dagrun_info(
return DagRunInfo.interval(start, end)


class PartitionAtRuntime(NullTimetable):
    """
    Timetable that never schedules anything; partition keys are set at runtime.

    This corresponds to ``schedule=PartitionAtRuntime()``.
    """

    description: str = "Never, partition key(s) set at runtime"
    # Flag checked by downstream code instead of isinstance() — see Timetable.partitioned_at_runtime.
    partitioned_at_runtime = True

    @property
    def summary(self) -> str:
        # Short human-readable label shown in the UI for this schedule.
        return "PartitionAtRuntime"


class AssetTriggeredTimetable(_TrivialTimetable):
"""
Timetable that never schedules anything.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -124,6 +124,7 @@ def test_get_by_asset(self, uri, name, client):
},
"timestamp": "2021-01-01T00:00:00Z",
"partition_key": None,
"partition_keys": [],
},
{
"id": 2,
Expand All @@ -141,6 +142,7 @@ def test_get_by_asset(self, uri, name, client):
"created_dagruns": [],
"timestamp": "2021-01-02T00:00:00Z",
"partition_key": None,
"partition_keys": [],
},
{
"id": 3,
Expand All @@ -158,6 +160,7 @@ def test_get_by_asset(self, uri, name, client):
"created_dagruns": [],
"timestamp": "2021-01-03T00:00:00Z",
"partition_key": None,
"partition_keys": [],
},
]
}
Expand Down Expand Up @@ -195,6 +198,7 @@ def test_get_by_asset_with_after_filter(self, uri, name, client):
"created_dagruns": [],
"timestamp": "2021-01-02T00:00:00Z",
"partition_key": None,
"partition_keys": [],
},
{
"id": 3,
Expand All @@ -212,6 +216,7 @@ def test_get_by_asset_with_after_filter(self, uri, name, client):
"created_dagruns": [],
"timestamp": "2021-01-03T00:00:00Z",
"partition_key": None,
"partition_keys": [],
},
]
}
Expand Down Expand Up @@ -249,6 +254,7 @@ def test_get_by_asset_with_before_filter(self, uri, name, client):
"created_dagruns": [],
"timestamp": "2021-01-01T00:00:00Z",
"partition_key": None,
"partition_keys": [],
},
{
"id": 2,
Expand All @@ -266,6 +272,7 @@ def test_get_by_asset_with_before_filter(self, uri, name, client):
"created_dagruns": [],
"timestamp": "2021-01-02T00:00:00Z",
"partition_key": None,
"partition_keys": [],
},
]
}
Expand Down Expand Up @@ -308,6 +315,7 @@ def test_get_by_asset_with_before_and_after_filters(self, uri, name, client):
"created_dagruns": [],
"timestamp": "2021-01-02T00:00:00Z",
"partition_key": None,
"partition_keys": [],
},
]
}
Expand Down Expand Up @@ -345,6 +353,7 @@ def test_get_by_asset_with_descending_order(self, uri, name, client):
"created_dagruns": [],
"timestamp": "2021-01-03T00:00:00Z",
"partition_key": None,
"partition_keys": [],
},
{
"id": 2,
Expand All @@ -362,6 +371,7 @@ def test_get_by_asset_with_descending_order(self, uri, name, client):
"created_dagruns": [],
"timestamp": "2021-01-02T00:00:00Z",
"partition_key": None,
"partition_keys": [],
},
{
"id": 1,
Expand All @@ -379,6 +389,7 @@ def test_get_by_asset_with_descending_order(self, uri, name, client):
"created_dagruns": [],
"timestamp": "2021-01-01T00:00:00Z",
"partition_key": None,
"partition_keys": [],
},
]
}
Expand Down Expand Up @@ -416,6 +427,7 @@ def test_get_by_asset_get_first(self, uri, name, client):
"created_dagruns": [],
"timestamp": "2021-01-01T00:00:00Z",
"partition_key": None,
"partition_keys": [],
},
]
}
Expand Down Expand Up @@ -453,6 +465,7 @@ def test_get_by_asset_get_last(self, uri, name, client):
"created_dagruns": [],
"timestamp": "2021-01-03T00:00:00Z",
"partition_key": None,
"partition_keys": [],
},
]
}
Expand Down Expand Up @@ -484,6 +497,7 @@ def test_get_by_asset(self, client):
"created_dagruns": [],
"timestamp": "2021-01-01T00:00:00Z",
"partition_key": None,
"partition_keys": [],
},
{
"id": 2,
Expand All @@ -501,6 +515,7 @@ def test_get_by_asset(self, client):
"created_dagruns": [],
"timestamp": "2021-01-02T00:00:00Z",
"partition_key": None,
"partition_keys": [],
},
{
"id": 3,
Expand All @@ -518,6 +533,7 @@ def test_get_by_asset(self, client):
"created_dagruns": [],
"timestamp": "2021-01-03T00:00:00Z",
"partition_key": None,
"partition_keys": [],
},
]
}
2 changes: 2 additions & 0 deletions task-sdk/docs/api.rst
Original file line number Diff line number Diff line change
Expand Up @@ -209,6 +209,8 @@ Timetables

.. autoapiclass:: airflow.sdk.MultipleCronTriggerTimetable

.. autoapiclass:: airflow.sdk.PartitionAtRuntime

.. autoapiclass:: airflow.sdk.PartitionedAssetTimetable


Expand Down
11 changes: 10 additions & 1 deletion task-sdk/src/airflow/sdk/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@
"ObjectStoragePath",
"Param",
"ParamsDict",
"PartitionAtRuntime",
"PartitionedAssetTimetable",
"PartitionMapper",
"PokeReturnValue",
Expand Down Expand Up @@ -118,7 +119,13 @@
from airflow.sdk.bases.skipmixin import SkipMixin
from airflow.sdk.bases.xcom import BaseXCom
from airflow.sdk.configuration import AirflowSDKConfigParser
from airflow.sdk.definitions.asset import Asset, AssetAlias, AssetAll, AssetAny, AssetWatcher
from airflow.sdk.definitions.asset import (
Asset,
AssetAlias,
AssetAll,
AssetAny,
AssetWatcher,
)
from airflow.sdk.definitions.asset.decorators import asset
from airflow.sdk.definitions.asset.metadata import Metadata
from airflow.sdk.definitions.callback import AsyncCallback, SyncCallback
Expand Down Expand Up @@ -154,6 +161,7 @@
from airflow.sdk.definitions.template import literal
from airflow.sdk.definitions.timetables.assets import (
AssetOrTimeSchedule,
PartitionAtRuntime,
PartitionedAssetTimetable,
)
from airflow.sdk.definitions.timetables.events import EventsTimetable
Expand Down Expand Up @@ -215,6 +223,7 @@
"ObjectStoragePath": ".io.path",
"Param": ".definitions.param",
"ParamsDict": ".definitions.param",
"PartitionAtRuntime": ".definitions.timetables.assets",
"PartitionedAssetTimetable": ".definitions.timetables.assets",
"PartitionMapper": ".definitions.partition_mappers.base",
"PokeReturnValue": ".bases.sensor",
Expand Down
2 changes: 2 additions & 0 deletions task-sdk/src/airflow/sdk/__init__.pyi
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,7 @@ from airflow.sdk.definitions.taskgroup import TaskGroup as TaskGroup
from airflow.sdk.definitions.template import literal as literal
from airflow.sdk.definitions.timetables.assets import (
AssetOrTimeSchedule,
PartitionAtRuntime,
PartitionedAssetTimetable,
)
from airflow.sdk.definitions.timetables.events import EventsTimetable
Expand Down Expand Up @@ -145,6 +146,7 @@ __all__ = [
"ObjectStoragePath",
"Param",
"PokeReturnValue",
"PartitionAtRuntime",
"PartitionedAssetTimetable",
"PartitionMapper",
"ProductMapper",
Expand Down
1 change: 1 addition & 0 deletions task-sdk/src/airflow/sdk/api/datamodels/_generated.py
Original file line number Diff line number Diff line change
Expand Up @@ -660,6 +660,7 @@ class AssetEventResponse(BaseModel):
source_run_id: Annotated[str | None, Field(title="Source Run Id")] = None
source_map_index: Annotated[int | None, Field(title="Source Map Index")] = None
partition_key: Annotated[str | None, Field(title="Partition Key")] = None
partition_keys: Annotated[list[str] | None, Field(title="Partition Keys")] = None


class AssetEventsResponse(BaseModel):
Expand Down
8 changes: 8 additions & 0 deletions task-sdk/src/airflow/sdk/bases/timetable.py
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,14 @@ class BaseTimetable:

asset_condition: BaseAsset | None = None

partitioned_at_runtime: bool = False
"""
Whether this timetable defers partition selection to task runtime.

*True* for :class:`~airflow.sdk.PartitionAtRuntime`; downstream code can
branch on this flag instead of using ``isinstance``.
"""

def validate(self) -> None:
"""
Validate the timetable is correctly specified.
Expand Down
Loading
Loading