Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
36 changes: 24 additions & 12 deletions airflow-core/src/airflow/models/serialized_dag.py
Original file line number Diff line number Diff line change
Expand Up @@ -741,17 +741,24 @@ def write_dag(
)
)

if getattr(result, "rowcount", 0) == 0:
# No rows updated - serialized DAG doesn't exist
return False
serialized_dag_existed = getattr(result, "rowcount", 0) > 0

if deadline_uuid_mapping:
updated_serialized_dag = session.scalar(
select(cls).where(cls.dag_version_id == dag_version.id)
)
if updated_serialized_dag:
updated_serialized_dag.deadline_alerts.clear()
cls._create_deadline_alert_records(updated_serialized_dag, deadline_uuid_mapping)
if serialized_dag_existed:
if deadline_uuid_mapping:
updated_serialized_dag = session.scalar(
select(cls).where(cls.dag_version_id == dag_version.id)
)
if updated_serialized_dag:
updated_serialized_dag.deadline_alerts.clear()
cls._create_deadline_alert_records(updated_serialized_dag, deadline_uuid_mapping)
else:
# The DagVersion exists but has no serialized_dag row to update — e.g. a prior write
# created the version without persisting the serialized DAG. Insert it for this
# version rather than returning early, which would otherwise leave the DAG
# permanently unserialized on every subsequent parse.
new_serialized_dag.dag_version = dag_version
session.add(new_serialized_dag)
cls._create_deadline_alert_records(new_serialized_dag, deadline_uuid_mapping)

# The dag_version and dag_code may not have changed, still we should
# do the below actions:
Expand All @@ -760,8 +767,13 @@ def write_dag(
dag_version.bundle_version = bundle_version
dag_version.version_data = version_data
session.merge(dag_version)
# Update the latest DagCode
DagCode.update_source_code(dag_id=dag.dag_id, fileloc=dag.fileloc, session=session)
# Refresh the latest DagCode; write it when this version has none yet (the repair case).
if serialized_dag_existed or session.scalar(
select(exists().where(DagCode.dag_version_id == dag_version.id))
):
DagCode.update_source_code(dag_id=dag.dag_id, fileloc=dag.fileloc, session=session)
else:
DagCode.write_code(dag_version, dag.fileloc, session=session)
return True

dagv = DagVersion.write_dag(
Expand Down
83 changes: 77 additions & 6 deletions airflow-core/tests/unit/models/test_serialized_dag.py
Original file line number Diff line number Diff line change
Expand Up @@ -772,11 +772,17 @@ def __init__(self, *, task_id: str, **kwargs):
# Hashes should be identical
assert hash_1 == hash_2, "Hashes should be identical when dicts are sorted consistently"

def test_dynamic_dag_update_preserves_null_check(self, dag_maker, session):
def test_dynamic_dag_update_reserializes_when_serdag_missing(self, dag_maker, session):
"""
Test that dynamic DAG update gracefully handles case where SerializedDagModel doesn't exist.
This preserves the null-check fix from PR #56422 and tests the direct UPDATE path.
A DagVersion with no SerializedDagModel row must be re-serialized, not skipped.

When a version exists (and has no task instances) but its serialized_dag row is
gone, the in-place UPDATE matches no rows. write_dag must insert the missing
serialized DAG (and its DagCode) for that version instead of returning False,
which would otherwise leave the DAG permanently unserialized on every parse.
"""
from airflow.models.dagcode import DagCode

with dag_maker(dag_id="test_missing_serdag", serialized=True, session=session) as dag:
EmptyOperator(task_id="task1")

Expand All @@ -796,23 +802,88 @@ def test_dynamic_dag_update_preserves_null_check(self, dag_maker, session):
)
assert dag_version is not None

# Manually delete SerializedDagModel (simulates edge case)
# Manually delete SerializedDagModel and DagCode (simulates the orphaned-version edge case)
session.execute(delete(SDM).where(SDM.dag_id == "test_missing_serdag"))
session.execute(delete(DagCode).where(DagCode.dag_version_id == dag_version.id))
session.commit()

# Verify no SerializedDagModel exists
assert SDM.get("test_missing_serdag", session=session) is None

# Try to update - should return False gracefully (not crash)
# Re-write - should re-create the serialized DAG for the existing version and report True.
result = SDM.write_dag(
dag=lazy_dag,
bundle_name="test_bundle",
bundle_version=None,
min_update_interval=None,
session=session,
)
session.commit()

assert result is True
recreated = SDM.get("test_missing_serdag", session=session)
assert recreated is not None
assert recreated.dag_version_id == dag_version.id
# The serialized DAG was attached to the existing version, not a new one.
assert (
session.scalar(
select(func.count()).select_from(DagVersion).where(DagVersion.dag_id == "test_missing_serdag")
)
== 1
)
# DagCode for the version is written back so the repaired DAG is runnable.
assert (
session.scalar(
select(func.count()).select_from(DagCode).where(DagCode.dag_version_id == dag_version.id)
)
== 1
)

def test_dynamic_dag_update_reserializes_when_serdag_missing_but_code_present(self, dag_maker, session):
"""
Re-serialize a version whose serialized_dag row is gone while its DagCode remains.

The repair path must reuse the existing DagCode (refresh in place) rather than
writing a second row for the same version, which would violate the unique
constraint on ``dag_code.dag_version_id``.
"""
from airflow.models.dagcode import DagCode

with dag_maker(dag_id="test_missing_serdag_keep_code", serialized=True, session=session) as dag:
EmptyOperator(task_id="task1")

lazy_dag = LazyDeserializedDAG.from_dag(dag)
SDM.write_dag(dag=lazy_dag, bundle_name="test_bundle", bundle_version=None, session=session)
session.commit()

dag_version = session.scalar(
select(DagVersion).where(DagVersion.dag_id == "test_missing_serdag_keep_code").limit(1)
)
assert dag_version is not None

# Drop only the serialized_dag row; keep its DagCode.
session.execute(delete(SDM).where(SDM.dag_id == "test_missing_serdag_keep_code"))
session.commit()
assert SDM.get("test_missing_serdag_keep_code", session=session) is None

result = SDM.write_dag(
dag=lazy_dag,
bundle_name="test_bundle",
bundle_version=None,
min_update_interval=None,
session=session,
)
session.commit()

assert result is False # Should return False when SerializedDagModel is missing
assert result is True
assert SDM.get("test_missing_serdag_keep_code", session=session) is not None
# No duplicate DagCode row was created for the version.
assert (
session.scalar(
select(func.count()).select_from(DagCode).where(DagCode.dag_version_id == dag_version.id)
)
== 1
)

def test_dynamic_dag_update_success(self, dag_maker, session):
"""
Expand Down