diff --git a/airflow-core/src/airflow/models/serialized_dag.py b/airflow-core/src/airflow/models/serialized_dag.py index ce0333653114e..1a9bc5eb45454 100644 --- a/airflow-core/src/airflow/models/serialized_dag.py +++ b/airflow-core/src/airflow/models/serialized_dag.py @@ -723,7 +723,7 @@ def write_dag( ) ) - if dag_version and not has_task_instances: + if dag_version and not has_task_instances and serialized_dag_hash is not None: # This is for dynamic DAGs that the hashes changes often. We should update # the serialized dag, the dag_version and the dag_code instead of a new version # if the dag_version is not associated with any task instances diff --git a/airflow-core/tests/unit/models/test_serialized_dag.py b/airflow-core/tests/unit/models/test_serialized_dag.py index a1ad63de5d688..a72c604872ea1 100644 --- a/airflow-core/tests/unit/models/test_serialized_dag.py +++ b/airflow-core/tests/unit/models/test_serialized_dag.py @@ -772,10 +772,12 @@ def __init__(self, *, task_id: str, **kwargs): # Hashes should be identical assert hash_1 == hash_2, "Hashes should be identical when dicts are sorted consistently" - def test_dynamic_dag_update_preserves_null_check(self, dag_maker, session): + def test_new_dag_version_is_created_when_version_exists_but_serialized_dag_row_missing( + self, dag_maker, session + ): """ Test that dynamic DAG update gracefully handles case where SerializedDagModel doesn't exist. - This preserves the null-check fix from PR #56422 and tests the direct UPDATE path. + This adds the serialized dag entry """ with dag_maker(dag_id="test_missing_serdag", serialized=True, session=session) as dag: EmptyOperator(task_id="task1") @@ -803,7 +805,7 @@ def test_dynamic_dag_update_preserves_null_check(self, dag_maker, session): # Verify no SerializedDagModel exists assert SDM.get("test_missing_serdag", session=session) is None - # Try to update - should return False gracefully (not crash) + # Try to update - should create the serialized dag result = SDM.write_dag( dag=lazy_dag, bundle_name="test_bundle", @@ -811,8 +813,11 @@ def test_dynamic_dag_update_preserves_null_check(self, dag_maker, session): min_update_interval=None, session=session, ) + session.commit() - assert result is False # Should return False when SerializedDagModel is missing + serialized_dag = session.scalar(select(SDM).where(SDM.dag_id == "test_missing_serdag").limit(1)) + assert serialized_dag is not None + assert result is True def test_dynamic_dag_update_success(self, dag_maker, session): """