Merged
41 changes: 41 additions & 0 deletions .github/workflows/ci-amd-arm.yml
@@ -377,6 +377,46 @@ jobs:
RUFF_FORMAT: "github"
INCLUDE_MYPY_VOLUME: "false"

migration-round-trip:
timeout-minutes: 20
name: "Migration round-trip check"
needs: [build-info, build-ci-images]
runs-on: ${{ fromJSON(needs.build-info.outputs.runner-type) }}
if: needs.build-info.outputs.has-migrations == 'true'
env:
PYTHON_MAJOR_MINOR_VERSION: "${{ needs.build-info.outputs.default-python-version }}"
GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }}
steps:
- name: "Cleanup repo"
shell: bash
run: docker run -v "${GITHUB_WORKSPACE}:/workspace" -u 0:0 bash -c "rm -rf /workspace/*"
- name: "Checkout ${{ github.ref }} ( ${{ github.sha }} )"
uses: actions/checkout@de0fac2e4500dabe0009e67214ff5f5447ce83dd # v6.0.2
with:
persist-credentials: false
- name: "Prepare breeze & CI image: ${{ needs.build-info.outputs.default-python-version }}"
uses: ./.github/actions/prepare_breeze_and_image
with:
platform: ${{ needs.build-info.outputs.platform }}
python: "${{ needs.build-info.outputs.default-python-version }}"
use-uv: ${{ needs.build-info.outputs.use-uv }}
make-mnt-writeable-and-cleanup: true
id: breeze
- name: "Install prek"
uses: ./.github/actions/install-prek
id: prek
with:
          python-version: ${{ steps.breeze.outputs.host-python-version }}
platform: ${{ needs.build-info.outputs.platform }}
save-cache: false
- name: "Migration round-trip check"
run: prek --color always --verbose --stage manual migration-round-trip --all-files
env:
VERBOSE: "false"
COLUMNS: "202"
SKIP_GROUP_OUTPUT: "true"
DEFAULT_BRANCH: ${{ needs.build-info.outputs.default-branch }}

providers:
name: "provider distributions tests"
uses: ./.github/workflows/test-providers.yml
@@ -939,6 +979,7 @@ jobs:
- build-prod-images
- ci-image-checks
- generate-constraints
- migration-round-trip
- mypy-providers
- providers
- tests-helm
8 changes: 8 additions & 0 deletions airflow-core/.pre-commit-config.yaml
@@ -256,6 +256,14 @@ repos:
(?x)
^src/airflow/migrations/versions/.*\.py$|
^docs/migrations-ref\.rst$
- id: migration-round-trip
name: Migration round-trip SQLite FK enforcement
language: python
entry: ../scripts/ci/prek/migration_round_trip.py
pass_filenames: false
require_serial: true
stages: [manual]
files: ^src/airflow/migrations/versions/.*\.py$
- id: check-default-configuration
name: Check the default configuration
entry: ../scripts/ci/prek/check_default_configuration.py
@@ -30,6 +30,8 @@
import sqlalchemy as sa
from alembic import op

from airflow.migrations.utils import disable_sqlite_fkeys

# revision identifiers, used by Alembic.
revision = "e79fc784f145"
down_revision = "0b112f49112d"
@@ -40,22 +42,18 @@

def upgrade():
"""Apply add timetable_type to dag table for filtering."""
from airflow.migrations.utils import disable_sqlite_fkeys

with op.batch_alter_table("dag", schema=None) as batch_op:
batch_op.add_column(sa.Column("timetable_type", sa.String(length=255)))
with disable_sqlite_fkeys(op):
with op.batch_alter_table("dag", schema=None) as batch_op:
batch_op.add_column(sa.Column("timetable_type", sa.String(length=255)))

op.execute("UPDATE dag SET timetable_type = '' WHERE timetable_type IS NULL")
op.execute("UPDATE dag SET timetable_type = '' WHERE timetable_type IS NULL")

with disable_sqlite_fkeys(op):
with op.batch_alter_table("dag", schema=None) as batch_op:
batch_op.alter_column("timetable_type", existing_type=sa.String(length=255), nullable=False)


def downgrade():
"""Unapply add timetable_type to dag table for filtering."""
from airflow.migrations.utils import disable_sqlite_fkeys

with disable_sqlite_fkeys(op):
with op.batch_alter_table("dag", schema=None) as batch_op:
batch_op.drop_column("timetable_type")
@@ -44,50 +44,54 @@

def upgrade():
"""Apply Fix migration file inconsistencies with ORM."""
dialect_name = context.get_context().dialect.name

# Use raw SQL so this migration remains usable in offline mode (--show-sql-only).
op.execute("UPDATE connection SET is_encrypted = FALSE WHERE is_encrypted IS NULL")
op.execute("UPDATE connection SET is_extra_encrypted = FALSE WHERE is_extra_encrypted IS NULL")

op.execute("UPDATE dag SET is_paused = FALSE WHERE is_paused IS NULL")

op.execute("UPDATE slot_pool SET slots = 0 WHERE slots IS NULL")

op.execute("UPDATE task_instance SET try_number = 0 WHERE try_number IS NULL")
op.execute("UPDATE task_instance SET max_tries = -1 WHERE max_tries IS NULL")

op.execute("UPDATE variable SET val = '' WHERE val IS NULL")
op.execute("UPDATE variable SET is_encrypted = FALSE WHERE is_encrypted IS NULL")
if dialect_name == "mysql":
op.execute(
"UPDATE variable SET `key` = CONCAT('__airflow_var_fix_888b59e02a5b_', id) WHERE `key` IS NULL"
)
else:
op.execute("UPDATE variable SET key = '__airflow_var_fix_888b59e02a5b_' || id WHERE key IS NULL")

with op.batch_alter_table("connection", schema=None) as batch_op:
batch_op.alter_column("is_encrypted", existing_type=sa.BOOLEAN(), nullable=False)
batch_op.alter_column("is_extra_encrypted", existing_type=sa.BOOLEAN(), nullable=False)

with op.batch_alter_table("dag", schema=None) as batch_op:
batch_op.alter_column("is_paused", existing_type=sa.BOOLEAN(), nullable=False)

with op.batch_alter_table("slot_pool", schema=None) as batch_op:
batch_op.alter_column("slots", existing_type=sa.INTEGER(), nullable=False)

with op.batch_alter_table("task_instance", schema=None) as batch_op:
batch_op.alter_column("try_number", existing_type=sa.INTEGER(), nullable=False)
batch_op.alter_column(
"max_tries", existing_type=sa.INTEGER(), nullable=False, existing_server_default=sa.text("'-1'")
)

with op.batch_alter_table("variable", schema=None) as batch_op:
batch_op.alter_column("key", existing_type=StringID(length=250), nullable=False)
batch_op.alter_column(
"val", existing_type=sa.TEXT().with_variant(MEDIUMTEXT, "mysql"), nullable=False
)
batch_op.alter_column("is_encrypted", existing_type=sa.BOOLEAN(), nullable=False)
with disable_sqlite_fkeys(op):
dialect_name = context.get_context().dialect.name

# Use raw SQL so this migration remains usable in offline mode (--show-sql-only).
op.execute("UPDATE connection SET is_encrypted = FALSE WHERE is_encrypted IS NULL")
op.execute("UPDATE connection SET is_extra_encrypted = FALSE WHERE is_extra_encrypted IS NULL")

op.execute("UPDATE dag SET is_paused = FALSE WHERE is_paused IS NULL")

op.execute("UPDATE slot_pool SET slots = 0 WHERE slots IS NULL")

op.execute("UPDATE task_instance SET try_number = 0 WHERE try_number IS NULL")
op.execute("UPDATE task_instance SET max_tries = -1 WHERE max_tries IS NULL")

op.execute("UPDATE variable SET val = '' WHERE val IS NULL")
op.execute("UPDATE variable SET is_encrypted = FALSE WHERE is_encrypted IS NULL")
if dialect_name == "mysql":
op.execute(
"UPDATE variable SET `key` = CONCAT('__airflow_var_fix_888b59e02a5b_', id) WHERE `key` IS NULL"
)
else:
op.execute("UPDATE variable SET key = '__airflow_var_fix_888b59e02a5b_' || id WHERE key IS NULL")

with op.batch_alter_table("connection", schema=None) as batch_op:
batch_op.alter_column("is_encrypted", existing_type=sa.BOOLEAN(), nullable=False)
batch_op.alter_column("is_extra_encrypted", existing_type=sa.BOOLEAN(), nullable=False)

with op.batch_alter_table("dag", schema=None) as batch_op:
batch_op.alter_column("is_paused", existing_type=sa.BOOLEAN(), nullable=False)

with op.batch_alter_table("slot_pool", schema=None) as batch_op:
batch_op.alter_column("slots", existing_type=sa.INTEGER(), nullable=False)

with op.batch_alter_table("task_instance", schema=None) as batch_op:
batch_op.alter_column("try_number", existing_type=sa.INTEGER(), nullable=False)
batch_op.alter_column(
"max_tries",
existing_type=sa.INTEGER(),
nullable=False,
existing_server_default=sa.text("'-1'"),
)

with op.batch_alter_table("variable", schema=None) as batch_op:
batch_op.alter_column("key", existing_type=StringID(length=250), nullable=False)
batch_op.alter_column(
"val", existing_type=sa.TEXT().with_variant(MEDIUMTEXT, "mysql"), nullable=False
)
batch_op.alter_column("is_encrypted", existing_type=sa.BOOLEAN(), nullable=False)


def downgrade():
9 changes: 9 additions & 0 deletions contributing-docs/14_metadata_database_updates.rst
@@ -97,6 +97,15 @@ and back down to the former. To run any of those CI tests on your machine, you can:
1. Copy the relevant command (specified by the ``run`` key for the relevant CI job), and replace the environment variable references with their literal values defined in the sibling ``env`` section.
2. Run the command you created from step 1, troubleshooting errors as needed.

SQLite FK round-trip safety
---------------------------

Migrations that rebuild a parent table via ``op.batch_alter_table`` must wrap their entire body in
``disable_sqlite_fkeys(op)``, placed *before* any DML or DDL runs. A DML statement leaves the
connection in an open transaction, in which the wrapper's PRAGMA is silently a no-op, and the
rebuild's implicit ``DROP TABLE`` then cascade-deletes child rows (or aborts on a RESTRICT chain).
The placement convention and the round-trip prek hook that enforces it are documented in
`Migration round-trip regression check <26_migration_round_trip_check.rst>`__.

How to hook your application into Airflow's migration process
-------------------------------------------------------------

174 changes: 174 additions & 0 deletions contributing-docs/26_migration_round_trip_check.rst
@@ -0,0 +1,174 @@
.. Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
distributed with this work for additional information
regarding copyright ownership. The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at

.. http://www.apache.org/licenses/LICENSE-2.0

.. Unless required by applicable law or agreed to in writing,
software distributed under the License is distributed on an
"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express or implied. See the License for the
specific language governing permissions and limitations
under the License.

Migration round-trip regression check
=====================================

This page documents the manual prek hook that verifies SQLite
migrations round-trip safely from base to head and back, plus the
``disable_sqlite_fkeys(op)`` placement convention every migration
must follow.

If you are adding or editing a migration, the convention section is
the part that affects you. The rest is operational detail for people
maintaining the check itself.

Files:

- ``scripts/in_container/run_migration_round_trip.py`` — the test driver.
- ``scripts/ci/prek/migration_round_trip.py`` — the prek hook wrapper that
drives the in-container script via Breeze.

Running the check
-----------------

The hook is registered under ``stages: [manual]`` in
``airflow-core/.pre-commit-config.yaml`` so it does not fire on every
local commit (a full base→head→base walk takes a few minutes). To run
it manually:

.. code-block:: bash

prek run migration-round-trip --hook-stage manual --all-files

CI runs it automatically on any pull request whose changed files
include ``airflow-core/src/airflow/migrations/`` — the dedicated
``Migration round-trip check`` job in ``.github/workflows/ci-amd-arm.yml``
is gated on the ``has-migrations`` selective-checks output.


Convention: ``disable_sqlite_fkeys`` placement
----------------------------------------------

Migrations that rebuild a parent table via ``op.batch_alter_table(...)``
(any table whose children declare ``ON DELETE CASCADE``) must wrap their
entire body in ``disable_sqlite_fkeys(op)``, **before any DML statement
or any rebuild-mode** ``op.batch_alter_table(...)`` runs.

The exact rule (verified empirically):

``PRAGMA foreign_keys = on/off`` only takes effect on a connection
that is currently in SQLite's autocommit mode. Pure DDL statements
(``ALTER TABLE``, ``CREATE TABLE``, ``DROP TABLE``) implicitly
commit and leave the connection in autocommit, so a PRAGMA *after*
them still works. DML statements (``INSERT``, ``UPDATE``,
``DELETE``) leave the connection in an open transaction, and any
PRAGMA after a DML — including the one inside
``disable_sqlite_fkeys`` — is silently a no-op.

The trap: ``op.batch_alter_table`` is sometimes pure DDL (e.g.
``add_column`` only, which Alembic emits as a native ``ALTER TABLE``
on SQLite) and sometimes a rebuild that internally does
``CREATE _alembic_tmp / INSERT INTO ... SELECT FROM / DROP / ALTER
RENAME``. The internal ``INSERT`` leaves the connection outside
autocommit, so a subsequent ``disable_sqlite_fkeys`` block is a no-op.

Because (a) it's hard for migration authors to predict at write time
whether a particular ``batch_alter_table`` will rebuild and (b) the
wrapper costs nothing to put earlier, the practical convention is
strictly stronger than the technical rule:

Put ``with disable_sqlite_fkeys(op):`` as the **outermost** block
of any ``upgrade`` / ``downgrade`` that calls
``op.batch_alter_table`` on a parent table, before *any* DML or
DDL. Don't try to optimise it into a smaller scope.

Correct:

.. code-block:: python

def upgrade():
from airflow.migrations.utils import disable_sqlite_fkeys

with disable_sqlite_fkeys(op):
with op.batch_alter_table("dag", schema=None) as batch_op:
batch_op.add_column(sa.Column("..."))

op.execute("UPDATE dag SET ... WHERE ...")

with op.batch_alter_table("dag", schema=None) as batch_op:
batch_op.alter_column("...", nullable=False)

Anti-pattern:

.. code-block:: python

def upgrade():
# The first batch_alter_table here is pure add_column, so it's
# native ALTER TABLE (DDL) and on its own would leave the
# connection in autocommit. But the op.execute("UPDATE ...")
# below it is DML — it leaves the connection in a non-
# autocommit state, so the wrapper's PRAGMA is a no-op and the
# alter_column rebuild runs with foreign_keys still ON.
with op.batch_alter_table("dag", schema=None) as batch_op:
batch_op.add_column(sa.Column("..."))

op.execute("UPDATE dag SET ... WHERE ...")

with disable_sqlite_fkeys(op): # ← TOO LATE: PRAGMA is a no-op
with op.batch_alter_table("dag", schema=None) as batch_op:
batch_op.alter_column("...", nullable=False)

The same convention applies to ``downgrade()``.

What the check tests
--------------------

The driver:

1. Spins up a fresh SQLite database with ``PRAGMA foreign_keys = ON``.
2. Walks every migration in topological order, base → head, one
revision at a time. Before each step it idempotently restores the
seed fixture (``log_template``, ``dag_bundle``, ``dag``,
``dag_version``, ``dag_run``, ``task_instance``) to whatever the
live schema can hold, so any FK chain a rebuild needs to fire
(``dag → dag_version → task_instance.dag_version_id RESTRICT`` is
the load-bearing one) is in place.
3. Walks every migration in reverse, head → base, the same way.

Any failure is reported with the failing direction (``up`` /
``down``) and the rev id, plus the underlying exception.
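
The hazard the check guards against can be reproduced in isolation with
stdlib ``sqlite3``. The sketch below simulates the rebuild sequence a
rebuild-mode ``batch_alter_table`` performs internally (table and column
names here are hypothetical, not the real schema): with ``foreign_keys``
ON, the ``DROP TABLE`` step fires an implicit ``DELETE`` that cascades
into ``ON DELETE CASCADE`` children.

```python
import sqlite3

conn = sqlite3.connect(":memory:", isolation_level=None)
conn.execute("PRAGMA foreign_keys = ON")
conn.execute("CREATE TABLE dag (id INTEGER PRIMARY KEY)")
conn.execute(
    "CREATE TABLE dag_tag (dag_id INTEGER REFERENCES dag(id) ON DELETE CASCADE)"
)
conn.execute("INSERT INTO dag VALUES (1)")
conn.execute("INSERT INTO dag_tag VALUES (1)")

# Simulate what a rebuild-mode batch_alter_table does internally.
conn.execute("CREATE TABLE _alembic_tmp_dag (id INTEGER PRIMARY KEY, extra TEXT)")
conn.execute("INSERT INTO _alembic_tmp_dag (id) SELECT id FROM dag")
conn.execute("DROP TABLE dag")  # implicit DELETE fires the CASCADE
conn.execute("ALTER TABLE _alembic_tmp_dag RENAME TO dag")

# The parent row survived the rebuild, but the child rows are gone.
assert conn.execute("SELECT COUNT(*) FROM dag").fetchone()[0] == 1
assert conn.execute("SELECT COUNT(*) FROM dag_tag").fetchone()[0] == 0
```

Wrapping the same sequence in ``PRAGMA foreign_keys = OFF`` preserves the
child rows, which is exactly what ``disable_sqlite_fkeys(op)`` does for
real migrations.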

Why per-migration walking
-------------------------

A single-call ``airflow_db.upgradedb(to_revision=tip)`` walks all
migrations on a single shared SQLAlchemy connection. Once any earlier
migration's correctly-placed ``disable_sqlite_fkeys(op)`` flips
``foreign_keys`` to off, that state stays off on the connection — and
later migrations on the same connection silently inherit the protection.
A migration that *omits* ``disable_sqlite_fkeys``, or misplaces it so
that its own PRAGMA is a no-op, then looks fine in single-call mode
purely because of the inherited state.

Walking each migration through its own
``airflow_db.upgradedb(to_revision=specific_rev)`` call sidesteps that:
each call runs ``dispose_orm()`` + ``configure_orm()`` to build a fresh
engine, ``setup_event_handlers`` fires ``PRAGMA foreign_keys=ON`` on the
new connection, and the migration starts from a clean slate.
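
The masking effect is connection-level state, so it can be sketched with
plain ``sqlite3`` (the ``migration_a`` / ``migration_b`` functions below
are hypothetical stand-ins, not real migration code):

```python
import sqlite3

def migration_a(conn):
    conn.execute("PRAGMA foreign_keys = OFF")  # correctly-placed wrapper

def migration_b(conn):
    pass  # forgot disable_sqlite_fkeys entirely

# Single shared connection: migration_b silently inherits OFF from
# migration_a, so the omission goes undetected.
shared = sqlite3.connect(":memory:", isolation_level=None)
shared.execute("PRAGMA foreign_keys = ON")
migration_a(shared)
migration_b(shared)
assert shared.execute("PRAGMA foreign_keys").fetchone()[0] == 0

# Fresh connection per migration: migration_b starts from a clean slate
# with foreign_keys back ON, so its omission is exposed.
fresh = sqlite3.connect(":memory:", isolation_level=None)
fresh.execute("PRAGMA foreign_keys = ON")
migration_b(fresh)
assert fresh.execute("PRAGMA foreign_keys").fetchone()[0] == 1
```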

Maintaining the seed
--------------------

``SEED_VALUES`` in ``run_migration_round_trip.py`` lists hardcoded
values for every column the seed touches across all revisions in the
chain. If a future migration adds a brand-new ``NOT NULL``-no-default
column to one of the seeded tables (``dag``, ``dag_version``,
``dag_run``, ``task_instance``, ``dag_bundle``, ``log_template``), the
round-trip aborts with a clear ``RuntimeError`` pointing at the table
that needs an addition. Add the new column + a sensible literal value
to ``SEED_VALUES['<table>']`` and re-run.
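
The "restore to whatever the live schema can hold" strategy amounts to
intersecting the hardcoded values with the columns ``PRAGMA table_info``
reports at the current revision. A minimal sketch, with a hypothetical
``SEED_VALUES`` subset rather than the real fixture:

```python
import sqlite3

SEED_VALUES = {  # illustrative subset, not the real fixture
    "dag": {"dag_id": "seed_dag", "is_paused": 0, "timetable_type": ""},
}

def seed(conn, table):
    # Column name is index 1 in each PRAGMA table_info row.
    live_cols = {row[1] for row in conn.execute(f"PRAGMA table_info({table})")}
    values = {c: v for c, v in SEED_VALUES[table].items() if c in live_cols}
    cols = ", ".join(values)
    params = ", ".join("?" for _ in values)
    conn.execute(
        f"INSERT INTO {table} ({cols}) VALUES ({params})", list(values.values())
    )

conn = sqlite3.connect(":memory:")
# At an early revision the dag table lacks timetable_type; seeding adapts
# by inserting only the columns that exist.
conn.execute("CREATE TABLE dag (dag_id TEXT PRIMARY KEY, is_paused INTEGER)")
seed(conn, "dag")
assert conn.execute("SELECT dag_id, is_paused FROM dag").fetchone() == ("seed_dag", 0)
```

A new ``NOT NULL``-no-default column absent from ``SEED_VALUES`` would
survive this filter unpopulated, which is why the real driver aborts with
a ``RuntimeError`` instead of guessing a value.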