From 5a2b5f420519d90b459eab75707c269c6c29487b Mon Sep 17 00:00:00 2001 From: Kalin Stoyanov Date: Wed, 24 Jun 2026 16:49:14 +0300 Subject: [PATCH 1/2] Serialize dag if it is not serialized even if it does not have task instances --- .../src/airflow/models/serialized_dag.py | 2 +- .../tests/unit/models/test_serialized_dag.py | 21 +++++++++++++++++++ 2 files changed, 22 insertions(+), 1 deletion(-) diff --git a/airflow-core/src/airflow/models/serialized_dag.py b/airflow-core/src/airflow/models/serialized_dag.py index ce0333653114e..1a9bc5eb45454 100644 --- a/airflow-core/src/airflow/models/serialized_dag.py +++ b/airflow-core/src/airflow/models/serialized_dag.py @@ -723,7 +723,7 @@ def write_dag( ) ) - if dag_version and not has_task_instances: + if dag_version and not has_task_instances and serialized_dag_hash is not None: # This is for dynamic DAGs that the hashes changes often. We should update # the serialized dag, the dag_version and the dag_code instead of a new version # if the dag_version is not associated with any task instances diff --git a/airflow-core/tests/unit/models/test_serialized_dag.py b/airflow-core/tests/unit/models/test_serialized_dag.py index a1ad63de5d688..67c5922ad84fe 100644 --- a/airflow-core/tests/unit/models/test_serialized_dag.py +++ b/airflow-core/tests/unit/models/test_serialized_dag.py @@ -423,6 +423,27 @@ def test_new_dag_versions_are_created_if_there_is_a_dagrun(self, dag_maker, sess assert session.scalar(select(func.count()).select_from(DagVersion)) == 2 assert session.scalar(select(func.count()).select_from(SDM)) == 2 + def test_new_dag_version_is_created_when_version_exists_but_serialized_dag_row_missing( + self, dag_maker, session + ): + with dag_maker("dag1") as dag: + PythonOperator(task_id="task1", python_callable=lambda: None) + assert session.scalar(select(func.count()).select_from(SDM)) == 1 + assert session.scalar(select(func.count()).select_from(DagVersion)) == 1 + + # Simulate the broken state: DagVersion present, serialized_dag row gone. + session.execute(delete(SDM).where(SDM.dag_id == dag.dag_id)) + session.flush() + assert session.scalar(select(func.count()).select_from(SDM)) == 0 + assert session.scalar(select(func.count()).select_from(DagVersion)) == 1 + + SDM.write_dag(LazyDeserializedDAG.from_dag(dag), bundle_name="dag_maker") + + # A new version and serialized row must have been created. + assert session.scalar(select(func.count()).select_from(DagVersion)) == 2 + assert session.scalar(select(func.count()).select_from(SDM)) == 1 + assert SDM.get(dag.dag_id, session=session) is not None + def test_example_dag_sorting_serialised_dag(self, session): """ This test asserts if different dag ids -- simple or complex, can be sorted From f2a8147db6511ed36bc8980609ed62136294b937 Mon Sep 17 00:00:00 2001 From: Kalin Stoyanov Date: Thu, 25 Jun 2026 14:45:07 +0300 Subject: [PATCH 2/2] update unit test --- .../tests/unit/models/test_serialized_dag.py | 34 +++++-------------- 1 file changed, 9 insertions(+), 25 deletions(-) diff --git a/airflow-core/tests/unit/models/test_serialized_dag.py b/airflow-core/tests/unit/models/test_serialized_dag.py index 67c5922ad84fe..a72c604872ea1 100644 --- a/airflow-core/tests/unit/models/test_serialized_dag.py +++ b/airflow-core/tests/unit/models/test_serialized_dag.py @@ -423,27 +423,6 @@ def test_new_dag_versions_are_created_if_there_is_a_dagrun(self, dag_maker, sess assert session.scalar(select(func.count()).select_from(DagVersion)) == 2 assert session.scalar(select(func.count()).select_from(SDM)) == 2 - def test_new_dag_version_is_created_when_version_exists_but_serialized_dag_row_missing( - self, dag_maker, session - ): - with dag_maker("dag1") as dag: - PythonOperator(task_id="task1", python_callable=lambda: None) - assert session.scalar(select(func.count()).select_from(SDM)) == 1 - assert session.scalar(select(func.count()).select_from(DagVersion)) == 1 - - # Simulate the broken state: DagVersion present, serialized_dag row gone. - session.execute(delete(SDM).where(SDM.dag_id == dag.dag_id)) - session.flush() - assert session.scalar(select(func.count()).select_from(SDM)) == 0 - assert session.scalar(select(func.count()).select_from(DagVersion)) == 1 - - SDM.write_dag(LazyDeserializedDAG.from_dag(dag), bundle_name="dag_maker") - - # A new version and serialized row must have been created. - assert session.scalar(select(func.count()).select_from(DagVersion)) == 2 - assert session.scalar(select(func.count()).select_from(SDM)) == 1 - assert SDM.get(dag.dag_id, session=session) is not None - def test_example_dag_sorting_serialised_dag(self, session): """ This test asserts if different dag ids -- simple or complex, can be sorted @@ -793,10 +772,12 @@ def __init__(self, *, task_id: str, **kwargs): # Hashes should be identical assert hash_1 == hash_2, "Hashes should be identical when dicts are sorted consistently" - def test_dynamic_dag_update_preserves_null_check(self, dag_maker, session): + def test_new_dag_version_is_created_when_version_exists_but_serialized_dag_row_missing( + self, dag_maker, session + ): """ Test that dynamic DAG update gracefully handles case where SerializedDagModel doesn't exist. - This preserves the null-check fix from PR #56422 and tests the direct UPDATE path. + This adds the serialized dag entry """ with dag_maker(dag_id="test_missing_serdag", serialized=True, session=session) as dag: EmptyOperator(task_id="task1") @@ -824,7 +805,7 @@ def test_dynamic_dag_update_preserves_null_check(self, dag_maker, session): # Verify no SerializedDagModel exists assert SDM.get("test_missing_serdag", session=session) is None - # Try to update - should return False gracefully (not crash) + # Try to update - should create the serialized dag result = SDM.write_dag( dag=lazy_dag, bundle_name="test_bundle", @@ -832,8 +813,11 @@ def test_dynamic_dag_update_preserves_null_check(self, dag_maker, session): min_update_interval=None, session=session, ) + session.commit() - assert result is False # Should return False when SerializedDagModel is missing + serialized_dag = session.scalar(select(SDM).where(SDM.dag_id == "test_missing_serdag").limit(1)) + assert serialized_dag is not None + assert result is True def test_dynamic_dag_update_success(self, dag_maker, session): """