Skip to content
Open
Show file tree
Hide file tree
Changes from 24 commits
Commits
Show all changes
32 commits
Select commit Hold shift + click to select a range
473b066
feat: adding the option to add bundle version parameter on dag run tr…
itayweb Feb 6, 2026
ff00c06
removing code comments and adding the new parameter to the schema
itayweb Feb 6, 2026
7986f5c
feat: adding the new parameter to schema and types files
itayweb Feb 7, 2026
bf85c81
fix: fixing format and missing field in class
itayweb Feb 7, 2026
f804f59
fix: fixing the usage of bundle version in orm dagrun creation
itayweb Feb 8, 2026
f145651
feat: added migration file
itayweb Feb 12, 2026
22f5821
feat: adding unit test for bundle version parameter dagrun endpoint
itayweb Feb 18, 2026
05f720b
fix: fixing the unit test by removing unecessary asserts
itayweb Feb 18, 2026
1c39be7
fix: removing duplicate dag version check and raising exception accor…
itayweb Feb 20, 2026
a8953a9
fix: adding a raise exception when passing incorrect bundle version
itayweb Feb 20, 2026
e9d5e13
fix: verify exception message to raise accordingly
itayweb Mar 9, 2026
8acf8d0
fix: wiring to bundle version and moved to the correct version target
itayweb Mar 9, 2026
de92b93
fix: missed adding bundle_version field in 2 unit tests response asse…
itayweb Mar 10, 2026
2ea2cbd
fix: adding missing bundle version field in tests
itayweb Mar 10, 2026
aed386e
fix: handling dag and dag's tasks correctly when passing bundle version
itayweb Mar 10, 2026
3a68d9e
feat: added new airflow exception to handle no dag version has found
itayweb Mar 10, 2026
7ffcbaa
fix: assigning run dag to dag version when only using bundle version
itayweb Mar 11, 2026
292f3cc
fix: fixing assertion equal failing
itayweb Mar 11, 2026
770bfc4
fix: removed unecessary init files after prek hook added them
itayweb Mar 11, 2026
4073255
fix: adding support for all the dag-level callbacks
itayweb Mar 13, 2026
f472a96
fix: prek didn't updated datamodels for airflowctl
itayweb Apr 10, 2026
d128f41
fix: adding bundle version support also in asset route, fixing execut…
itayweb Apr 17, 2026
0ad1e72
fix: duplicate import and duplicate methods
itayweb Apr 19, 2026
b29b55e
Merge remote-tracking branch 'upstream/main' into feat-bundle-version…
itayweb Apr 20, 2026
69cacfc
Merge remote-tracking branch 'upstream/main' into feat-bundle-version…
itayweb Apr 29, 2026
de01c65
fix: remove duplicate DagVersion lookup and unintended dag swap for n…
itayweb May 2, 2026
8ab5493
fix: reverting uv lock due to unnecessary changes
itayweb May 2, 2026
9d1d28a
Merge remote-tracking branch 'upstream/main' into feat-bundle-version…
itayweb May 2, 2026
0f1c4c8
fix: fixing test to resolve ci errors
itayweb May 2, 2026
68e5d33
Merge remote-tracking branch 'upstream/main' into feat-bundle-version…
itayweb May 2, 2026
7d22da8
Merge remote-tracking branch 'upstream/main' into feat-bundle-version…
itayweb May 4, 2026
ab8682c
fix: restore uv.lock to upstream/main
itayweb May 4, 2026
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,7 @@ class TriggerDAGRunPostBody(StrictBaseModel):
conf: dict | None = Field(default_factory=dict)
note: str | None = None
partition_key: str | None = None
bundle_version: str | None = None
Comment thread
pierrejeambrun marked this conversation as resolved.
Comment thread
itayweb marked this conversation as resolved.

@model_validator(mode="after")
def check_data_intervals(self):
Expand Down Expand Up @@ -141,6 +142,7 @@ def validate_context(self, dag: SerializedDAG) -> dict:
"conf": self.conf,
"note": self.note,
"partition_key": self.partition_key,
"bundle_version": self.bundle_version,
}


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13503,6 +13503,11 @@ components:
- type: string
- type: 'null'
title: Partition Key
bundle_version:
anyOf:
- type: string
- type: 'null'
title: Bundle Version
additionalProperties: false
type: object
required:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,7 @@
AssetWatcherModel,
TaskOutletAssetReference,
)
from airflow.models.dag_version import DagVersion
from airflow.typing_compat import Unpack
from airflow.utils.state import DagRunState
from airflow.utils.types import DagRunTriggeredByType, DagRunType
Expand Down Expand Up @@ -427,7 +428,25 @@ def materialize_asset(
f"Dag with dag_id: '{dag_id}' does not allow asset materialization runs",
)

