Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion airflow-core/src/airflow/models/serialized_dag.py
Original file line number Diff line number Diff line change
Expand Up @@ -723,7 +723,7 @@ def write_dag(
)
)

if dag_version and not has_task_instances:
if dag_version and not has_task_instances and serialized_dag_hash is not None:
# This is for dynamic DAGs that the hashes changes often. We should update
# the serialized dag, the dag_version and the dag_code instead of a new version
# if the dag_version is not associated with any task instances
Expand Down
13 changes: 9 additions & 4 deletions airflow-core/tests/unit/models/test_serialized_dag.py
Original file line number Diff line number Diff line change
Expand Up @@ -772,10 +772,12 @@ def __init__(self, *, task_id: str, **kwargs):
# Hashes should be identical
assert hash_1 == hash_2, "Hashes should be identical when dicts are sorted consistently"

def test_dynamic_dag_update_preserves_null_check(self, dag_maker, session):
def test_new_dag_version_is_created_when_version_exists_but_serialized_dag_row_missing(
self, dag_maker, session
):
"""
Test that dynamic DAG update gracefully handles case where SerializedDagModel doesn't exist.
This preserves the null-check fix from PR #56422 and tests the direct UPDATE path.
This adds the serialized dag entry
"""
with dag_maker(dag_id="test_missing_serdag", serialized=True, session=session) as dag:
EmptyOperator(task_id="task1")
Expand Down Expand Up @@ -803,16 +805,19 @@ def test_dynamic_dag_update_preserves_null_check(self, dag_maker, session):
# Verify no SerializedDagModel exists
assert SDM.get("test_missing_serdag", session=session) is None

# Try to update - should return False gracefully (not crash)
# Try to update - should create the serialized dag
result = SDM.write_dag(
dag=lazy_dag,
bundle_name="test_bundle",
bundle_version=None,
min_update_interval=None,
session=session,
)
session.commit()

assert result is False # Should return False when SerializedDagModel is missing
serialized_dag = session.scalar(select(SDM).where(SDM.dag_id == "test_missing_serdag").limit(1))
assert serialized_dag is not None
assert result is True

def test_dynamic_dag_update_success(self, dag_maker, session):
"""
Expand Down