Fix SQLite pragma teardown in DB migrations #64838
Conversation
Congratulations on your first Pull Request and welcome to the Apache Airflow community! If you have any issues or are unsure about anything, please check our Contributors' Guide (https://github.com/apache/airflow/blob/main/contributing-docs/README.rst)
Pull request overview
This PR aims to make SQLite foreign-key PRAGMA handling in Alembic migrations reliable by ensuring foreign_keys=ON is always restored (even when a migration errors), and by updating selected migration scripts to use a shared helper along with new regression tests.
Changes:
- Make `disable_sqlite_fkeys()` exception-safe so PRAGMA teardown happens even on errors.
- Update several existing migration scripts to use `disable_sqlite_fkeys()` instead of manual PRAGMA toggles.
- Add unit tests covering success/exception/no-op behavior of `disable_sqlite_fkeys()`.
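The exception-safe pattern described above can be sketched roughly as follows. This is a minimal illustration with a stand-in `op` object, not the actual Airflow helper (the real one issues the PRAGMAs through SQLAlchemy's `sa.text`); it uses an `ExitStack` callback so the restore runs on both normal exit and error:

```python
from contextlib import ExitStack, contextmanager


@contextmanager
def disable_sqlite_fkeys(op):
    """Turn SQLite foreign keys off for a block, restoring them even on error."""
    conn = op.get_bind()
    if conn.dialect.name != "sqlite":
        # No-op on other dialects.
        yield op
        return
    with ExitStack() as stack:
        conn.execute("PRAGMA foreign_keys=OFF")
        # The callback fires on normal exit and when the block raises.
        stack.callback(conn.execute, "PRAGMA foreign_keys=ON")
        yield op
```

The key difference from a plain ON/OFF pair is that the restoring `PRAGMA foreign_keys=ON` is registered as teardown before the migration body runs, so a failing migration can no longer leave foreign-key enforcement disabled for the rest of the session.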
Reviewed changes
Copilot reviewed 6 out of 6 changed files in this pull request and generated 2 comments.
| File | Description |
|---|---|
| airflow-core/src/airflow/migrations/utils.py | Makes the SQLite FK-disabling helper exception-safe via ExitStack callback teardown. |
| airflow-core/tests/unit/migrations/test_migration_utils.py | Adds regression tests ensuring PRAGMA is restored on both success and exception. |
| airflow-core/src/airflow/migrations/versions/0101_3_2_0_ui_improvements_for_deadlines.py | Switches SQLite FK handling to the shared helper (but currently narrows the disable scope). |
| airflow-core/src/airflow/migrations/versions/0084_3_1_0_add_last_parse_duration_to_dag_model.py | Replaces manual PRAGMA toggling with the helper in downgrade. |
| airflow-core/src/airflow/migrations/versions/0041_3_0_0_rename_dataset_as_asset.py | Replaces manual PRAGMA toggling with the helper in _drop_fkey_if_exists(). |
| airflow-core/src/airflow/migrations/versions/0017_2_9_2_fix_inconsistency_between_ORM_and_migration_files.py | Replaces manual PRAGMA toggling with the helper for SQLite table-recreation paths. |
jason810496
left a comment
Thanks for the improvement.
@iting0321 Converting to draft: this PR doesn't yet meet our Pull Request quality criteria. See the linked criteria for how to fix each item, then mark the PR "Ready for review". This is not a rejection, just an invitation to bring the PR up to standard. No rush.

Note: This comment was drafted by an AI-assisted triage tool and may contain mistakes. Once you have addressed the points above, an Apache Airflow maintainer, a real person, will take the next look at your PR. We use this two-stage triage process so that our maintainers' limited time is spent where it matters most: the conversation with you.
Quick follow-up to the triage comment above, with one clarification on the "Unresolved review comments" item: once you believe a thread has been addressed, whether by pushing a fix or by replying in-thread with an explanation of why the suggestion doesn't apply, please mark the thread as resolved yourself by clicking the "Resolve conversation" button at the bottom of each thread.

Reviewers don't auto-close their own threads, so an addressed-but-unresolved thread reads as "still waiting on the author" and keeps the PR from moving forward. The author doing the resolve-click is the expected convention on this project.

Note: This comment was drafted by an AI-assisted triage tool and may contain mistakes. Once you have addressed the points above, an Apache Airflow maintainer, a real person, will take the next look at your PR. We use this two-stage triage process so that our maintainers' limited time is spent where it matters most: the conversation with you.
… for utility functions
…MySQL in migration utilities
…r for table creation and data insertion
```python
conn = op.get_bind()

with op.batch_alter_table("deadline", schema=None) as batch_op:
    batch_op.add_column(sa.Column("deadline_alert_id", sa.Uuid(), nullable=True))
    batch_op.add_column(sa.Column("created_at", UtcDateTime, nullable=True))
    batch_op.add_column(sa.Column("last_updated_at", UtcDateTime, nullable=True))
    batch_op.create_foreign_key(
        batch_op.f("deadline_deadline_alert_id_fkey"),
        "deadline_alert",
        ["deadline_alert_id"],
        ["id"],
        ondelete="SET NULL",
    )
```

```python
with op.batch_alter_table("deadline", schema=None) as batch_op:
    batch_op.alter_column("created_at", existing_type=UtcDateTime, nullable=False)
    batch_op.alter_column("last_updated_at", existing_type=UtcDateTime, nullable=False)
```

```python
with op.batch_alter_table("deadline_alert", schema=None) as batch_op:
    batch_op.create_foreign_key(
        batch_op.f("deadline_alert_serialized_dag_id_fkey"),
        "serialized_dag",
        ["serialized_dag_id"],
        ["id"],
        ondelete="CASCADE",
    )

# For migration/backcompat purposes if no timestamp is there from the migration, use now()
# then lock the columns down so all new entries require the timestamps to be provided.
now = timezone.utcnow()
conn.execute(
    sa.text("""
        UPDATE deadline
        SET created_at = :now, last_updated_at = :now
        WHERE created_at IS NULL OR last_updated_at IS NULL
    """),
    {"now": now},
)
```
```python
if dialect_name == "postgresql":
    cols_array = ", ".join(f"'{c}'" for c in columns)
    op.execute(
```
```python
def _drop_fkey_if_exists(table, constraint_name):
    conn = op.get_bind()
    dialect_name = conn.dialect.name
```
```python
self.executed.append(statement)
```

```python
def test_disable_sqlite_fkeys_restores_pragma_on_success() -> None:
```
We no longer need this; there is a unit test for it in airflow-core/tests/unit/migrations/test_migration_utils.py.

We can merge the tests there if needed.
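The regression tests discussed here can be sketched with a fake connection that records every executed statement, in the spirit of the `self.executed.append(statement)` fragment above. The names and shapes below are illustrative, not the actual contents of test_migration_utils.py, and the compact helper stands in for the real one:

```python
import contextlib


@contextlib.contextmanager
def disable_sqlite_fkeys(op):
    # Compact stand-in for the real helper, for illustration only.
    conn = op.get_bind()
    if conn.dialect.name != "sqlite":
        yield op
        return
    conn.execute("PRAGMA foreign_keys=OFF")
    try:
        yield op
    finally:
        conn.execute("PRAGMA foreign_keys=ON")


class FakeConnection:
    """Records every statement so tests can assert PRAGMA ordering."""

    def __init__(self, dialect_name="sqlite"):
        self.dialect = type("Dialect", (), {"name": dialect_name})()
        self.executed = []

    def execute(self, statement):
        self.executed.append(statement)


class FakeOp:
    def __init__(self, conn):
        self._conn = conn

    def get_bind(self):
        return self._conn


def test_disable_sqlite_fkeys_restores_pragma_on_success():
    conn = FakeConnection()
    with disable_sqlite_fkeys(FakeOp(conn)):
        pass
    assert conn.executed == ["PRAGMA foreign_keys=OFF", "PRAGMA foreign_keys=ON"]


def test_disable_sqlite_fkeys_restores_pragma_on_exception():
    conn = FakeConnection()
    try:
        with disable_sqlite_fkeys(FakeOp(conn)):
            raise ValueError("migration failed")
    except ValueError:
        pass
    # The teardown must still have run.
    assert conn.executed[-1] == "PRAGMA foreign_keys=ON"
```

The exception case is the one this PR fixes: without the `finally` (or an `ExitStack` callback), a failing migration body would skip the restoring PRAGMA entirely.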
```python
return contextlib.nullcontext()
```

```python
def create_index_if_not_exists(op, index_name, table_name, columns, unique=False) -> None:
```
```python
else:
    # SQLite requires foreign key constraints to be disabled during batch operations.
    with disable_sqlite_fkeys(op):
        try:
```
Should we use `ignore_sqlite_value_error`?
```python
op.create_index(index_name, table_name, columns, unique=unique, if_not_exists=True)
```

```python
def drop_index_if_exists(op, index_name, table_name) -> None:
```
```python
op.drop_index(index_name, table_name=table_name, if_exists=True)
```

```python
def drop_unique_constraints_on_columns(op, table_name, columns) -> None:
```
```python
batch_op.drop_constraint(uq["name"], type_="unique")
```

```python
def drop_unique_constraint_if_exists(op, table_name, constraint_name) -> None:
```
What
This change improves migration utility reliability across SQLite, MySQL, and offline Alembic flows. It adds a shared SQLite foreign-key helper, updates affected migrations to use it, and hardens related helpers for offline-safe dialect detection and retry-safe MySQL stored procedures.
Why
To prevent SQLite migrations from leaving foreign keys disabled after failures, avoid offline Alembic crashes during dialect detection, and make MySQL migration helper reruns safer after interrupted executions.
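One way the offline-safe dialect detection mentioned above could look is to fall back to Alembic's migration context when no live bind is available. This is a hypothetical sketch, not the helper actually added by this PR; `safe_dialect_name` and the fallback logic are assumptions made for illustration:

```python
def safe_dialect_name(op):
    # Hypothetical helper: prefer a live bind; fall back to the migration
    # context so offline (--sql) Alembic runs don't crash on dialect lookup.
    bind = op.get_bind()
    if bind is not None:
        return bind.dialect.name
    return op.get_context().dialect.name
```

Either way, the point is the same: helpers that branch on dialect must not assume a live database connection exists, because `alembic upgrade --sql` generates SQL without one.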
Was generative AI tooling used to co-author this PR?