Skip to content
Open
Show file tree
Hide file tree
Changes from 9 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
205 changes: 201 additions & 4 deletions airflow-core/src/airflow/migrations/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,13 +19,21 @@
import contextlib
from contextlib import contextmanager

from sqlalchemy import text


def get_dialect_name(op) -> str:
    """Return the SQL dialect name for the current migration context.

    Prefers the bound connection's dialect; when no connection is bound
    (e.g. offline ``--sql`` mode), falls back to the migration context.
    """
    bind = op.get_bind()
    if bind is None:
        return op.get_context().dialect.name
    return bind.dialect.name


@contextmanager
def disable_sqlite_fkeys(op):
    """Temporarily turn off SQLite foreign-key enforcement.

    On SQLite, ``PRAGMA foreign_keys=off`` is issued before yielding and
    ``PRAGMA foreign_keys=on`` is issued afterwards — even if the wrapped
    block raises.  On every other dialect this is a plain pass-through.
    """
    bind = op.get_bind()
    dialect = bind.dialect.name if bind is not None else op.get_context().dialect.name
    if dialect != "sqlite":
        yield op
        return
    op.execute("PRAGMA foreign_keys=off")
    try:
        yield op
    finally:
        # Re-enable enforcement no matter how the block exits.
        op.execute("PRAGMA foreign_keys=on")

Expand Down Expand Up @@ -61,3 +69,192 @@ def ignore_sqlite_value_error():
if op.get_bind().dialect.name == "sqlite":
Comment thread
iting0321 marked this conversation as resolved.
Outdated
return contextlib.suppress(ValueError)
Comment thread
iting0321 marked this conversation as resolved.
return contextlib.nullcontext()


def create_index_if_not_exists(op, index_name, table_name, columns, unique=False) -> None:
    """
    Create an index if it does not already exist.

    MySQL does not support CREATE INDEX IF NOT EXISTS, so a stored procedure is used.
    PostgreSQL and SQLite support it natively.
    """
    bind = op.get_bind()
    dialect_name = bind.dialect.name if bind is not None else op.get_context().dialect.name

    if dialect_name != "mysql":
        # PostgreSQL and SQLite accept IF NOT EXISTS directly.
        op.create_index(index_name, table_name, columns, unique=unique, if_not_exists=True)
        return

    unique_kw = "UNIQUE " if unique else ""
    col_list = ", ".join(f"`{c}`" for c in columns)
    # Throwaway stored procedure: check information_schema before creating,
    # since MySQL has no IF NOT EXISTS clause for CREATE INDEX.
    op.execute(
        text(f"""
        DROP PROCEDURE IF EXISTS CreateIndexIfNotExists;
        CREATE PROCEDURE CreateIndexIfNotExists()
        BEGIN
            IF NOT EXISTS (
                SELECT 1
                FROM information_schema.STATISTICS
                WHERE
                    TABLE_SCHEMA = DATABASE() AND
                    TABLE_NAME = '{table_name}' AND
                    INDEX_NAME = '{index_name}'
            ) THEN
                CREATE {unique_kw}INDEX `{index_name}` ON `{table_name}` ({col_list});
            END IF;
        END;
        CALL CreateIndexIfNotExists();
        DROP PROCEDURE IF EXISTS CreateIndexIfNotExists;
        """)
    )


def drop_index_if_exists(op, index_name, table_name) -> None:
    """
    Drop an index if it exists.

    Works in both online and offline mode by using raw SQL for PostgreSQL and MySQL.
    SQLite and PostgreSQL support DROP INDEX IF EXISTS natively.
    MySQL requires a stored procedure since it does not support IF EXISTS for DROP INDEX.
    """
    bind = op.get_bind()
    dialect_name = bind.dialect.name if bind is not None else op.get_context().dialect.name

    if dialect_name != "mysql":
        # PostgreSQL and SQLite both support DROP INDEX IF EXISTS
        op.drop_index(index_name, table_name=table_name, if_exists=True)
        return

    # Throwaway stored procedure: probe information_schema before dropping,
    # since MySQL has no IF EXISTS clause for DROP INDEX.
    op.execute(
        text(f"""
        CREATE PROCEDURE DropIndexIfExists()
        BEGIN
            IF EXISTS (
                SELECT 1
                FROM information_schema.STATISTICS
                WHERE
                    TABLE_SCHEMA = DATABASE() AND
                    TABLE_NAME = '{table_name}' AND
                    INDEX_NAME = '{index_name}'
            ) THEN
                DROP INDEX `{index_name}` ON `{table_name}`;
            END IF;
        END;
        CALL DropIndexIfExists();
        DROP PROCEDURE DropIndexIfExists;
        """)
    )


