Skip to content
Open
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion airflow-core/src/airflow/models/serialized_dag.py
Original file line number Diff line number Diff line change
Expand Up @@ -723,7 +723,7 @@ def write_dag(
)
)

if dag_version and not has_task_instances:
if dag_version and not has_task_instances and serialized_dag_hash is not None:
# This is for dynamic DAGs that the hashes changes often. We should update
# the serialized dag, the dag_version and the dag_code instead of a new version
# if the dag_version is not associated with any task instances
Expand Down
21 changes: 21 additions & 0 deletions airflow-core/tests/unit/models/test_serialized_dag.py
Original file line number Diff line number Diff line change
Expand Up @@ -423,6 +423,27 @@ def test_new_dag_versions_are_created_if_there_is_a_dagrun(self, dag_maker, sess
assert session.scalar(select(func.count()).select_from(DagVersion)) == 2
assert session.scalar(select(func.count()).select_from(SDM)) == 2

def test_new_dag_version_is_created_when_version_exists_but_serialized_dag_row_missing(
self, dag_maker, session
):
with dag_maker("dag1") as dag:
PythonOperator(task_id="task1", python_callable=lambda: None)
assert session.scalar(select(func.count()).select_from(SDM)) == 1
assert session.scalar(select(func.count()).select_from(DagVersion)) == 1

# Simulate the broken state: DagVersion present, serialized_dag row gone.
session.execute(delete(SDM).where(SDM.dag_id == dag.dag_id))
session.flush()
assert session.scalar(select(func.count()).select_from(SDM)) == 0
assert session.scalar(select(func.count()).select_from(DagVersion)) == 1

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am just not sure how this state you are simulating can arise under normal operation. The test simulates the issue by manually deleting the SerializedDagModel row while leaving the DagVersion row intact. Since DAGs are serialized during parsing, I would expect DagVersion and SerializedDagModel to be created together under normal operation.

Could you expand the issue/PR description to explain how this state can occur in practice? Is this intended to recover from metadata corruption, failed migrations, manual database modification, or some known failure during serialization?

Furthermore, have you ran into this specific failure mode yourself?

Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes I've ran into this myself, but I am not sure what caused it, I am also running a custom application built on top of airflow so most likely it was not caused by airflow itself (I suspect it happened when upgrading from airflow 2 to 3 but there is lots of custom code, sometimes too tightly coupled to airflow and it would be a lot of effort to determine exactly how it happened)
I'd say that there being nothing preventing you from getting into this state + the user being able to manually break their dags by editing the db is reason enough to have this

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes I've ran into this myself, but I am not sure what caused it, I am also running a custom application built on top of airflow so most likely it was not caused by airflow itself (I suspect it happened when upgrading from airflow 2 to 3 but there is lots of custom code, sometimes too tightly coupled to airflow and it would be a lot of effort to determine exactly how it happened) I'd say that there being nothing preventing you from getting into this state + the user being able to manually break their dags by editing the db is reason enough to have this

I would add all the detail you can in both the issue and the PR description (please include the what, why and how as well as any implications for backwards compatibility).

Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done


SDM.write_dag(LazyDeserializedDAG.from_dag(dag), bundle_name="dag_maker")

# A new version and serialized row must have been created.
assert session.scalar(select(func.count()).select_from(DagVersion)) == 2
assert session.scalar(select(func.count()).select_from(SDM)) == 1
assert SDM.get(dag.dag_id, session=session) is not None

def test_example_dag_sorting_serialised_dag(self, session):
"""
This test asserts if different dag ids -- simple or complex, can be sorted
Expand Down
Loading