diff --git a/.github/workflows/ci-amd-arm.yml b/.github/workflows/ci-amd-arm.yml index 85b7c8b1552be..ccdb2ea36a8b8 100644 --- a/.github/workflows/ci-amd-arm.yml +++ b/.github/workflows/ci-amd-arm.yml @@ -377,6 +377,46 @@ jobs: RUFF_FORMAT: "github" INCLUDE_MYPY_VOLUME: "false" + migration-round-trip: + timeout-minutes: 20 + name: "Migration round-trip check" + needs: [build-info, build-ci-images] + runs-on: ${{ fromJSON(needs.build-info.outputs.runner-type) }} + if: needs.build-info.outputs.has-migrations == 'true' + env: + PYTHON_MAJOR_MINOR_VERSION: "${{ needs.build-info.outputs.default-python-version }}" + GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }} + steps: + - name: "Cleanup repo" + shell: bash + run: docker run -v "${GITHUB_WORKSPACE}:/workspace" -u 0:0 bash -c "rm -rf /workspace/*" + - name: "Checkout ${{ github.ref }} ( ${{ github.sha }} )" + uses: actions/checkout@de0fac2e4500dabe0009e67214ff5f5447ce83dd # v6.0.2 + with: + persist-credentials: false + - name: "Prepare breeze & CI image: ${{ needs.build-info.outputs.default-python-version }}" + uses: ./.github/actions/prepare_breeze_and_image + with: + platform: ${{ needs.build-info.outputs.platform }} + python: "${{ needs.build-info.outputs.default-python-version }}" + use-uv: ${{ needs.build-info.outputs.use-uv }} + make-mnt-writeable-and-cleanup: true + id: breeze + - name: "Install prek" + uses: ./.github/actions/install-prek + id: prek + with: + python-version: ${{steps.breeze.outputs.host-python-version}} + platform: ${{ needs.build-info.outputs.platform }} + save-cache: false + - name: "Migration round-trip check" + run: prek --color always --verbose --stage manual migration-round-trip --all-files + env: + VERBOSE: "false" + COLUMNS: "202" + SKIP_GROUP_OUTPUT: "true" + DEFAULT_BRANCH: ${{ needs.build-info.outputs.default-branch }} + providers: name: "provider distributions tests" uses: ./.github/workflows/test-providers.yml @@ -939,6 +979,7 @@ jobs: - build-prod-images - ci-image-checks - generate-constraints + - migration-round-trip - mypy-providers - providers - tests-helm diff --git a/airflow-core/.pre-commit-config.yaml b/airflow-core/.pre-commit-config.yaml index 64ddae194edb3..be5d724e56965 100644 --- a/airflow-core/.pre-commit-config.yaml +++ b/airflow-core/.pre-commit-config.yaml @@ -256,6 +256,14 @@ repos: (?x) ^src/airflow/migrations/versions/.*\.py$| ^docs/migrations-ref\.rst$ + - id: migration-round-trip + name: Migration round-trip SQLite FK enforcement + language: python + entry: ../scripts/ci/prek/migration_round_trip.py + pass_filenames: false + require_serial: true + stages: [manual] + files: ^src/airflow/migrations/versions/.*\.py$ - id: check-default-configuration name: Check the default configuration entry: ../scripts/ci/prek/check_default_configuration.py diff --git a/airflow-core/src/airflow/migrations/versions/0100_3_2_0_add_timetable_type_to_dag_table_for_.py b/airflow-core/src/airflow/migrations/versions/0100_3_2_0_add_timetable_type_to_dag_table_for_.py index 47349528c6e4e..9c01edf3112de 100644 --- a/airflow-core/src/airflow/migrations/versions/0100_3_2_0_add_timetable_type_to_dag_table_for_.py +++ b/airflow-core/src/airflow/migrations/versions/0100_3_2_0_add_timetable_type_to_dag_table_for_.py @@ -30,6 +30,8 @@ import sqlalchemy as sa from alembic import op +from airflow.migrations.utils import disable_sqlite_fkeys + # revision identifiers, used by Alembic. 
revision = "e79fc784f145" down_revision = "0b112f49112d" @@ -40,22 +42,18 @@ def upgrade(): """Apply add timetable_type to dag table for filtering.""" - from airflow.migrations.utils import disable_sqlite_fkeys - - with op.batch_alter_table("dag", schema=None) as batch_op: - batch_op.add_column(sa.Column("timetable_type", sa.String(length=255))) + with disable_sqlite_fkeys(op): + with op.batch_alter_table("dag", schema=None) as batch_op: + batch_op.add_column(sa.Column("timetable_type", sa.String(length=255))) - op.execute("UPDATE dag SET timetable_type = '' WHERE timetable_type IS NULL") + op.execute("UPDATE dag SET timetable_type = '' WHERE timetable_type IS NULL") - with disable_sqlite_fkeys(op): with op.batch_alter_table("dag", schema=None) as batch_op: batch_op.alter_column("timetable_type", existing_type=sa.String(length=255), nullable=False) def downgrade(): """Unapply add timetable_type to dag table for filtering.""" - from airflow.migrations.utils import disable_sqlite_fkeys - with disable_sqlite_fkeys(op): with op.batch_alter_table("dag", schema=None) as batch_op: batch_op.drop_column("timetable_type") diff --git a/airflow-core/src/airflow/migrations/versions/0108_3_2_0_fix_migration_file_ORM_inconsistencies.py b/airflow-core/src/airflow/migrations/versions/0108_3_2_0_fix_migration_file_ORM_inconsistencies.py index b653ff15dc77b..4598d92d1aca5 100644 --- a/airflow-core/src/airflow/migrations/versions/0108_3_2_0_fix_migration_file_ORM_inconsistencies.py +++ b/airflow-core/src/airflow/migrations/versions/0108_3_2_0_fix_migration_file_ORM_inconsistencies.py @@ -44,50 +44,54 @@ def upgrade(): """Apply Fix migration file inconsistencies with ORM.""" - dialect_name = context.get_context().dialect.name - - # Use raw SQL so this migration remains usable in offline mode (--show-sql-only). 
- op.execute("UPDATE connection SET is_encrypted = FALSE WHERE is_encrypted IS NULL") - op.execute("UPDATE connection SET is_extra_encrypted = FALSE WHERE is_extra_encrypted IS NULL") - - op.execute("UPDATE dag SET is_paused = FALSE WHERE is_paused IS NULL") - - op.execute("UPDATE slot_pool SET slots = 0 WHERE slots IS NULL") - - op.execute("UPDATE task_instance SET try_number = 0 WHERE try_number IS NULL") - op.execute("UPDATE task_instance SET max_tries = -1 WHERE max_tries IS NULL") - - op.execute("UPDATE variable SET val = '' WHERE val IS NULL") - op.execute("UPDATE variable SET is_encrypted = FALSE WHERE is_encrypted IS NULL") - if dialect_name == "mysql": - op.execute( - "UPDATE variable SET `key` = CONCAT('__airflow_var_fix_888b59e02a5b_', id) WHERE `key` IS NULL" - ) - else: - op.execute("UPDATE variable SET key = '__airflow_var_fix_888b59e02a5b_' || id WHERE key IS NULL") - - with op.batch_alter_table("connection", schema=None) as batch_op: - batch_op.alter_column("is_encrypted", existing_type=sa.BOOLEAN(), nullable=False) - batch_op.alter_column("is_extra_encrypted", existing_type=sa.BOOLEAN(), nullable=False) - - with op.batch_alter_table("dag", schema=None) as batch_op: - batch_op.alter_column("is_paused", existing_type=sa.BOOLEAN(), nullable=False) - - with op.batch_alter_table("slot_pool", schema=None) as batch_op: - batch_op.alter_column("slots", existing_type=sa.INTEGER(), nullable=False) - - with op.batch_alter_table("task_instance", schema=None) as batch_op: - batch_op.alter_column("try_number", existing_type=sa.INTEGER(), nullable=False) - batch_op.alter_column( - "max_tries", existing_type=sa.INTEGER(), nullable=False, existing_server_default=sa.text("'-1'") - ) - - with op.batch_alter_table("variable", schema=None) as batch_op: - batch_op.alter_column("key", existing_type=StringID(length=250), nullable=False) - batch_op.alter_column( - "val", existing_type=sa.TEXT().with_variant(MEDIUMTEXT, "mysql"), nullable=False - ) - batch_op.alter_column("is_encrypted", existing_type=sa.BOOLEAN(), nullable=False) + with disable_sqlite_fkeys(op): + dialect_name = context.get_context().dialect.name + + # Use raw SQL so this migration remains usable in offline mode (--show-sql-only). 
+        op.execute("UPDATE connection SET is_encrypted = FALSE WHERE is_encrypted IS NULL")
+        op.execute("UPDATE connection SET is_extra_encrypted = FALSE WHERE is_extra_encrypted IS NULL")
+
+        op.execute("UPDATE dag SET is_paused = FALSE WHERE is_paused IS NULL")
+
+        op.execute("UPDATE slot_pool SET slots = 0 WHERE slots IS NULL")
+
+        op.execute("UPDATE task_instance SET try_number = 0 WHERE try_number IS NULL")
+        op.execute("UPDATE task_instance SET max_tries = -1 WHERE max_tries IS NULL")
+
+        op.execute("UPDATE variable SET val = '' WHERE val IS NULL")
+        op.execute("UPDATE variable SET is_encrypted = FALSE WHERE is_encrypted IS NULL")
+        if dialect_name == "mysql":
+            op.execute(
+                "UPDATE variable SET `key` = CONCAT('__airflow_var_fix_888b59e02a5b_', id) WHERE `key` IS NULL"
+            )
+        else:
+            op.execute("UPDATE variable SET key = '__airflow_var_fix_888b59e02a5b_' || id WHERE key IS NULL")
+
+        with op.batch_alter_table("connection", schema=None) as batch_op:
+            batch_op.alter_column("is_encrypted", existing_type=sa.BOOLEAN(), nullable=False)
+            batch_op.alter_column("is_extra_encrypted", existing_type=sa.BOOLEAN(), nullable=False)
+
+        with op.batch_alter_table("dag", schema=None) as batch_op:
+            batch_op.alter_column("is_paused", existing_type=sa.BOOLEAN(), nullable=False)
+
+        with op.batch_alter_table("slot_pool", schema=None) as batch_op:
+            batch_op.alter_column("slots", existing_type=sa.INTEGER(), nullable=False)
+
+        with op.batch_alter_table("task_instance", schema=None) as batch_op:
+            batch_op.alter_column("try_number", existing_type=sa.INTEGER(), nullable=False)
+            batch_op.alter_column(
+                "max_tries",
+                existing_type=sa.INTEGER(),
+                nullable=False,
+                existing_server_default=sa.text("'-1'"),
+            )
+
+        with op.batch_alter_table("variable", schema=None) as batch_op:
+            batch_op.alter_column("key", existing_type=StringID(length=250), nullable=False)
+            batch_op.alter_column(
+                "val", existing_type=sa.TEXT().with_variant(MEDIUMTEXT, "mysql"), nullable=False
+            )
+            batch_op.alter_column("is_encrypted", existing_type=sa.BOOLEAN(), nullable=False)


 def downgrade():
diff --git a/contributing-docs/14_metadata_database_updates.rst b/contributing-docs/14_metadata_database_updates.rst
index b8dd7115ff61f..6f3d6a3f22e02 100644
--- a/contributing-docs/14_metadata_database_updates.rst
+++ b/contributing-docs/14_metadata_database_updates.rst
@@ -97,6 +97,15 @@ and back down to the former. To run any of those CI tests on your machine, you c
 1. Copy the relevant command (specified by the ``run`` key for the relevant CI job), and replace the
    environment variable references with their literal values defined in the sibling ``env`` section.
 2. Run the command you created from step 1, troubleshooting errors as needed.

+SQLite FK round-trip safety
+---------------------------
+
+Migrations that rebuild a parent table via ``op.batch_alter_table`` must wrap their entire body in
+``disable_sqlite_fkeys(op)``, opened *before* any other DML or DDL runs. If an earlier DML statement
+has left the connection in an open transaction, the wrapper's PRAGMA is silently a no-op and the
+rebuild's implicit ``DROP TABLE`` cascade-deletes child rows (or aborts on a RESTRICT chain). The
+placement convention and the round-trip prek hook that enforces it are documented in
+`Migration round-trip regression check <26_migration_round_trip_check.rst>`__.
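+
+For orientation, the wrapper itself is tiny. A minimal sketch of its shape (the canonical helper
+lives in ``airflow/migrations/utils.py``; this sketch may differ from it in detail):
+
+.. code-block:: python
+
+    from contextlib import contextmanager
+
+
+    @contextmanager
+    def disable_sqlite_fkeys(op):
+        """Turn SQLite FK enforcement off for the block; no-op on other databases."""
+        if op.get_bind().dialect.name == "sqlite":
+            op.execute("PRAGMA foreign_keys=off")
+            try:
+                yield op
+            finally:
+                op.execute("PRAGMA foreign_keys=on")
+        else:
+            yield op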
+ How to hook your application into Airflow's migration process ------------------------------------------------------------- diff --git a/contributing-docs/26_migration_round_trip_check.rst b/contributing-docs/26_migration_round_trip_check.rst new file mode 100644 index 0000000000000..1f5052c43fdb5 --- /dev/null +++ b/contributing-docs/26_migration_round_trip_check.rst @@ -0,0 +1,174 @@ + .. Licensed to the Apache Software Foundation (ASF) under one + or more contributor license agreements. See the NOTICE file + distributed with this work for additional information + regarding copyright ownership. The ASF licenses this file + to you under the Apache License, Version 2.0 (the + "License"); you may not use this file except in compliance + with the License. You may obtain a copy of the License at + + .. http://www.apache.org/licenses/LICENSE-2.0 + + .. Unless required by applicable law or agreed to in writing, + software distributed under the License is distributed on an + "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + KIND, either express or implied. See the License for the + specific language governing permissions and limitations + under the License. + +Migration round-trip regression check +===================================== + +This page documents the manual prek hook that verifies SQLite +migrations round-trip safely from base to head and back, plus the +``disable_sqlite_fkeys(op)`` placement convention every migration +must follow. + +If you are adding or editing a migration, the convention section is +the part that affects you. The rest is operational detail for people +maintaining the check itself. + +Files: + +- ``scripts/in_container/run_migration_round_trip.py`` — the test driver. +- ``scripts/ci/prek/migration_round_trip.py`` — the prek hook wrapper that + drives the in-container script via Breeze. + +Running the check +----------------- + +The hook is registered under ``stages: [manual]`` in +``airflow-core/.pre-commit-config.yaml`` so it does not fire on every +local commit (a full base→head→base walk takes a few minutes). To run +it manually: + +.. code-block:: bash + + prek run migration-round-trip --hook-stage manual --all-files + +CI runs it automatically on any pull request whose changed files +include ``airflow-core/src/airflow/migrations/`` — the dedicated +``Migration round-trip check`` job in ``.github/workflows/ci-amd-arm.yml`` +is gated on the ``has-migrations`` selective-checks output. + + +Convention: ``disable_sqlite_fkeys`` placement +---------------------------------------------- + +Migrations that rebuild a parent table via ``op.batch_alter_table(...)`` +(any table whose children declare ``ON DELETE CASCADE``) must wrap their +entire body in ``disable_sqlite_fkeys(op)``, **before any DML statement +or any rebuild-mode** ``op.batch_alter_table(...)`` runs. + +The exact rule (verified empirically): + + ``PRAGMA foreign_keys = on/off`` only takes effect on a connection + that is currently in SQLite's autocommit mode. Pure DDL statements + (``ALTER TABLE``, ``CREATE TABLE``, ``DROP TABLE``) implicitly + commit and leave the connection in autocommit, so a PRAGMA *after* + them still works. DML statements (``INSERT``, ``UPDATE``, + ``DELETE``) leave the connection in an open transaction, and any + PRAGMA after a DML — including the one inside + ``disable_sqlite_fkeys`` — is silently a no-op. + +The trap: ``op.batch_alter_table`` is sometimes pure DDL (e.g. 
+``add_column`` only — Alembic uses native ``ALTER TABLE`` on SQLite)
+and sometimes a rebuild that internally does
+``CREATE _alembic_tmp / INSERT INTO ... SELECT FROM / DROP / ALTER
+RENAME``. The internal ``INSERT`` makes the connection no longer
+autocommit, so a subsequent ``disable_sqlite_fkeys`` block is a no-op.
+
+Because (a) it's hard for migration authors to predict at write time
+whether a particular ``batch_alter_table`` will rebuild and (b) the
+wrapper costs nothing to put earlier, the practical convention is
+strictly stronger than the technical rule:
+
+    Put ``with disable_sqlite_fkeys(op):`` as the **outermost** block
+    of any ``upgrade`` / ``downgrade`` that calls
+    ``op.batch_alter_table`` on a parent table, before *any* DML or
+    DDL. Don't try to optimise it into a smaller scope.
+
+Correct:
+
+.. code-block:: python
+
+    def upgrade():
+        from airflow.migrations.utils import disable_sqlite_fkeys
+
+        with disable_sqlite_fkeys(op):
+            with op.batch_alter_table("dag", schema=None) as batch_op:
+                batch_op.add_column(sa.Column("..."))
+
+            op.execute("UPDATE dag SET ... WHERE ...")
+
+            with op.batch_alter_table("dag", schema=None) as batch_op:
+                batch_op.alter_column("...", nullable=False)
+
+Anti-pattern:
+
+.. code-block:: python
+
+    def upgrade():
+        # The first batch_alter_table here is pure add_column, so it's
+        # native ALTER TABLE (DDL) and on its own would leave the
+        # connection in autocommit. But the op.execute("UPDATE ...")
+        # below it is DML — it leaves the connection in a non-
+        # autocommit state, so the wrapper's PRAGMA is a no-op and the
+        # alter_column rebuild runs with foreign_keys still ON.
+        with op.batch_alter_table("dag", schema=None) as batch_op:
+            batch_op.add_column(sa.Column("..."))
+
+        op.execute("UPDATE dag SET ... WHERE ...")
+
+        with disable_sqlite_fkeys(op):  # ← TOO LATE: PRAGMA is a no-op
+            with op.batch_alter_table("dag", schema=None) as batch_op:
+                batch_op.alter_column("...", nullable=False)
+
+The same convention applies to ``downgrade()``.
+
+What the check tests
+--------------------
+
+The driver:
+
+1. Spins up a fresh SQLite database with ``PRAGMA foreign_keys = ON``.
+2. Walks every migration in topological order, base → head, one
+   revision at a time. Before each step it idempotently restores the
+   seed fixture (``log_template``, ``dag_bundle``, ``dag``,
+   ``dag_version``, ``dag_run``, ``task_instance``) to whatever the
+   live schema can hold, so any FK chain a rebuild needs to fire
+   (``dag → dag_version → task_instance.dag_version_id RESTRICT`` is
+   the load-bearing one) is in place.
+3. Walks every migration in reverse, head → base, the same way.
+
+Any failure is reported with the failing direction (``up`` /
+``down``) and the rev id, plus the underlying exception.
+
+Why per-migration walking
+-------------------------
+
+A single-call ``airflow_db.upgradedb(to_revision=tip)`` walks all
+migrations on a single shared SQLAlchemy connection. Once any earlier
+migration's correctly-placed ``disable_sqlite_fkeys(op)`` flips
+``foreign_keys`` to off, that state stays off on the connection — and
+later migrations on the same connection silently inherit the protection.
+A migration that *omits* ``disable_sqlite_fkeys`` or misplaces it (so
+that its own PRAGMA is a no-op) then looks fine in single-call mode
+purely because of the inherited state.
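+
+Both ingredients, the in-transaction no-op and the connection-level
+persistence of the PRAGMA, can be reproduced with nothing but the
+standard-library ``sqlite3`` module. The snippet below is purely
+illustrative and is not part of the check:
+
+.. code-block:: python
+
+    import sqlite3
+
+    conn = sqlite3.connect(":memory:")
+    conn.isolation_level = None  # autocommit; we issue BEGIN/COMMIT ourselves
+    conn.execute("PRAGMA foreign_keys = ON")
+    conn.execute("CREATE TABLE t (x INTEGER)")
+
+    # On an idle (autocommit) connection the PRAGMA takes effect normally.
+    conn.execute("PRAGMA foreign_keys = OFF")
+    print(conn.execute("PRAGMA foreign_keys").fetchone())  # (0,)
+    conn.execute("PRAGMA foreign_keys = ON")
+
+    # Inside an open transaction the same PRAGMA is silently ignored.
+    conn.execute("BEGIN")
+    conn.execute("INSERT INTO t VALUES (1)")  # DML keeps the transaction open
+    conn.execute("PRAGMA foreign_keys = OFF")
+    print(conn.execute("PRAGMA foreign_keys").fetchone())  # (1,): still ON
+    conn.execute("COMMIT")
+
+    # Once set on an idle connection, the state persists until changed again.
+    conn.execute("PRAGMA foreign_keys = OFF")
+    print(conn.execute("PRAGMA foreign_keys").fetchone())  # (0,)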
+
+Walking each migration through its own
+``airflow_db.upgradedb(to_revision=specific_rev)`` call sidesteps the
+inherited-state problem: each call runs ``dispose_orm()`` +
+``configure_orm()`` to build a fresh engine, ``setup_event_handlers``
+fires ``PRAGMA foreign_keys=ON`` on the new connection, and the
+migration starts from a clean slate.
+
+Maintaining the seed
+--------------------
+
+``SEED_VALUES`` in ``run_migration_round_trip.py`` lists hardcoded
+values for every column the seed touches across all revisions in the
+chain. If a future migration adds a brand-new ``NOT NULL``-no-default
+column to one of the seeded tables (``dag``, ``dag_version``,
+``dag_run``, ``task_instance``, ``dag_bundle``, ``log_template``), the
+round-trip aborts with a clear ``RuntimeError`` pointing at the table
+that needs an addition. Add the new column and a sensible literal
+value to ``SEED_VALUES['<table>']`` and re-run.
diff --git a/scripts/ci/prek/migration_round_trip.py b/scripts/ci/prek/migration_round_trip.py
new file mode 100755
index 0000000000000..8db62e944069f
--- /dev/null
+++ b/scripts/ci/prek/migration_round_trip.py
@@ -0,0 +1,51 @@
+#!/usr/bin/env python
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+# /// script
+# requires-python = ">=3.10"
+# dependencies = [
+#   "rich>=13.6.0",
+# ]
+# ///
+"""
+Prek entrypoint for the migration round-trip regression check.
+
+The real logic lives in ``scripts/in_container/run_migration_round_trip.py``
+(mounted at ``/opt/airflow/scripts/`` inside Breeze). This wrapper just
+spins up a Breeze sqlite shell and runs that module.
+
+The full design, plus the disable_sqlite_fkeys placement convention every
+migration must follow, is documented in
+``contributing-docs/26_migration_round_trip_check.rst``.
+"""
+
+from __future__ import annotations
+
+from common_prek_utils import (
+    initialize_breeze_prek,
+    run_command_via_breeze_shell,
+    validate_cmd_result,
+)
+
+initialize_breeze_prek(__name__, __file__)
+
+cmd_result = run_command_via_breeze_shell(
+    ["python3", "/opt/airflow/scripts/in_container/run_migration_round_trip.py"],
+    backend="sqlite",
+)
+
+validate_cmd_result(cmd_result)
diff --git a/scripts/in_container/run_migration_round_trip.py b/scripts/in_container/run_migration_round_trip.py
new file mode 100755
index 0000000000000..20e15c672c3db
--- /dev/null
+++ b/scripts/in_container/run_migration_round_trip.py
@@ -0,0 +1,344 @@
+#!/usr/bin/env python
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. 
You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +""" +Migration round-trip regression check — intended to run inside Breeze. + +Walks every revision base → head (upgrade direction) and head → base +(downgrade direction) one rev at a time, on a single fresh SQLite DB +with ``PRAGMA foreign_keys = ON``. Before each step the seed fixture is +idempotently restored so any FK chain a rebuild needs to fire is in +place. + +Per-migration walking is essential. ``airflow_db.upgradedb`` / +``airflow_db.downgrade`` reconfigures Airflow's engine via +``_single_connection_pool`` for every call, so each migration starts +on a freshly-connected SQLAlchemy engine where ``setup_event_handlers`` +fires ``PRAGMA foreign_keys=ON``. In contrast a single-call +``upgradedb(to_revision=tip)`` walks all migrations on one shared +connection: once an earlier migration's ``disable_sqlite_fkeys`` flips +FK off on that connection, the off state simply persists into later +migrations, silently masking broken or missing wrappers downstream. + +The placement convention every migration must follow (only DML breaks +PRAGMA, but the safe rule is strictly stronger) and the FK chains +exercised by the seed are documented in +``contributing-docs/26_migration_round_trip_check.rst``. +""" + +from __future__ import annotations + +import os +import sys +import tempfile +from collections.abc import Callable +from pathlib import Path +from typing import Any + +from alembic.script import ScriptDirectory +from rich.console import Console +from rich.panel import Panel +from rich.rule import Rule +from sqlalchemy import text + +console = Console(width=140, color_system="standard", highlight=False) + +SEED_DAG_ID = "round_trip_dag" +SEED_DAG_VERSION_ID = "abcdef0123456789abcdef0123456789" +SEED_BUNDLE_NAME = "dags-folder" +SEED_RUN_ID = "manual_run_1" +SEED_TASK_ID = "task1" +SEED_TASK_INSTANCE_ID = "fedcba9876543210fedcba9876543210" + +# Hardcoded INSERT values for tables we seed. Each entry maps column name +# to a SQL literal. When a column is not present in the live schema (e.g. +# an older revision predating its addition), it is dropped from the INSERT +# — provided every NOT-NULL-no-default column at that revision is covered. +# When a future migration introduces a brand-new NOT NULL column without a +# default, ``_insert_or_skip`` raises with a clear message pointing at the +# table that needs an entry here. 
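+#
+# Illustrative example (hypothetical table and columns, not a real entry):
+# a mapping like {"id": "1", "note": "'hi'"} for a table named "example"
+# renders, after filtering to the columns the live schema has, as:
+#     INSERT INTO example (id, note) VALUES (1, 'hi')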
+SEED_VALUES: dict[str, dict[str, str]] = { + "log_template": { + "id": "1", + "filename": "'{{ ti.dag_id }}/{{ ts }}.log'", + "elasticsearch_id": "'{{ ti.dag_id }}-{{ ts }}'", + "created_at": "'2024-01-01 00:00:00.000000+00:00'", + }, + "dag_bundle": { + "name": f"'{SEED_BUNDLE_NAME}'", + "active": "1", + "version": "NULL", + "last_refreshed": "NULL", + "signed_url_template": "NULL", + "url_template": "NULL", + "bundle_template_params": "NULL", + }, + "dag": { + "dag_id": f"'{SEED_DAG_ID}'", + "max_active_tasks": "16", + "has_task_concurrency_limits": "0", + "is_paused": "0", + "is_active": "1", + "is_stale": "0", + "fileloc": "'/tmp/round_trip_dag.py'", + "fail_fast": "0", + "max_consecutive_failed_dag_runs": "0", + "bundle_name": f"'{SEED_BUNDLE_NAME}'", + "has_import_errors": "0", + "max_active_runs": "16", + "_default_view": "'grid'", + "exceeds_max_non_backfill": "0", + "timetable_type": "'cron'", + "timetable_partitioned": "0", + "timetable_periodic": "0", + }, + "dag_version": { + "id": f"'{SEED_DAG_VERSION_ID}'", + "version_number": "1", + "dag_id": f"'{SEED_DAG_ID}'", + "bundle_name": f"'{SEED_BUNDLE_NAME}'", + "bundle_version": "NULL", + "created_at": "'2024-01-01 00:00:00+00:00'", + "last_updated": "'2024-01-01 00:00:00+00:00'", + }, + "dag_run": { + "id": "1", + "dag_id": f"'{SEED_DAG_ID}'", + "execution_date": "'2024-01-01 00:00:00.000000+00:00'", + "logical_date": "'2024-01-01 00:00:00.000000+00:00'", + "run_id": f"'{SEED_RUN_ID}'", + "run_type": "'manual'", + "state": "'success'", + "log_template_id": "1", + "run_after": "'2024-01-01 00:00:00+00:00'", + "span_status": "'not_started'", + "clear_number": "0", + }, + "task_instance": { + "id": f"'{SEED_TASK_INSTANCE_ID}'", + "task_id": f"'{SEED_TASK_ID}'", + "dag_id": f"'{SEED_DAG_ID}'", + "run_id": f"'{SEED_RUN_ID}'", + "map_index": "-1", + "pool": "'default'", + "pool_slots": "1", + "state": "'success'", + "try_number": "1", + "max_tries": "0", + }, +} + +# FK-dependency-respecting insertion order. +SEED_ORDER = ("log_template", "dag_bundle", "dag", "dag_version", "dag_run", "task_instance") + + +def _make_fresh_db() -> Path: + """Create a fresh SQLite file and re-point Airflow's engine at it.""" + fd, path = tempfile.mkstemp(prefix="round_trip_", suffix=".db") + os.close(fd) + db = Path(path) + db.unlink() # Airflow creates the file on first connect + + new_conn = f"sqlite:///{db}" + os.environ["AIRFLOW__DATABASE__SQL_ALCHEMY_CONN"] = new_conn + + # Delayed: AIRFLOW__DATABASE__SQL_ALCHEMY_CONN must be set in the environment + # before Airflow initialises its configuration, so settings.configure_orm() + # picks up the fresh DB path rather than whatever was in the environment at + # process start. + from airflow import settings + + settings.SQL_ALCHEMY_CONN = new_conn + settings.dispose_orm() + settings.configure_orm() + return db + + +def _step(msg: str) -> None: + console.print() + console.print(Rule(f"[bold cyan]{msg}[/]", style="cyan")) + + +def _table_info(conn, table: str) -> list[tuple[Any, ...]]: + return list(conn.execute(text(f"PRAGMA table_info('{table}')")).fetchall()) + + +def _table_exists(conn, table: str) -> bool: + return bool( + conn.execute( + text("SELECT 1 FROM sqlite_master WHERE type='table' AND name=:n"), + {"n": table}, + ).scalar() + ) + + +def _insert_or_skip(conn, table: str) -> None: + """Insert a seed row into ``table`` if the table exists and the row is missing. + + Only columns present in the live schema and listed in ``SEED_VALUES`` are + included. 
Raises if a NOT-NULL-no-default column at the live revision is
+    not in ``SEED_VALUES``.
+    """
+    if not _table_exists(conn, table):
+        return
+
+    info = _table_info(conn, table)
+    live_cols = {r[1] for r in info}
+    not_null_no_default = {r[1] for r in info if r[3] and r[4] is None}
+
+    seed = SEED_VALUES[table]
+    use = {k: v for k, v in seed.items() if k in live_cols}
+    missing = not_null_no_default - set(use)
+    if missing:
+        raise RuntimeError(
+            f"Cannot seed `{table}`: NOT-NULL columns without defaults are missing seed values: "
+            f"{sorted(missing)}. Add them to SEED_VALUES['{table}'] in this script."
+        )
+
+    key = "dag_id" if "dag_id" in use else ("id" if "id" in use else next(iter(use)))
+    if conn.execute(text(f"SELECT 1 FROM {table} WHERE {key}={use[key]}")).scalar():
+        return
+
+    cols = ", ".join(use)
+    vals = ", ".join(use.values())
+    conn.execute(text(f"INSERT INTO {table} ({cols}) VALUES ({vals})"))
+
+
+def _seed_for_current_schema(get_engine: Callable[[], Any]) -> None:
+    """Idempotently seed every table the live schema can accept.
+
+    Run before each migration step so the seed is in place to fire FK-cascade
+    chains during the rebuild that step performs (if any).
+    """
+    with get_engine().begin() as conn:
+        for table in SEED_ORDER:
+            _insert_or_skip(conn, table)
+
+        ti_cols = (
+            {r[1] for r in _table_info(conn, "task_instance")}
+            if _table_exists(conn, "task_instance")
+            else set()
+        )
+        if "dag_version_id" in ti_cols and _table_exists(conn, "dag_version"):
+            conn.execute(
+                text(
+                    "UPDATE task_instance SET dag_version_id = :dvid "
+                    "WHERE task_id = :tid AND dag_id = :did AND dag_version_id IS NULL"
+                ),
+                {"dvid": SEED_DAG_VERSION_ID, "tid": SEED_TASK_ID, "did": SEED_DAG_ID},
+            )
+
+
+def _all_revisions() -> list[str]:
+    """Return every migration revision in topological order, base → head."""
+    from airflow.utils.db import _get_alembic_config
+
+    cfg = _get_alembic_config()
+    script = ScriptDirectory.from_config(cfg)
+    return [r.revision for r in reversed(list(script.walk_revisions(base="base", head="heads")))]
+
+
+def main() -> None:
+    revs = _all_revisions()
+    console.print(
+        Panel(
+            f"Walking {len(revs)} revisions: base → head, then head → base.",
+            title="migration round-trip",
+            border_style="cyan",
+            expand=False,
+        )
+    )
+
+    db = _make_fresh_db()
+    try:
+        # settings was imported and configured by _make_fresh_db(); importing
+        # it here just retrieves the already-initialised module from sys.modules.
+        from airflow import settings
+        from airflow.utils import db as airflow_db
+        from airflow.utils.db import _SKIP_EXTERNAL_DB_MANAGERS_UPGRADE
+
+        token = _SKIP_EXTERNAL_DB_MANAGERS_UPGRADE.set(True)
+        try:
+            _step(f"Walk UP base → {revs[-1][:8]}…")
+            for rev in revs:
+                try:
+                    # Pass settings.get_engine (not the engine itself) so each
+                    # call picks up the engine that upgradedb/downgrade may have
+                    # just reconfigured via _single_connection_pool.
+                    _seed_for_current_schema(settings.get_engine)
+                except Exception as exc:
+                    _fail("seed", rev, exc)
+                    return
+                try:
+                    airflow_db.upgradedb(to_revision=rev)
+                except Exception as exc:
+                    _fail("up", rev, exc)
+                    return
+
+            _step(f"Walk DOWN {revs[-1][:8]}… → {revs[0][:8]}…")
+            # Stop at the first migration (the squashed-migrations file). Going
+            # all the way to ``base`` means executing the squashed migration's
+            # downgrade, whose "drop everything we created" step can fail on
+            # objects that later migrations have already renamed or removed
+            # (e.g. the ``sm_dag`` index). 
That's a pre-existing limitation of the + # squashed-migrations file, not a FK round-trip concern. + down_revs = list(reversed(revs[1:])) + down_targets = list(reversed(revs[:-1])) + for downgrading_rev, target in zip(down_revs, down_targets): + try: + _seed_for_current_schema(settings.get_engine) + except Exception as exc: + _fail("seed", downgrading_rev, exc) + return + try: + airflow_db.downgrade(to_revision=target) + except Exception as exc: + _fail("down", downgrading_rev, exc) + return + + console.print() + console.print( + Panel( + f"[bold green]PASS[/] — {len(revs)} revisions round-tripped clean", + border_style="green", + expand=False, + ) + ) + finally: + _SKIP_EXTERNAL_DB_MANAGERS_UPGRADE.reset(token) + finally: + db.unlink(missing_ok=True) + + +def _fail(direction: str, rev: str, exc: Exception) -> None: + console.print() + console.print( + Panel( + f"[bold red]{direction.upper()} {rev}[/]\n" + f" {type(exc).__name__}: {exc}\n\n" + "[dim]See contributing-docs/26_migration_round_trip_check.rst for the " + "disable_sqlite_fkeys placement convention.[/]", + title="[bold red]FAIL[/]", + border_style="red", + expand=False, + ) + ) + sys.exit(1) + + +if __name__ == "__main__": + main()