def drop_unique_constraints_on_columns(op, table_name, columns) -> None:
    """
    Drop all unique constraints covering any of the given columns, regardless of constraint name.

    Works in both online and offline mode by using raw SQL for PostgreSQL and MySQL.
    SQLite falls back to batch mode and requires a live connection.
    """
    import sqlalchemy as sa

    bind = op.get_bind()
    dialect_name = bind.dialect.name if bind is not None else op.get_context().dialect.name

    if dialect_name == "postgresql":
        # Anonymous DO block: enumerate matching UNIQUE constraints from the
        # catalog and drop each one by its discovered name.
        cols_array = ", ".join(f"'{c}'" for c in columns)
        op.execute(
            text(f"""
            DO $$
            DECLARE r record;
            BEGIN
                FOR r IN
                    SELECT DISTINCT tc.constraint_name
                    FROM information_schema.table_constraints tc
                    JOIN information_schema.key_column_usage kcu
                        ON tc.constraint_name = kcu.constraint_name
                        AND tc.table_schema = kcu.table_schema
                    WHERE tc.table_name = '{table_name}'
                        AND tc.constraint_type = 'UNIQUE'
                        AND kcu.column_name = ANY(ARRAY[{cols_array}]::text[])
                LOOP
                    EXECUTE 'ALTER TABLE ' || quote_ident('{table_name}') || ' DROP CONSTRAINT IF EXISTS '
                        || quote_ident(r.constraint_name);
                END LOOP;
            END $$
            """)
        )
    elif dialect_name == "mysql":
        # Stored procedure with a cursor: MySQL exposes unique constraints as
        # indexes, so each match is removed via ALTER TABLE ... DROP INDEX.
        cols_in = ", ".join(f"'{c}'" for c in columns)
        op.execute(
            text(f"""
            CREATE PROCEDURE DropUniqueOnColumns()
            BEGIN
                DECLARE done INT DEFAULT FALSE;
                DECLARE v_name VARCHAR(255);
                DECLARE cur CURSOR FOR
                    SELECT DISTINCT kcu.CONSTRAINT_NAME
                    FROM information_schema.KEY_COLUMN_USAGE kcu
                    JOIN information_schema.TABLE_CONSTRAINTS tc
                        ON kcu.CONSTRAINT_NAME = tc.CONSTRAINT_NAME
                        AND kcu.TABLE_SCHEMA = tc.TABLE_SCHEMA
                        AND kcu.TABLE_NAME = tc.TABLE_NAME
                    WHERE kcu.TABLE_NAME = '{table_name}'
                        AND kcu.TABLE_SCHEMA = DATABASE()
                        AND tc.CONSTRAINT_TYPE = 'UNIQUE'
                        AND kcu.COLUMN_NAME IN ({cols_in});
                DECLARE CONTINUE HANDLER FOR NOT FOUND SET done = TRUE;
                OPEN cur;
                drop_loop: LOOP
                    FETCH cur INTO v_name;
                    IF done THEN LEAVE drop_loop; END IF;
                    SET @stmt = CONCAT('ALTER TABLE `{table_name}` DROP INDEX `', v_name, '`');
                    PREPARE s FROM @stmt;
                    EXECUTE s;
                    DEALLOCATE PREPARE s;
                END LOOP;
                CLOSE cur;
            END;
            CALL DropUniqueOnColumns();
            DROP PROCEDURE DropUniqueOnColumns;
            """)
        )
    else:
        # SQLite — batch mode rewrites the table; requires a live connection
        inspector = sa.inspect(op.get_bind())
        wanted = set(columns)
        with op.batch_alter_table(table_name, schema=None) as batch_op:
            for uq in inspector.get_unique_constraints(table_name):
                if wanted.intersection(uq["column_names"]):
                    batch_op.drop_constraint(uq["name"], type_="unique")


