From 42b4093b778a99e02bc74957f5b75bfaa75393b7 Mon Sep 17 00:00:00 2001 From: Kunal Soni Date: Wed, 24 Jun 2026 16:56:07 +0000 Subject: [PATCH] Re-serialize DAG when SerializedDagModel is missing --- .../src/airflow/models/serialized_dag.py | 36 +++++--- .../tests/unit/models/test_serialized_dag.py | 83 +++++++++++++++++-- 2 files changed, 101 insertions(+), 18 deletions(-) diff --git a/airflow-core/src/airflow/models/serialized_dag.py b/airflow-core/src/airflow/models/serialized_dag.py index ce0333653114e..f9c6c6a326293 100644 --- a/airflow-core/src/airflow/models/serialized_dag.py +++ b/airflow-core/src/airflow/models/serialized_dag.py @@ -741,17 +741,24 @@ def write_dag( ) ) - if getattr(result, "rowcount", 0) == 0: - # No rows updated - serialized DAG doesn't exist - return False + serialized_dag_existed = getattr(result, "rowcount", 0) > 0 - if deadline_uuid_mapping: - updated_serialized_dag = session.scalar( - select(cls).where(cls.dag_version_id == dag_version.id) - ) - if updated_serialized_dag: - updated_serialized_dag.deadline_alerts.clear() - cls._create_deadline_alert_records(updated_serialized_dag, deadline_uuid_mapping) + if serialized_dag_existed: + if deadline_uuid_mapping: + updated_serialized_dag = session.scalar( + select(cls).where(cls.dag_version_id == dag_version.id) + ) + if updated_serialized_dag: + updated_serialized_dag.deadline_alerts.clear() + cls._create_deadline_alert_records(updated_serialized_dag, deadline_uuid_mapping) + else: + # The DagVersion exists but has no serialized_dag row to update — e.g. a prior write + # created the version without persisting the serialized DAG. Insert it for this + # version rather than returning early, which would otherwise leave the DAG + # permanently unserialized on every subsequent parse. + new_serialized_dag.dag_version = dag_version + session.add(new_serialized_dag) + cls._create_deadline_alert_records(new_serialized_dag, deadline_uuid_mapping) # The dag_version and dag_code may not have changed, still we should # do the below actions: @@ -760,8 +767,13 @@ def write_dag( dag_version.bundle_version = bundle_version dag_version.version_data = version_data session.merge(dag_version) - # Update the latest DagCode - DagCode.update_source_code(dag_id=dag.dag_id, fileloc=dag.fileloc, session=session) + # Refresh the latest DagCode; write it when this version has none yet (the repair case). + if serialized_dag_existed or session.scalar( + select(exists().where(DagCode.dag_version_id == dag_version.id)) + ): + DagCode.update_source_code(dag_id=dag.dag_id, fileloc=dag.fileloc, session=session) + else: + DagCode.write_code(dag_version, dag.fileloc, session=session) return True dagv = DagVersion.write_dag( diff --git a/airflow-core/tests/unit/models/test_serialized_dag.py b/airflow-core/tests/unit/models/test_serialized_dag.py index a1ad63de5d688..6924923612eea 100644 --- a/airflow-core/tests/unit/models/test_serialized_dag.py +++ b/airflow-core/tests/unit/models/test_serialized_dag.py @@ -772,11 +772,17 @@ def __init__(self, *, task_id: str, **kwargs): # Hashes should be identical assert hash_1 == hash_2, "Hashes should be identical when dicts are sorted consistently" - def test_dynamic_dag_update_preserves_null_check(self, dag_maker, session): + def test_dynamic_dag_update_reserializes_when_serdag_missing(self, dag_maker, session): """ - Test that dynamic DAG update gracefully handles case where SerializedDagModel doesn't exist. - This preserves the null-check fix from PR #56422 and tests the direct UPDATE path. + A DagVersion with no SerializedDagModel row must be re-serialized, not skipped. + + When a version exists (and has no task instances) but its serialized_dag row is + gone, the in-place UPDATE matches no rows. write_dag must insert the missing + serialized DAG (and its DagCode) for that version instead of returning False, + which would otherwise leave the DAG permanently unserialized on every parse. """ + from airflow.models.dagcode import DagCode + with dag_maker(dag_id="test_missing_serdag", serialized=True, session=session) as dag: EmptyOperator(task_id="task1") @@ -796,14 +802,70 @@ def test_dynamic_dag_update_preserves_null_check(self, dag_maker, session): ) assert dag_version is not None - # Manually delete SerializedDagModel (simulates edge case) + # Manually delete SerializedDagModel and DagCode (simulates the orphaned-version edge case) session.execute(delete(SDM).where(SDM.dag_id == "test_missing_serdag")) + session.execute(delete(DagCode).where(DagCode.dag_version_id == dag_version.id)) session.commit() # Verify no SerializedDagModel exists assert SDM.get("test_missing_serdag", session=session) is None - # Try to update - should return False gracefully (not crash) + # Re-write - should re-create the serialized DAG for the existing version and report True. + result = SDM.write_dag( + dag=lazy_dag, + bundle_name="test_bundle", + bundle_version=None, + min_update_interval=None, + session=session, + ) + session.commit() + + assert result is True + recreated = SDM.get("test_missing_serdag", session=session) + assert recreated is not None + assert recreated.dag_version_id == dag_version.id + # The serialized DAG was attached to the existing version, not a new one. + assert ( + session.scalar( + select(func.count()).select_from(DagVersion).where(DagVersion.dag_id == "test_missing_serdag") + ) + == 1 + ) + # DagCode for the version is written back so the repaired DAG is runnable. + assert ( + session.scalar( + select(func.count()).select_from(DagCode).where(DagCode.dag_version_id == dag_version.id) + ) + == 1 + ) + + def test_dynamic_dag_update_reserializes_when_serdag_missing_but_code_present(self, dag_maker, session): + """ + Re-serialize a version whose serialized_dag row is gone while its DagCode remains. + + The repair path must reuse the existing DagCode (refresh in place) rather than + writing a second row for the same version, which would violate the unique + constraint on ``dag_code.dag_version_id``. + """ + from airflow.models.dagcode import DagCode + + with dag_maker(dag_id="test_missing_serdag_keep_code", serialized=True, session=session) as dag: + EmptyOperator(task_id="task1") + + lazy_dag = LazyDeserializedDAG.from_dag(dag) + SDM.write_dag(dag=lazy_dag, bundle_name="test_bundle", bundle_version=None, session=session) + session.commit() + + dag_version = session.scalar( + select(DagVersion).where(DagVersion.dag_id == "test_missing_serdag_keep_code").limit(1) + ) + assert dag_version is not None + + # Drop only the serialized_dag row; keep its DagCode. + session.execute(delete(SDM).where(SDM.dag_id == "test_missing_serdag_keep_code")) + session.commit() + assert SDM.get("test_missing_serdag_keep_code", session=session) is None + result = SDM.write_dag( dag=lazy_dag, bundle_name="test_bundle", @@ -811,8 +873,17 @@ def test_dynamic_dag_update_preserves_null_check(self, dag_maker, session): min_update_interval=None, session=session, ) + session.commit() - assert result is False # Should return False when SerializedDagModel is missing + assert result is True + assert SDM.get("test_missing_serdag_keep_code", session=session) is not None + # No duplicate DagCode row was created for the version. + assert ( + session.scalar( + select(func.count()).select_from(DagCode).where(DagCode.dag_version_id == dag_version.id) + ) + == 1 + ) def test_dynamic_dag_update_success(self, dag_maker, session): """