params = (body or MaterializeAssetBody()).validate_context(dag)
resolved_body = body or MaterializeAssetBody()

if resolved_body.bundle_version is not None:
if dag.disable_bundle_versioning:
raise HTTPException(
status.HTTP_400_BAD_REQUEST,
f"DAG with dag_id: '{dag_id}' does not support bundle versioning",
)
dag_version = DagVersion.get_latest_version(
dag.dag_id, bundle_version=resolved_body.bundle_version, load_serialized_dag=True, session=session
)
if dag_version is None:
raise HTTPException(
status.HTTP_404_NOT_FOUND,
f"DAG with dag_id: '{dag_id}' does not have a version for bundle_version '{resolved_body.bundle_version}'",
)
dag = dag_version.serialized_dag.dag

params = resolved_body.validate_context(dag)
return dag.create_dagrun(
run_id=params["run_id"],
logical_date=params["logical_date"],
Expand All @@ -439,6 +458,7 @@ def materialize_asset(
triggering_user_name=user.get_name(),
state=DagRunState.QUEUED,
partition_key=params["partition_key"],
bundle_version=resolved_body.bundle_version,
note=params["note"],
session=session,
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,7 @@
)
from airflow.api_fastapi.core_api.services.public.dag_run import DagRunWaiter
from airflow.api_fastapi.logging.decorators import action_logging
from airflow.exceptions import AirflowException, AirflowNotFoundException
from airflow.listeners.listener import get_listener_manager
from airflow.models import DagModel, DagRun
from airflow.models.asset import AssetEvent
Expand Down Expand Up @@ -485,6 +486,23 @@ def trigger_dag_run(

try:
dag = get_latest_version_of_dag(dag_bag, dag_id, session)

if body.bundle_version is not None:
if dag.disable_bundle_versioning:
raise HTTPException(
status.HTTP_400_BAD_REQUEST,
f"DAG with dag_id: '{dag_id}' does not support bundle versioning",
)
dag_version = DagVersion.get_latest_version(
dag.dag_id, bundle_version=body.bundle_version, load_serialized_dag=True, session=session
Comment thread
itayweb marked this conversation as resolved.
)
if dag_version is None:
raise HTTPException(
status.HTTP_404_NOT_FOUND,
f"DAG with dag_id: '{dag_id}' does not have a version for bundle_version '{body.bundle_version}'",
)
dag = dag_version.serialized_dag.dag

params = body.validate_context(dag)
Comment thread
itayweb marked this conversation as resolved.

Comment thread
itayweb marked this conversation as resolved.
dag_run = dag.create_dagrun(
Expand All @@ -498,6 +516,7 @@ def trigger_dag_run(
triggering_user_name=user.get_name(),
state=DagRunState.QUEUED,
partition_key=params["partition_key"],
bundle_version=body.bundle_version,
session=session,
)

Expand All @@ -506,6 +525,10 @@ def trigger_dag_run(
current_user_id = user.get_id()
dag_run.note = (dag_run_note, current_user_id)
return dag_run
except AirflowNotFoundException as e:
raise HTTPException(status.HTTP_404_NOT_FOUND, str(e))
Comment thread
itayweb marked this conversation as resolved.
except AirflowException as e:
raise HTTPException(status.HTTP_400_BAD_REQUEST, str(e))
except ValueError as e:
raise HTTPException(status.HTTP_400_BAD_REQUEST, str(e))

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -306,6 +306,7 @@ class DagRun(StrictBaseModel):
consumed_asset_events: list[AssetEventDagRunReference]
partition_key: str | None
note: str | None = None
bundle_version: str | None = None

@model_validator(mode="before")
@classmethod
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
from airflow.api_fastapi.execution_api.versions.v2025_10_27 import MakeDagRunConfNullable
from airflow.api_fastapi.execution_api.versions.v2025_11_05 import AddTriggeringUserNameField
from airflow.api_fastapi.execution_api.versions.v2026_04_06 import (
AddBundleVersionField,
AddDagEndpoint,
AddDagRunDetailEndpoint,
AddNoteField,
Expand All @@ -47,6 +48,7 @@
AddPartitionKeyField,
MovePreviousRunEndpoint,
AddDagRunDetailEndpoint,
AddBundleVersionField,
MakeDagRunStartDateNullable,
ModifyDeferredTaskKwargsToJsonValue,
RemoveUpstreamMapIndexesField,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -197,3 +197,22 @@ class AddDagEndpoint(VersionChange):
description = __doc__

instructions_to_migrate_to_previous_version = (endpoint("/dags/{dag_id}", ["GET"]).didnt_exist,)


class AddBundleVersionField(VersionChange):
"""Add the `bundle_version` field to DagRun model."""

description = __doc__

instructions_to_migrate_to_previous_version = (schema(DagRun).field("bundle_version").didnt_exist,)

@convert_response_to_previous_version_for(TIRunContext) # type: ignore[arg-type]
def remove_bundle_version_from_dag_run(response: ResponseInfo) -> None: # type: ignore[misc]
"""Remove the `bundle_version` field from the dag_run object when converting to the previous version."""
if "dag_run" in response.body and isinstance(response.body["dag_run"], dict):
response.body["dag_run"].pop("bundle_version", None)

Comment thread
itayweb marked this conversation as resolved.
@convert_response_to_previous_version_for(DagRun) # type: ignore[arg-type]
def remove_bundle_version_from_dag_run_response(response: ResponseInfo) -> None: # type: ignore[misc]
Comment thread
itayweb marked this conversation as resolved.
"""Remove the `bundle_version` field from direct DagRun responses for previous API versions."""
response.body.pop("bundle_version", None)
4 changes: 4 additions & 0 deletions airflow-core/src/airflow/exceptions.py
Original file line number Diff line number Diff line change
Expand Up @@ -130,6 +130,10 @@ class DagRunNotFound(AirflowNotFoundException):
"""Raise when a DAG Run is not available in the system."""


class DagVersionNotFound(AirflowNotFoundException):
"""Raised when a DagVersion for the given dag_id / bundle_version is not found."""


class DagRunAlreadyExists(AirflowBadRequest):
"""Raise when creating a DAG run for DAG which already has DAG run entry."""

Expand Down
7 changes: 7 additions & 0 deletions airflow-core/src/airflow/models/dag_version.py
Original file line number Diff line number Diff line change
Expand Up @@ -148,6 +148,7 @@ def _latest_version_select(
bundle_version: str | None = None,
load_dag_model: bool = False,
load_bundle_model: bool = False,
load_serialized_dag: bool = False,
) -> Select:
"""
Get the select object to get the latest version of the DAG.
Expand All @@ -165,6 +166,9 @@ def _latest_version_select(
if load_bundle_model:
query = query.options(joinedload(cls.bundle))

Comment thread
itayweb marked this conversation as resolved.
if load_serialized_dag:
query = query.options(joinedload(cls.serialized_dag))

query = query.order_by(cls.created_at.desc()).limit(1)
return query

Expand All @@ -177,6 +181,7 @@ def get_latest_version(
bundle_version: str | None = None,
load_dag_model: bool = False,
load_bundle_model: bool = False,
load_serialized_dag: bool = False,
session: Session = NEW_SESSION,
) -> DagVersion | None:
"""
Expand All @@ -186,6 +191,7 @@ def get_latest_version(
:param session: The database session.
:param load_dag_model: Whether to load the DAG model.
:param load_bundle_model: Whether to load the DagBundle model.
:param load_serialized_dag: Whether to eagerly load the serialized DAG.
:return: The latest version of the DAG or None if not found.
"""
return session.scalar(
Expand All @@ -194,6 +200,7 @@ def get_latest_version(
bundle_version=bundle_version,
load_dag_model=load_dag_model,
load_bundle_model=load_bundle_model,
load_serialized_dag=load_serialized_dag,
)
)

Expand Down
58 changes: 51 additions & 7 deletions airflow-core/src/airflow/serialization/definitions/dag.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@

from airflow._shared.timezones.timezone import coerce_datetime
from airflow.configuration import conf as airflow_conf
from airflow.exceptions import AirflowException, TaskNotFound
from airflow.exceptions import AirflowException, DagVersionNotFound, TaskNotFound
from airflow.models.dag import DagModel
from airflow.models.dag_version import DagVersion
from airflow.models.dagrun import DagRun
Expand Down Expand Up @@ -69,6 +69,17 @@

log = structlog.get_logger(__name__)

_DAG_CALLBACK_ATTRS = (
"sla_miss_callback",
"on_success_callback",
"on_failure_callback",
"on_retry_callback",
"on_execute_callback",
"on_skipped_callback",
"has_on_success_callback",
"has_on_failure_callback",
Comment thread
itayweb marked this conversation as resolved.
)


# TODO (GH-52141): Share definition with SDK?
class EdgeInfoType(TypedDict):
Expand Down Expand Up @@ -502,6 +513,7 @@ def create_dagrun(
partition_key: str | None = None,
partition_date: datetime.datetime | None = None,
note: str | None = None,
bundle_version: str | None = None,
session: Session = NEW_SESSION,
) -> DagRun:
"""
Expand Down Expand Up @@ -589,6 +601,7 @@ def create_dagrun(
partition_key=partition_key,
partition_date=partition_date,
note=note,
bundle_version=bundle_version,
session=session,
)

Expand Down Expand Up @@ -1160,14 +1173,39 @@ def _create_orm_dagrun(
partition_key: str | None = None,
partition_date: datetime.datetime | None = None,
note: str | None = None,
bundle_version: str | None = None,
Comment thread
itayweb marked this conversation as resolved.
session: Session = NEW_SESSION,
) -> DagRun:
bundle_version = None
resolved_bundle_version: str | None = None
use_resolved_dag = False
if not dag.disable_bundle_versioning:
bundle_version = session.scalar(
select(DagModel.bundle_version).where(DagModel.dag_id == dag.dag_id),
)
dag_version = DagVersion.get_latest_version(dag.dag_id, session=session)
if bundle_version is not None:
resolved_bundle_version = bundle_version
dag_version = DagVersion.get_latest_version(
dag.dag_id, bundle_version=resolved_bundle_version, load_serialized_dag=True, session=session
)
if not dag_version:
raise DagVersionNotFound(
f"DAG with dag_id: '{dag.dag_id}' does not have a version for bundle_version '{bundle_version}'"
Comment thread
itayweb marked this conversation as resolved.
Outdated
)
use_resolved_dag = True
else:
resolved_bundle_version = session.scalar(
select(DagModel.bundle_version).where(DagModel.dag_id == dag.dag_id)
)
dag_version = DagVersion.get_latest_version(
dag.dag_id,
bundle_version=resolved_bundle_version,
load_serialized_dag=resolved_bundle_version is not None,
session=session,
)
if dag_version is None and resolved_bundle_version is not None:
dag_version = DagVersion.get_latest_version(dag.dag_id, session=session)
elif dag_version is not None and resolved_bundle_version is not None:
use_resolved_dag = True
Comment thread
itayweb marked this conversation as resolved.
Outdated
else:
dag_version = DagVersion.get_latest_version(dag.dag_id, session=session)

if not dag_version:
raise AirflowException(f"Cannot create DagRun for DAG {dag.dag_id} because the dag is not serialized")

Expand All @@ -1185,7 +1223,7 @@ def _create_orm_dagrun(
triggered_by=triggered_by,
triggering_user_name=triggering_user_name,
backfill_id=backfill_id,
bundle_version=bundle_version,
bundle_version=resolved_bundle_version,
partition_key=partition_key,
partition_date=partition_date,
note=note,
Expand All @@ -1198,6 +1236,12 @@ def _create_orm_dagrun(
session.add(run)
session.flush()
run.dag = dag
if use_resolved_dag:
resolved_dag = dag_version.serialized_dag.dag
for attr in _DAG_CALLBACK_ATTRS:
if hasattr(dag, attr):
setattr(resolved_dag, attr, getattr(dag, attr)) # type: ignore[attr-defined]
run.dag = resolved_dag
# create the associated task instances
# state is None at the moment of creation
run.verify_integrity(session=session, dag_version_id=dag_version.id)
Expand Down
11 changes: 11 additions & 0 deletions airflow-core/src/airflow/ui/openapi-gen/requests/schemas.gen.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6763,6 +6763,17 @@ export const $TriggerDAGRunPostBody = {
}
],
title: 'Partition Key'
},
bundle_version: {
anyOf: [
{
type: 'string'
},
{
type: 'null'
}
],
title: 'Bundle Version'
}
},
additionalProperties: false,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1617,6 +1617,7 @@ export type TriggerDAGRunPostBody = {
} | null;
note?: string | null;
partition_key?: string | null;
bundle_version?: string | null;
};

/**
Expand Down
Loading
Loading