def drop_unique_constraint_if_exists(op, table_name, constraint_name) -> None:
    """
    Drop a unique constraint by name if it exists.

    Works in both online and offline mode by using raw SQL for PostgreSQL and MySQL.
    SQLite falls back to batch mode and requires a live connection.
    """
    bind = op.get_bind()
    dialect_name = bind.dialect.name if bind is not None else op.get_context().dialect.name

    if dialect_name == "postgresql":
        op.execute(text(f'ALTER TABLE "{table_name}" DROP CONSTRAINT IF EXISTS "{constraint_name}"'))
    elif dialect_name == "mysql":
        # MySQL stores unique constraints as indexes; probe information_schema
        # inside a throwaway stored procedure since DROP INDEX has no IF EXISTS.
        op.execute(
            text(f"""
            CREATE PROCEDURE DropUniqueIfExists()
            BEGIN
                IF EXISTS (
                    SELECT 1
                    FROM information_schema.TABLE_CONSTRAINTS
                    WHERE
                        CONSTRAINT_SCHEMA = DATABASE() AND
                        TABLE_NAME = '{table_name}' AND
                        CONSTRAINT_NAME = '{constraint_name}' AND
                        CONSTRAINT_TYPE = 'UNIQUE'
                ) THEN
                    ALTER TABLE `{table_name}` DROP INDEX `{constraint_name}`;
                ELSE
                    SELECT 1;
                END IF;
            END;
            CALL DropUniqueIfExists();
            DROP PROCEDURE DropUniqueIfExists;
            """)
        )
    else:
        # SQLite — batch mode rewrites the table; requires a live connection.
        # A missing constraint raises ValueError, which is deliberately ignored.
        with op.batch_alter_table(table_name, schema=None) as batch_op, contextlib.suppress(ValueError):
            batch_op.drop_constraint(constraint_name, type_="unique")
Original file line number Diff line number Diff line change
Expand Up @@ -30,9 +30,8 @@

import sqlalchemy as sa
from alembic import op
from sqlalchemy import text

from airflow.migrations.utils import mysql_drop_foreignkey_if_exists
from airflow.migrations.utils import disable_sqlite_fkeys, mysql_drop_foreignkey_if_exists

# revision identifiers, used by Alembic.
revision = "05234396c6fc"
Expand Down Expand Up @@ -103,19 +102,18 @@ def _drop_fkey_if_exists(table, constraint_name):
conn = op.get_bind()
dialect_name = conn.dialect.name

Comment on lines 101 to 104
if dialect_name == "sqlite":
# SQLite requires foreign key constraints to be disabled during batch operations
conn.execute(text("PRAGMA foreign_keys=OFF"))
try:
with op.batch_alter_table(table, schema=None) as batch_op:
batch_op.drop_constraint(op.f(constraint_name), type_="foreignkey")
except ValueError:
pass
conn.execute(text("PRAGMA foreign_keys=ON"))
elif dialect_name == "mysql":
if dialect_name == "mysql":
mysql_drop_foreignkey_if_exists(constraint_name, table, op)
else:
elif dialect_name == "postgresql":
op.execute(f"ALTER TABLE {table} DROP CONSTRAINT IF EXISTS {constraint_name}")
else:
# SQLite requires foreign key constraints to be disabled during batch operations.
with disable_sqlite_fkeys(op):
Comment thread
iting0321 marked this conversation as resolved.
try:
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

should we use ignore_sqlite_value_error?

with op.batch_alter_table(table, schema=None) as batch_op:
batch_op.drop_constraint(op.f(constraint_name), type_="foreignkey")
except ValueError:
pass


# original table name to new table name
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@
from sqlalchemy.sql import text

from airflow.migrations.db_types import StringID
from airflow.migrations.utils import ignore_sqlite_value_error
from airflow.migrations.utils import disable_sqlite_fkeys, ignore_sqlite_value_error

