diff --git a/airflow-core/src/airflow/api_fastapi/core_api/datamodels/dag_run.py b/airflow-core/src/airflow/api_fastapi/core_api/datamodels/dag_run.py index 481cc0387fed1..a5def0ff769c6 100644 --- a/airflow-core/src/airflow/api_fastapi/core_api/datamodels/dag_run.py +++ b/airflow-core/src/airflow/api_fastapi/core_api/datamodels/dag_run.py @@ -141,6 +141,7 @@ class TriggerDAGRunPostBody(StrictBaseModel): conf: dict | None = Field(default_factory=dict) note: str | None = None partition_key: str | None = None + bundle_version: str | None = None @model_validator(mode="after") def check_data_intervals(self): @@ -176,6 +177,7 @@ def validate_context(self, dag: SerializedDAG) -> dict: "conf": self.conf, "note": self.note, "partition_key": self.partition_key, + "bundle_version": self.bundle_version, } diff --git a/airflow-core/src/airflow/api_fastapi/core_api/openapi/v2-rest-api-generated.yaml b/airflow-core/src/airflow/api_fastapi/core_api/openapi/v2-rest-api-generated.yaml index 5ed96855c24ac..6e21b511ffb3d 100644 --- a/airflow-core/src/airflow/api_fastapi/core_api/openapi/v2-rest-api-generated.yaml +++ b/airflow-core/src/airflow/api_fastapi/core_api/openapi/v2-rest-api-generated.yaml @@ -13479,6 +13479,11 @@ components: - type: string - type: 'null' title: Partition Key + bundle_version: + anyOf: + - type: string + - type: 'null' + title: Bundle Version additionalProperties: false type: object title: MaterializeAssetBody @@ -14924,6 +14929,11 @@ components: - type: string - type: 'null' title: Partition Key + bundle_version: + anyOf: + - type: string + - type: 'null' + title: Bundle Version additionalProperties: false type: object required: diff --git a/airflow-core/src/airflow/api_fastapi/core_api/routes/public/assets.py b/airflow-core/src/airflow/api_fastapi/core_api/routes/public/assets.py index e5d9d4bf228da..703e57b87d8f1 100644 --- a/airflow-core/src/airflow/api_fastapi/core_api/routes/public/assets.py +++ b/airflow-core/src/airflow/api_fastapi/core_api/routes/public/assets.py @@ -80,6 +80,7 @@ AssetWatcherModel, TaskOutletAssetReference, ) +from airflow.models.dag_version import DagVersion from airflow.typing_compat import Unpack from airflow.utils.state import DagRunState from airflow.utils.types import DagRunTriggeredByType, DagRunType @@ -435,7 +436,26 @@ def materialize_asset( f"Dag with dag_id: '{dag_id}' does not allow asset materialization runs", ) - params = (body or MaterializeAssetBody()).validate_context(dag) + resolved_body = body or MaterializeAssetBody() + + resolved_dag_version = None + if resolved_body.bundle_version is not None: + if dag.disable_bundle_versioning: + raise HTTPException( + status.HTTP_400_BAD_REQUEST, + f"DAG with dag_id: '{dag_id}' does not support bundle versioning", + ) + resolved_dag_version = DagVersion.get_latest_version( + dag.dag_id, bundle_version=resolved_body.bundle_version, load_serialized_dag=True, session=session + ) + if resolved_dag_version is None: + raise HTTPException( + status.HTTP_404_NOT_FOUND, + f"DAG with dag_id: '{dag_id}' does not have a version for bundle_version '{resolved_body.bundle_version}'", + ) + dag = resolved_dag_version.serialized_dag.dag + + params = resolved_body.validate_context(dag) return dag.create_dagrun( run_id=params["run_id"], logical_date=params["logical_date"], @@ -447,6 +467,8 @@ def materialize_asset( triggering_user_name=user.get_name(), state=DagRunState.QUEUED, partition_key=params["partition_key"], + bundle_version=resolved_body.bundle_version, + dag_version=resolved_dag_version, note=params["note"], session=session, ) diff --git a/airflow-core/src/airflow/api_fastapi/core_api/routes/public/dag_run.py b/airflow-core/src/airflow/api_fastapi/core_api/routes/public/dag_run.py index a40676f8e186f..9d8ffd2d89c07 100644 --- a/airflow-core/src/airflow/api_fastapi/core_api/routes/public/dag_run.py +++ b/airflow-core/src/airflow/api_fastapi/core_api/routes/public/dag_run.py @@ -99,6 +99,7 @@ ) from airflow.api_fastapi.core_api.services.public.dag_run import DagRunWaiter from airflow.api_fastapi.logging.decorators import action_logging +from airflow.exceptions import AirflowException, AirflowNotFoundException from airflow.listeners.listener import get_listener_manager from airflow.models import DagModel, DagRun from airflow.models.asset import AssetEvent @@ -599,6 +600,24 @@ def trigger_dag_run( try: dag = get_latest_version_of_dag(dag_bag, dag_id, session) + + resolved_dag_version = None + if body.bundle_version is not None: + if dag.disable_bundle_versioning: + raise HTTPException( + status.HTTP_400_BAD_REQUEST, + f"DAG with dag_id: '{dag_id}' does not support bundle versioning", + ) + resolved_dag_version = DagVersion.get_latest_version( + dag.dag_id, bundle_version=body.bundle_version, load_serialized_dag=True, session=session + ) + if resolved_dag_version is None: + raise HTTPException( + status.HTTP_404_NOT_FOUND, + f"DAG with dag_id: '{dag_id}' does not have a version for bundle_version '{body.bundle_version}'", + ) + dag = resolved_dag_version.serialized_dag.dag + params = body.validate_context(dag) dag_run = dag.create_dagrun( @@ -612,6 +631,8 @@ def trigger_dag_run( triggering_user_name=user.get_name(), state=DagRunState.QUEUED, partition_key=params["partition_key"], + bundle_version=body.bundle_version, + dag_version=resolved_dag_version, session=session, ) @@ -620,6 +641,10 @@ def trigger_dag_run( current_user_id = user.get_id() dag_run.note = (dag_run_note, current_user_id) return dag_run + except AirflowNotFoundException as e: + raise HTTPException(status.HTTP_404_NOT_FOUND, str(e)) + except AirflowException as e: + raise HTTPException(status.HTTP_400_BAD_REQUEST, str(e)) except ValueError as e: raise HTTPException(status.HTTP_400_BAD_REQUEST, str(e)) diff --git a/airflow-core/src/airflow/api_fastapi/execution_api/datamodels/taskinstance.py b/airflow-core/src/airflow/api_fastapi/execution_api/datamodels/taskinstance.py index a0d9739080118..8421b24a8f8e0 100644 --- a/airflow-core/src/airflow/api_fastapi/execution_api/datamodels/taskinstance.py +++ b/airflow-core/src/airflow/api_fastapi/execution_api/datamodels/taskinstance.py @@ -308,6 +308,7 @@ class DagRun(StrictBaseModel): consumed_asset_events: list[AssetEventDagRunReference] partition_key: str | None note: str | None = None + bundle_version: str | None = None team_name: str | None = None @model_validator(mode="before") diff --git a/airflow-core/src/airflow/api_fastapi/execution_api/versions/__init__.py b/airflow-core/src/airflow/api_fastapi/execution_api/versions/__init__.py index dfa27f53ebd91..b7a4433c32bfe 100644 --- a/airflow-core/src/airflow/api_fastapi/execution_api/versions/__init__.py +++ b/airflow-core/src/airflow/api_fastapi/execution_api/versions/__init__.py @@ -29,6 +29,7 @@ from airflow.api_fastapi.execution_api.versions.v2025_10_27 import MakeDagRunConfNullable from airflow.api_fastapi.execution_api.versions.v2025_11_05 import AddTriggeringUserNameField from airflow.api_fastapi.execution_api.versions.v2026_04_06 import ( + AddBundleVersionField, AddDagEndpoint, AddDagRunDetailEndpoint, AddNoteField, @@ -56,6 +57,7 @@ AddPartitionKeyField, MovePreviousRunEndpoint, AddDagRunDetailEndpoint, + AddBundleVersionField, MakeDagRunStartDateNullable, ModifyDeferredTaskKwargsToJsonValue, RemoveUpstreamMapIndexesField, diff --git a/airflow-core/src/airflow/api_fastapi/execution_api/versions/v2026_04_06.py b/airflow-core/src/airflow/api_fastapi/execution_api/versions/v2026_04_06.py index 5a3b0f0f5fc2d..6deb3489c42b6 100644 --- a/airflow-core/src/airflow/api_fastapi/execution_api/versions/v2026_04_06.py +++ b/airflow-core/src/airflow/api_fastapi/execution_api/versions/v2026_04_06.py @@ -202,6 +202,26 @@ class AddDagEndpoint(VersionChange): instructions_to_migrate_to_previous_version = (endpoint("/dags/{dag_id}", ["GET"]).didnt_exist,) +class AddBundleVersionField(VersionChange): + """Add the `bundle_version` field to DagRun model.""" + + description = __doc__ + + instructions_to_migrate_to_previous_version = (schema(DagRun).field("bundle_version").didnt_exist,) + + @convert_response_to_previous_version_for(TIRunContext) # type: ignore[arg-type] + def remove_bundle_version_from_ti_run_context(response: ResponseInfo) -> None: # type: ignore[misc] + """Remove the `bundle_version` field from the dag_run object when converting to the previous version.""" + if "dag_run" in response.body and isinstance(response.body["dag_run"], dict): + response.body["dag_run"].pop("bundle_version", None) + + @convert_response_to_previous_version_for(DagRun) # type: ignore[arg-type] + def remove_bundle_version_from_dag_run_response(response: ResponseInfo) -> None: # type: ignore[misc] + """Remove the `bundle_version` field from direct DagRun responses for previous API versions.""" + if isinstance(response.body, dict): + response.body.pop("bundle_version", None) + + class AddRunAfterField(VersionChange): """Add run_after parameter to TriggerDAGRunPayload Model.""" diff --git a/airflow-core/src/airflow/exceptions.py b/airflow-core/src/airflow/exceptions.py index f9a891472a3e9..89b7837f269e1 100644 --- a/airflow-core/src/airflow/exceptions.py +++ b/airflow-core/src/airflow/exceptions.py @@ -137,6 +137,10 @@ class DagRunNotFound(AirflowNotFoundException): """Raise when a DAG Run is not available in the system.""" +class DagVersionNotFound(AirflowNotFoundException): + """Raised when a DagVersion for the given dag_id / bundle_version is not found.""" + + class DagRunAlreadyExists(AirflowBadRequest): """Raise when creating a DAG run for DAG which already has DAG run entry.""" diff --git a/airflow-core/src/airflow/models/dag_version.py b/airflow-core/src/airflow/models/dag_version.py index cfdb301d0410e..dfe52ed1aff06 100644 --- a/airflow-core/src/airflow/models/dag_version.py +++ b/airflow-core/src/airflow/models/dag_version.py @@ -148,6 +148,7 @@ def _latest_version_select( bundle_version: str | None = None, load_dag_model: bool = False, load_bundle_model: bool = False, + load_serialized_dag: bool = False, ) -> Select: """ Get the select object to get the latest version of the DAG. @@ -165,6 +166,9 @@ def _latest_version_select( if load_bundle_model: query = query.options(joinedload(cls.bundle)) + if load_serialized_dag: + query = query.options(joinedload(cls.serialized_dag)) + query = query.order_by(cls.created_at.desc()).limit(1) return query @@ -177,6 +181,7 @@ def get_latest_version( bundle_version: str | None = None, load_dag_model: bool = False, load_bundle_model: bool = False, + load_serialized_dag: bool = False, session: Session = NEW_SESSION, ) -> DagVersion | None: """ @@ -186,6 +191,7 @@ def get_latest_version( :param session: The database session. :param load_dag_model: Whether to load the DAG model. :param load_bundle_model: Whether to load the DagBundle model. + :param load_serialized_dag: Whether to eagerly load the serialized DAG. :return: The latest version of the DAG or None if not found. """ return session.scalar( @@ -194,6 +200,7 @@ def get_latest_version( bundle_version=bundle_version, load_dag_model=load_dag_model, load_bundle_model=load_bundle_model, + load_serialized_dag=load_serialized_dag, ) ) diff --git a/airflow-core/src/airflow/serialization/definitions/dag.py b/airflow-core/src/airflow/serialization/definitions/dag.py index dad34ddd0e692..818ceed48d105 100644 --- a/airflow-core/src/airflow/serialization/definitions/dag.py +++ b/airflow-core/src/airflow/serialization/definitions/dag.py @@ -33,7 +33,7 @@ from airflow._shared.observability.metrics import stats from airflow._shared.timezones.timezone import coerce_datetime from airflow.configuration import conf as airflow_conf -from airflow.exceptions import AirflowException, NodeNotFound, TaskNotFound +from airflow.exceptions import AirflowException, DagVersionNotFound, NodeNotFound, TaskNotFound from airflow.models.dag import DagModel from airflow.models.dag_version import DagVersion from airflow.models.dagrun import DagRun @@ -69,6 +69,21 @@ log = structlog.get_logger(__name__) +# Callbacks are not serialized, so when running an older bundle version we copy them +# from the live dag to the resolved dag so dag-level event hooks still fire correctly. +# Other DAG-level attrs (max_active_runs, catchup, params, …) intentionally come from +# the old serialized version, since the caller asked for that specific historical snapshot. +_DAG_CALLBACK_ATTRS = ( + "sla_miss_callback", + "on_success_callback", + "on_failure_callback", + "on_retry_callback", + "on_execute_callback", + "on_skipped_callback", + "has_on_success_callback", + "has_on_failure_callback", +) + # TODO (GH-52141): Share definition with SDK? class EdgeInfoType(TypedDict): @@ -510,6 +525,8 @@ def create_dagrun( partition_key: str | None = None, partition_date: datetime.datetime | None = None, note: str | None = None, + bundle_version: str | None = None, + dag_version: DagVersion | None = None, session: Session = NEW_SESSION, ) -> DagRun: """ @@ -597,6 +614,8 @@ def create_dagrun( partition_key=partition_key, partition_date=partition_date, note=note, + bundle_version=bundle_version, + dag_version=dag_version, session=session, ) @@ -1288,14 +1307,32 @@ def _create_orm_dagrun( partition_key: str | None = None, partition_date: datetime.datetime | None = None, note: str | None = None, + bundle_version: str | None = None, + dag_version: DagVersion | None = None, session: Session = NEW_SESSION, ) -> DagRun: - bundle_version = None - if not dag.disable_bundle_versioning: - bundle_version = session.scalar( - select(DagModel.bundle_version).where(DagModel.dag_id == dag.dag_id), + resolved_bundle_version: str | None = None + use_resolved_dag = False + if dag_version is not None: + resolved_bundle_version = bundle_version + use_resolved_dag = True + elif bundle_version is not None and not dag.disable_bundle_versioning: + resolved_bundle_version = bundle_version + dag_version = DagVersion.get_latest_version( + dag.dag_id, bundle_version=bundle_version, load_serialized_dag=True, session=session ) - dag_version = DagVersion.get_latest_version(dag.dag_id, session=session) + if not dag_version: + raise DagVersionNotFound( + f"DAG with dag_id: '{dag.dag_id}' does not have a version for bundle_version '{bundle_version}'" + ) + use_resolved_dag = True + else: + if not dag.disable_bundle_versioning: + resolved_bundle_version = session.scalar( + select(DagModel.bundle_version).where(DagModel.dag_id == dag.dag_id) + ) + dag_version = DagVersion.get_latest_version(dag.dag_id, session=session) + if not dag_version: raise AirflowException(f"Cannot create DagRun for DAG {dag.dag_id} because the dag is not serialized") @@ -1313,7 +1350,7 @@ def _create_orm_dagrun( triggered_by=triggered_by, triggering_user_name=triggering_user_name, backfill_id=backfill_id, - bundle_version=bundle_version, + bundle_version=resolved_bundle_version, partition_key=partition_key, partition_date=partition_date, note=note, @@ -1326,6 +1363,12 @@ def _create_orm_dagrun( session.add(run) session.flush() run.dag = dag + if use_resolved_dag: + resolved_dag = dag_version.serialized_dag.dag + for attr in _DAG_CALLBACK_ATTRS: + if hasattr(dag, attr): + setattr(resolved_dag, attr, getattr(dag, attr)) # type: ignore[attr-defined] + run.dag = resolved_dag # create the associated task instances # state is None at the moment of creation run.verify_integrity(session=session, dag_version_id=dag_version.id) diff --git a/airflow-core/src/airflow/ui/openapi-gen/requests/schemas.gen.ts b/airflow-core/src/airflow/ui/openapi-gen/requests/schemas.gen.ts index bc4c7179c2dbb..d7a7ae47b2267 100644 --- a/airflow-core/src/airflow/ui/openapi-gen/requests/schemas.gen.ts +++ b/airflow-core/src/airflow/ui/openapi-gen/requests/schemas.gen.ts @@ -4665,6 +4665,17 @@ export const $MaterializeAssetBody = { } ], title: 'Partition Key' + }, + bundle_version: { + anyOf: [ + { + type: 'string' + }, + { + type: 'null' + } + ], + title: 'Bundle Version' } }, additionalProperties: false, @@ -6858,6 +6869,17 @@ export const $TriggerDAGRunPostBody = { } ], title: 'Partition Key' + }, + bundle_version: { + anyOf: [ + { + type: 'string' + }, + { + type: 'null' + } + ], + title: 'Bundle Version' } }, additionalProperties: false, diff --git a/airflow-core/src/airflow/ui/openapi-gen/requests/types.gen.ts b/airflow-core/src/airflow/ui/openapi-gen/requests/types.gen.ts index 5b216da03a58b..3da9cec923355 100644 --- a/airflow-core/src/airflow/ui/openapi-gen/requests/types.gen.ts +++ b/airflow-core/src/airflow/ui/openapi-gen/requests/types.gen.ts @@ -1188,6 +1188,7 @@ export type MaterializeAssetBody = { } | null; note?: string | null; partition_key?: string | null; + bundle_version?: string | null; }; /** @@ -1655,6 +1656,7 @@ export type TriggerDAGRunPostBody = { } | null; note?: string | null; partition_key?: string | null; + bundle_version?: string | null; }; /** diff --git a/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_assets.py b/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_assets.py index 26fe24888ac29..7321070a1f4c8 100644 --- a/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_assets.py +++ b/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_assets.py @@ -1551,6 +1551,57 @@ def test_should_respond_403_when_user_cannot_trigger_dag(self, test_client): user=mock.ANY, ) + def test_should_respond_with_bundle_version(self, test_client, session, dag_maker): + """Test that asset materialization respects bundle_version parameter.""" + bundle_name = "testing_bundle" + asset = session.get(AssetModel, 1).to_serialized() + + with dag_maker( + self.DAG_ASSET1_ID, + bundle_name=bundle_name, + bundle_version="v1", + schedule=None, + session=session, + ): + EmptyOperator(task_id="task_v1", outlets=asset) + + with dag_maker( + self.DAG_ASSET1_ID, + bundle_name=bundle_name, + bundle_version="v2", + schedule=None, + session=session, + ): + EmptyOperator(task_id="task_v2", outlets=asset) + + response = test_client.post("/assets/1/materialize", json={"bundle_version": "v1"}) + assert response.status_code == 200 + assert response.json()["bundle_version"] == "v1" + + response = test_client.post("/assets/1/materialize", json={"bundle_version": "invalid_version"}) + assert response.status_code == 404 + assert ( + f"DAG with dag_id: '{self.DAG_ASSET1_ID}' does not have a version for bundle_version 'invalid_version'" + in response.json()["detail"] + ) + + with dag_maker( + self.DAG_ASSET1_ID, + bundle_name=bundle_name, + bundle_version="v3", + schedule=None, + session=session, + ): + EmptyOperator(task_id="task_v3", outlets=asset) + dag_maker.dag.disable_bundle_versioning = True + + response = test_client.post("/assets/1/materialize", json={"bundle_version": "v1"}) + assert response.status_code == 400 + assert ( + f"DAG with dag_id: '{self.DAG_ASSET1_ID}' does not support bundle versioning" + in response.json()["detail"] + ) + class TestGetAssetQueuedEvents(TestQueuedEventEndpoint): @pytest.mark.usefixtures("time_freezer") diff --git a/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_dag_run.py b/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_dag_run.py index ea3523fb07535..87311b5ce5e83 100644 --- a/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_dag_run.py +++ b/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_dag_run.py @@ -2444,6 +2444,114 @@ def test_custom_timetable_generate_run_id_for_manual_trigger(self, dag_maker, te run = session.scalars(select(DagRun).where(DagRun.run_id == run_id_without_logical_date)).one() assert run.dag_id == custom_dag_id + @pytest.mark.usefixtures("configure_git_connection_for_dag_bundle") + def test_trigger_dag_run_with_bundle_version(self, test_client, session, dag_maker): + """Test triggering a DAG run with a specific bundle version.""" + from tests_common.test_utils.dag import sync_dag_to_db + + dag_id = "test_bundle_version_dag" + bundle_name = "testing_bundle" + + with dag_maker( + dag_id=dag_id, + bundle_name=bundle_name, + bundle_version="v1", + session=session, + ) as dag1: + EmptyOperator(task_id="task_1") + sync_dag_to_db(dag1, bundle_name=bundle_name) + + with dag_maker( + dag_id=dag_id, + bundle_name=bundle_name, + bundle_version="v2", + session=session, + ) as dag2: + EmptyOperator(task_id="task_1") + EmptyOperator(task_id="task_2") + sync_dag_to_db(dag2, bundle_name=bundle_name) + + response = test_client.post( + f"/dags/{dag_id}/dagRuns", json={"logical_date": "2024-01-01T00:00:00Z", "bundle_version": "v1"} + ) + assert response.status_code == 200 + assert response.json()["dag_versions"][0]["bundle_version"] == "v1" + + response = test_client.post( + f"/dags/{dag_id}/dagRuns", + json={ + "logical_date": "2024-01-02T00:00:00Z", + }, + ) + assert response.status_code == 200 + assert response.json()["dag_versions"][0]["bundle_version"] == "v2" + + response = test_client.post( + f"/dags/{dag_id}/dagRuns", + json={"logical_date": "2024-01-03T00:00:00Z", "bundle_version": "invalid_version"}, + ) + assert response.status_code == 404 + assert ( + f"DAG with dag_id: '{dag_id}' does not have a version for bundle_version 'invalid_version'" + in response.json()["detail"] + ) + + dag2.disable_bundle_versioning = True + sync_dag_to_db(dag2, bundle_name=bundle_name) + + response = test_client.post( + f"/dags/{dag_id}/dagRuns", json={"logical_date": "2024-01-04T00:00:00Z", "bundle_version": "v1"} + ) + assert response.status_code == 400 + assert f"DAG with dag_id: '{dag_id}' does not support bundle versioning" in response.json()["detail"] + + def test_trigger_dag_run_bundle_version_validates_against_old_param_schema( + self, test_client, session, dag_maker + ): + """Conf is validated against the requested bundle version's param schema, not the live dag's.""" + from tests_common.test_utils.dag import sync_dag_to_db + + dag_id = "test_bundle_param_schema_dag" + bundle_name = "param_schema_bundle" + + with dag_maker( + dag_id=dag_id, + bundle_name=bundle_name, + bundle_version="v1", + session=session, + params={"env": Param("staging", type="string", enum=["staging", "prod"])}, + ) as dag1: + EmptyOperator(task_id="task_1") + sync_dag_to_db(dag1, bundle_name=bundle_name) + + with dag_maker( + dag_id=dag_id, + bundle_name=bundle_name, + bundle_version="v2", + session=session, + params={"env": Param("dev", type="string", enum=["dev", "staging", "prod"])}, + ) as dag2: + EmptyOperator(task_id="task_1") + sync_dag_to_db(dag2, bundle_name=bundle_name) + + # "dev" is valid for v2 but not for v1's enum — triggering v1 should reject it. + response = test_client.post( + f"/dags/{dag_id}/dagRuns", + json={"logical_date": "2024-02-01T00:00:00Z", "bundle_version": "v1", "conf": {"env": "dev"}}, + ) + assert response.status_code == 400 + + # "staging" is valid for both v1 and v2 — triggering v1 should accept it. + response = test_client.post( + f"/dags/{dag_id}/dagRuns", + json={ + "logical_date": "2024-02-02T00:00:00Z", + "bundle_version": "v1", + "conf": {"env": "staging"}, + }, + ) + assert response.status_code == 200 + class TestWaitDagRun: # The way we init async engine does not work well with FastAPI app init. diff --git a/airflow-core/tests/unit/api_fastapi/execution_api/versions/head/test_dag_runs.py b/airflow-core/tests/unit/api_fastapi/execution_api/versions/head/test_dag_runs.py index b84b993f1d63d..93401785336e7 100644 --- a/airflow-core/tests/unit/api_fastapi/execution_api/versions/head/test_dag_runs.py +++ b/airflow-core/tests/unit/api_fastapi/execution_api/versions/head/test_dag_runs.py @@ -328,6 +328,7 @@ def test_get_state(self, client, session, dag_maker): "state": "success", "triggering_user_name": None, "note": None, + "bundle_version": None, "team_name": None, } diff --git a/airflow-core/tests/unit/api_fastapi/execution_api/versions/head/test_task_instances.py b/airflow-core/tests/unit/api_fastapi/execution_api/versions/head/test_task_instances.py index 94b3fed0e071b..32b8df4cb749d 100644 --- a/airflow-core/tests/unit/api_fastapi/execution_api/versions/head/test_task_instances.py +++ b/airflow-core/tests/unit/api_fastapi/execution_api/versions/head/test_task_instances.py @@ -234,6 +234,7 @@ def test_ti_run_state_to_running( "consumed_asset_events": [], "partition_key": None, "note": None, + "bundle_version": None, "team_name": None, }, "task_reschedule_count": 0, diff --git a/airflow-core/tests/unit/api_fastapi/execution_api/versions/v2026_04_06/test_dag_runs.py b/airflow-core/tests/unit/api_fastapi/execution_api/versions/v2026_04_06/test_dag_runs.py index a3c7ff77d14f1..b6e585a01f031 100644 --- a/airflow-core/tests/unit/api_fastapi/execution_api/versions/v2026_04_06/test_dag_runs.py +++ b/airflow-core/tests/unit/api_fastapi/execution_api/versions/v2026_04_06/test_dag_runs.py @@ -21,6 +21,7 @@ from airflow._shared.timezones import timezone from airflow.utils.state import DagRunState +from airflow.utils.types import DagRunType pytestmark = pytest.mark.db_test @@ -61,3 +62,24 @@ def test_get_previous_dag_run_redirect(ver_client, session, dag_maker): assert result["dag_id"] == "test_dag_id" assert result["run_id"] == "run2" # Most recent before 2025-01-10 assert result["state"] == "failed" + + +def test_get_dag_run_includes_bundle_version(client, session, dag_maker): + """The GET /{dag_id}/{run_id} endpoint added in 2026-04-06 exposes bundle_version.""" + dag_id = "test_dag_id_bundle" + run_id = "test_run_bundle" + + with dag_maker(dag_id=dag_id, schedule=None, session=session, serialized=True): + pass + + dag_maker.create_dagrun( + run_id=run_id, + logical_date=None, + run_type=DagRunType.MANUAL, + state=DagRunState.SUCCESS, + ) + session.commit() + + response = client.get(f"/execution/dag-runs/{dag_id}/{run_id}") + assert response.status_code == 200 + assert "bundle_version" in response.json() diff --git a/airflow-core/tests/unit/models/test_dag.py b/airflow-core/tests/unit/models/test_dag.py index b34ab12dd4aef..c4e8e2e5114be 100644 --- a/airflow-core/tests/unit/models/test_dag.py +++ b/airflow-core/tests/unit/models/test_dag.py @@ -3990,6 +3990,134 @@ def hello(): assert dr.bundle_version == expected +def test_create_dagrun_uses_resolved_bundle_version_for_integrity(dag_maker, session, clear_dags): + """ + When no explicit bundle_version is passed, the live dag drives TI creation and + created_dag_version points to the latest serialized version. DagRun.bundle_version + still records the DagModel.bundle_version for auditing purposes. + """ + with dag_maker( + dag_id="test_dag_bundle_version_integrity", + session=session, + serialized=True, + bundle_version="v1", + ) as _dag_v1: + EmptyOperator(task_id="t1") + + with dag_maker( + dag_id="test_dag_bundle_version_integrity", + session=session, + serialized=True, + bundle_version="v2", + ) as dag_v2: + EmptyOperator(task_id="t1") + EmptyOperator(task_id="t2") + + dag_model = session.scalar(select(DagModel).where(DagModel.dag_id == dag_v2.dag_id)) + dag_model.bundle_version = "v1" + session.commit() + + dr = dag_v2.create_dagrun( + run_id="bundle_version_integrity", + run_after=pendulum.now(), + run_type="manual", + triggered_by=DagRunTriggeredByType.TEST, + state=None, + ) + + # DagRun.bundle_version records the DagModel value at trigger time (audit field). + assert dr.bundle_version == "v1" + # created_dag_version reflects the latest serialized version (v2), not the DagModel audit value. + assert dr.created_dag_version.bundle_version == "v2" + # TIs come from the live dag (dag_v2 with t1+t2), not from the old serialized version. + assert {ti.task_id for ti in dr.get_task_instances(session=session)} == {"t1", "t2"} + + +def test_create_dagrun_callbacks_copied_to_resolved_bundle_version(dag_maker, session, clear_dags): + """Callbacks from the live dag are copied to the resolved (older) dag version.""" + with dag_maker( + dag_id="test_dag_callbacks_bundle_version", + session=session, + serialized=True, + bundle_version="v1", + ) as _dag_v1: + EmptyOperator(task_id="t1") + + with dag_maker( + dag_id="test_dag_callbacks_bundle_version", + session=session, + serialized=True, + bundle_version="v2", + ) as dag_v2: + EmptyOperator(task_id="t1") + EmptyOperator(task_id="t2") + + def some_callable(context): + pass + + dag_v2.on_failure_callback = some_callable + dag_v2.on_success_callback = some_callable + dag_v2.has_on_failure_callback = True + dag_v2.has_on_success_callback = True + + dr = dag_v2.create_dagrun( + run_id="callbacks_bundle_version", + run_after=pendulum.now(), + run_type="manual", + triggered_by=DagRunTriggeredByType.TEST, + state=None, + bundle_version="v1", + ) + + assert dr.dag.has_on_failure_callback is True + assert dr.dag.has_on_success_callback is True + assert dr.dag.on_failure_callback is some_callable + assert dr.dag.on_success_callback is some_callable + + +def test_create_dagrun_without_bundle_version_uses_live_dag(dag_maker, session, clear_dags): + """ + When no explicit bundle_version is passed, TIs are created from the live dag even if + DagModel.bundle_version points to an older version. This confirms backfills and other + callers that don't pass bundle_version are unaffected by the bundle_version feature. + """ + with dag_maker( + dag_id="test_dag_backfill_bundle_version", + session=session, + serialized=True, + bundle_version="v1", + ) as _dag_v1: + EmptyOperator(task_id="t1") + + with dag_maker( + dag_id="test_dag_backfill_bundle_version", + session=session, + serialized=True, + bundle_version="v2", + ) as dag_v2: + EmptyOperator(task_id="t1") + EmptyOperator(task_id="t2") + + dag_model = session.scalar(select(DagModel).where(DagModel.dag_id == dag_v2.dag_id)) + dag_model.bundle_version = "v1" + session.commit() + + dr = dag_v2.create_dagrun( + run_id="no_bundle_version_uses_live_dag", + run_after=pendulum.now(), + run_type="manual", + triggered_by=DagRunTriggeredByType.TEST, + state=None, + ) + + # TIs come from the live dag (dag_v2), not from the v1 serialized version. + assert {ti.task_id for ti in dr.get_task_instances(session=session)} == {"t1", "t2"} + # created_dag_version reflects the latest serialization (v2). + assert dr.created_dag_version.bundle_version == "v2" + # DagRun.bundle_version still records the DagModel value at trigger time. + assert dr.bundle_version == "v1" + + def test_get_run_data_interval(): with DAG("dag", schedule=None, start_date=DEFAULT_DATE) as dag: EmptyOperator(task_id="empty_task") diff --git a/airflow-ctl/src/airflowctl/api/datamodels/generated.py b/airflow-ctl/src/airflowctl/api/datamodels/generated.py index 276c8699de058..01fa2c4453816 100644 --- a/airflow-ctl/src/airflowctl/api/datamodels/generated.py +++ b/airflow-ctl/src/airflowctl/api/datamodels/generated.py @@ -645,6 +645,7 @@ class MaterializeAssetBody(BaseModel): conf: Annotated[dict[str, Any] | None, Field(title="Conf")] = None note: Annotated[str | None, Field(title="Note")] = None partition_key: Annotated[str | None, Field(title="Partition Key")] = None + bundle_version: Annotated[str | None, Field(title="Bundle Version")] = None class NewTaskResponse(BaseModel): @@ -933,6 +934,7 @@ class TriggerDAGRunPostBody(BaseModel): conf: Annotated[dict[str, Any] | None, Field(title="Conf")] = None note: Annotated[str | None, Field(title="Note")] = None partition_key: Annotated[str | None, Field(title="Partition Key")] = None + bundle_version: Annotated[str | None, Field(title="Bundle Version")] = None class TriggerResponse(BaseModel): diff --git a/task-sdk/src/airflow/sdk/api/datamodels/_generated.py b/task-sdk/src/airflow/sdk/api/datamodels/_generated.py index b5b100154c389..32eb410f7fd3d 100644 --- a/task-sdk/src/airflow/sdk/api/datamodels/_generated.py +++ b/task-sdk/src/airflow/sdk/api/datamodels/_generated.py @@ -694,6 +694,7 @@ class DagRun(BaseModel): consumed_asset_events: Annotated[list[AssetEventDagRunReference], Field(title="Consumed Asset Events")] partition_key: Annotated[str | None, Field(title="Partition Key")] = None note: Annotated[str | None, Field(title="Note")] = None + bundle_version: Annotated[str | None, Field(title="Bundle Version")] = None team_name: Annotated[str | None, Field(title="Team Name")] = None diff --git a/task-sdk/tests/task_sdk/execution_time/test_supervisor.py b/task-sdk/tests/task_sdk/execution_time/test_supervisor.py index 3695af1fff592..1c555f352113f 100644 --- a/task-sdk/tests/task_sdk/execution_time/test_supervisor.py +++ b/task-sdk/tests/task_sdk/execution_time/test_supervisor.py @@ -2248,6 +2248,7 @@ class RequestTestCase: "triggering_user_name": None, "type": "DagRunResult", "note": None, + "bundle_version": None, "team_name": None, }, client_mock=ClientMock( @@ -2300,6 +2301,7 @@ class RequestTestCase: "conf": None, "triggering_user_name": None, "note": None, + "bundle_version": None, "team_name": None, }, "type": "PreviousDagRunResult",