# revision identifiers, used by Alembic.
revision = "7582ea3f3dd5"
Expand Down Expand Up @@ -61,8 +61,8 @@ def upgrade():
('dags-folder');
""")
)

if dialect_name == "sqlite":
op.execute(text("PRAGMA foreign_keys=OFF"))
op.execute(
text("""
INSERT OR IGNORE INTO dag_bundle (name) VALUES
Expand All @@ -71,51 +71,38 @@ def upgrade():
""")
)

conn = op.get_bind()
with ignore_sqlite_value_error(), op.batch_alter_table("dag", schema=None) as batch_op:
conn.execute(
text(
"""
UPDATE dag
SET bundle_name =
CASE
WHEN fileloc LIKE '%/airflow/example_dags/%' THEN 'example_dags'
ELSE 'dags-folder'
END
WHERE bundle_name IS NULL
"""
with disable_sqlite_fkeys(op):
conn = op.get_bind()
with ignore_sqlite_value_error(), op.batch_alter_table("dag", schema=None) as batch_op:
conn.execute(
text(
"""
UPDATE dag
SET bundle_name =
CASE
WHEN fileloc LIKE '%/airflow/example_dags/%' THEN 'example_dags'
ELSE 'dags-folder'
END
WHERE bundle_name IS NULL
"""
)
)
)
# drop the foreign key temporarily and recreate it once both columns are changed
batch_op.drop_constraint(batch_op.f("dag_bundle_name_fkey"), type_="foreignkey")
batch_op.alter_column("bundle_name", nullable=False, existing_type=StringID())
# drop the foreign key temporarily and recreate it once both columns are changed
batch_op.drop_constraint(batch_op.f("dag_bundle_name_fkey"), type_="foreignkey")
batch_op.alter_column("bundle_name", nullable=False, existing_type=StringID())

with op.batch_alter_table("dag_bundle", schema=None) as batch_op:
batch_op.alter_column("name", nullable=False, existing_type=StringID())
with op.batch_alter_table("dag_bundle", schema=None) as batch_op:
batch_op.alter_column("name", nullable=False, existing_type=StringID())

with op.batch_alter_table("dag", schema=None) as batch_op:
batch_op.create_foreign_key(
batch_op.f("dag_bundle_name_fkey"), "dag_bundle", ["bundle_name"], ["name"]
)

if dialect_name == "sqlite":
op.execute(text("PRAGMA foreign_keys=ON"))
with op.batch_alter_table("dag", schema=None) as batch_op:
batch_op.create_foreign_key(
batch_op.f("dag_bundle_name_fkey"), "dag_bundle", ["bundle_name"], ["name"]
)


def downgrade():
"""Make bundle_name nullable."""
import contextlib

dialect_name = op.get_bind().dialect.name
exitstack = contextlib.ExitStack()

if dialect_name == "sqlite":
# SQLite requires foreign key constraints to be disabled during batch operations
conn = op.get_bind()
conn.execute(text("PRAGMA foreign_keys=OFF"))
exitstack.callback(conn.execute, text("PRAGMA foreign_keys=ON"))

with exitstack:
with disable_sqlite_fkeys(op):
Comment thread
iting0321 marked this conversation as resolved.
with op.batch_alter_table("dag", schema=None) as batch_op:
batch_op.drop_constraint(batch_op.f("dag_bundle_name_fkey"), type_="foreignkey")
batch_op.alter_column("bundle_name", nullable=True, existing_type=StringID())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,8 @@

import sqlalchemy as sa
from alembic import op
from sqlalchemy import text

from airflow.migrations.utils import disable_sqlite_fkeys

# revision identifiers, used by Alembic.
revision = "eaf332f43c7c"
Expand All @@ -47,15 +48,6 @@ def upgrade():

def downgrade():
    """Unapply add last_parse_duration to dag model."""
    # SQLite requires foreign key constraints to be disabled during batch
    # operations; on other dialects disable_sqlite_fkeys is a no-op.
    with disable_sqlite_fkeys(op):
        with op.batch_alter_table("dag", schema=None) as batch_op:
            batch_op.drop_column("last_parse_duration")
Loading
Loading