Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
212 changes: 205 additions & 7 deletions airflow-core/src/airflow/migrations/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,13 +19,23 @@
import contextlib
from contextlib import contextmanager

import sqlalchemy as sa
from alembic import op as alembic_op
from sqlalchemy import text


def get_dialect_name(op) -> str:
conn = op.get_bind()
return conn.dialect.name if conn is not None else op.get_context().dialect.name


@contextmanager
def disable_sqlite_fkeys(op):
if op.get_bind().dialect.name == "sqlite":
op.execute("PRAGMA foreign_keys=off")
yield op
op.execute("PRAGMA foreign_keys=on")
if get_dialect_name(op) == "sqlite":
with contextlib.ExitStack() as exit_stack:
op.execute("PRAGMA foreign_keys=off")
exit_stack.callback(op.execute, "PRAGMA foreign_keys=on")
yield op
Comment thread
jason810496 marked this conversation as resolved.
else:
yield op

Expand Down Expand Up @@ -56,8 +66,196 @@ def mysql_drop_foreignkey_if_exists(constraint_name, table_name, op):


def ignore_sqlite_value_error():
from alembic import op

if op.get_bind().dialect.name == "sqlite":
if get_dialect_name(alembic_op) == "sqlite":
return contextlib.suppress(ValueError)
return contextlib.nullcontext()


def create_index_if_not_exists(op, index_name, table_name, columns, unique=False) -> None:
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

is this still used somewhere?

"""
Create an index if it does not already exist.

MySQL does not support CREATE INDEX IF NOT EXISTS, so a stored procedure is used.
Comment thread
iting0321 marked this conversation as resolved.
PostgreSQL and SQLite support it natively.
"""
dialect_name = get_dialect_name(op)

if dialect_name == "mysql":
unique_kw = "UNIQUE " if unique else ""
col_list = ", ".join(f"`{c}`" for c in columns)
op.execute(
text(f"""
DROP PROCEDURE IF EXISTS CreateIndexIfNotExists;
CREATE PROCEDURE CreateIndexIfNotExists()
BEGIN
IF NOT EXISTS (
SELECT 1
FROM information_schema.STATISTICS
WHERE
TABLE_SCHEMA = DATABASE() AND
TABLE_NAME = '{table_name}' AND
INDEX_NAME = '{index_name}'
) THEN
CREATE {unique_kw}INDEX `{index_name}` ON `{table_name}` ({col_list});
END IF;
END;
CALL CreateIndexIfNotExists();
DROP PROCEDURE IF EXISTS CreateIndexIfNotExists;
""")
)
else:
op.create_index(index_name, table_name, columns, unique=unique, if_not_exists=True)


def drop_index_if_exists(op, index_name, table_name) -> None:
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

is this still used somewhere?

"""
Drop an index if it exists.

Works in both online and offline mode by using raw SQL for PostgreSQL and MySQL.
SQLite and PostgreSQL support DROP INDEX IF EXISTS natively.
MySQL requires a stored procedure since it does not support IF EXISTS for DROP INDEX.
"""
dialect_name = get_dialect_name(op)

if dialect_name == "mysql":
op.execute(
text(f"""
DROP PROCEDURE IF EXISTS DropIndexIfExists;
CREATE PROCEDURE DropIndexIfExists()
BEGIN
IF EXISTS (
SELECT 1
FROM information_schema.STATISTICS
WHERE
TABLE_SCHEMA = DATABASE() AND
TABLE_NAME = '{table_name}' AND
INDEX_NAME = '{index_name}'
) THEN
DROP INDEX `{index_name}` ON `{table_name}`;
END IF;
END;
CALL DropIndexIfExists();
DROP PROCEDURE DropIndexIfExists;
""")
Comment thread
iting0321 marked this conversation as resolved.
)
else:
# PostgreSQL and SQLite both support DROP INDEX IF EXISTS
op.drop_index(index_name, table_name=table_name, if_exists=True)


def drop_unique_constraints_on_columns(op, table_name, columns) -> None:
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

is this still used somewhere?

"""
Drop all unique constraints covering any of the given columns, regardless of constraint name.

Works in both online and offline mode by using raw SQL for PostgreSQL and MySQL.
SQLite falls back to batch mode and requires a live connection.
"""
dialect_name = get_dialect_name(op)
Comment thread
iting0321 marked this conversation as resolved.

if dialect_name == "postgresql":
cols_array = ", ".join(f"'{c}'" for c in columns)
op.execute(
Comment on lines +155 to +157
text(f"""
DO $$
DECLARE r record;
BEGIN
FOR r IN
SELECT DISTINCT tc.constraint_name
FROM information_schema.table_constraints tc
JOIN information_schema.key_column_usage kcu
ON tc.constraint_name = kcu.constraint_name
AND tc.table_schema = kcu.table_schema
WHERE tc.table_name = '{table_name}'
AND tc.constraint_type = 'UNIQUE'
AND kcu.column_name = ANY(ARRAY[{cols_array}]::text[])
LOOP
EXECUTE 'ALTER TABLE ' || quote_ident('{table_name}') || ' DROP CONSTRAINT IF EXISTS '
|| quote_ident(r.constraint_name);
END LOOP;
END $$
""")
)
elif dialect_name == "mysql":
cols_in = ", ".join(f"'{c}'" for c in columns)
op.execute(
text(f"""
DROP PROCEDURE IF EXISTS DropUniqueOnColumns;
CREATE PROCEDURE DropUniqueOnColumns()
BEGIN
DECLARE done INT DEFAULT FALSE;
DECLARE v_name VARCHAR(255);
DECLARE cur CURSOR FOR
SELECT DISTINCT kcu.CONSTRAINT_NAME
Comment thread
iting0321 marked this conversation as resolved.
FROM information_schema.KEY_COLUMN_USAGE kcu
JOIN information_schema.TABLE_CONSTRAINTS tc
ON kcu.CONSTRAINT_NAME = tc.CONSTRAINT_NAME
AND kcu.TABLE_SCHEMA = tc.TABLE_SCHEMA
AND kcu.TABLE_NAME = tc.TABLE_NAME
WHERE kcu.TABLE_NAME = '{table_name}'
AND kcu.TABLE_SCHEMA = DATABASE()
AND tc.CONSTRAINT_TYPE = 'UNIQUE'
AND kcu.COLUMN_NAME IN ({cols_in});
DECLARE CONTINUE HANDLER FOR NOT FOUND SET done = TRUE;
OPEN cur;
drop_loop: LOOP
FETCH cur INTO v_name;
IF done THEN LEAVE drop_loop; END IF;
SET @stmt = CONCAT('ALTER TABLE `{table_name}` DROP INDEX `', v_name, '`');
PREPARE s FROM @stmt;
EXECUTE s;
DEALLOCATE PREPARE s;
END LOOP;
CLOSE cur;
END;
CALL DropUniqueOnColumns();
DROP PROCEDURE DropUniqueOnColumns;
""")
)
else:
# SQLite — batch mode rewrites the table; requires a live connection
with op.batch_alter_table(table_name, schema=None) as batch_op:
for uq in sa.inspect(op.get_bind()).get_unique_constraints(table_name):
if any(col in uq["column_names"] for col in columns):
batch_op.drop_constraint(uq["name"], type_="unique")


def drop_unique_constraint_if_exists(op, table_name, constraint_name) -> None:
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

is this still used somewhere?

"""
Drop a unique constraint by name if it exists.

Works in both online and offline mode by using raw SQL for PostgreSQL and MySQL.
SQLite falls back to batch mode and requires a live connection.
"""
dialect_name = get_dialect_name(op)

if dialect_name == "postgresql":
op.execute(text(f'ALTER TABLE "{table_name}" DROP CONSTRAINT IF EXISTS "{constraint_name}"'))
elif dialect_name == "mysql":
op.execute(
text(f"""
Comment thread
iting0321 marked this conversation as resolved.
DROP PROCEDURE IF EXISTS DropUniqueIfExists;
CREATE PROCEDURE DropUniqueIfExists()
BEGIN
IF EXISTS (
SELECT 1
FROM information_schema.TABLE_CONSTRAINTS
WHERE
CONSTRAINT_SCHEMA = DATABASE() AND
TABLE_NAME = '{table_name}' AND
CONSTRAINT_NAME = '{constraint_name}' AND
CONSTRAINT_TYPE = 'UNIQUE'
) THEN
ALTER TABLE `{table_name}` DROP INDEX `{constraint_name}`;
ELSE
SELECT 1;
END IF;
END;
CALL DropUniqueIfExists();
DROP PROCEDURE DropUniqueIfExists;
""")
)
else:
# SQLite — batch mode rewrites the table; requires a live connection
with op.batch_alter_table(table_name, schema=None) as batch_op:
with contextlib.suppress(ValueError):
batch_op.drop_constraint(constraint_name, type_="unique")
Original file line number Diff line number Diff line change
Expand Up @@ -30,9 +30,8 @@

import sqlalchemy as sa
from alembic import op
from sqlalchemy import text

from airflow.migrations.utils import mysql_drop_foreignkey_if_exists
from airflow.migrations.utils import disable_sqlite_fkeys, mysql_drop_foreignkey_if_exists

# revision identifiers, used by Alembic.
revision = "05234396c6fc"
Expand Down Expand Up @@ -103,19 +102,18 @@ def _drop_fkey_if_exists(table, constraint_name):
conn = op.get_bind()
dialect_name = conn.dialect.name

if dialect_name == "sqlite":
# SQLite requires foreign key constraints to be disabled during batch operations
conn.execute(text("PRAGMA foreign_keys=OFF"))
try:
with op.batch_alter_table(table, schema=None) as batch_op:
batch_op.drop_constraint(op.f(constraint_name), type_="foreignkey")
except ValueError:
pass
conn.execute(text("PRAGMA foreign_keys=ON"))
elif dialect_name == "mysql":
if dialect_name == "mysql":
mysql_drop_foreignkey_if_exists(constraint_name, table, op)
else:
elif dialect_name == "postgresql":
op.execute(f"ALTER TABLE {table} DROP CONSTRAINT IF EXISTS {constraint_name}")
else:
# SQLite requires foreign key constraints to be disabled during batch operations.
with disable_sqlite_fkeys(op):
Comment thread
iting0321 marked this conversation as resolved.
try:
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

should we use ignore_sqlite_value_error?

with op.batch_alter_table(table, schema=None) as batch_op:
batch_op.drop_constraint(op.f(constraint_name), type_="foreignkey")
except ValueError:
pass


# original table name to new table name
Expand Down
Loading
Loading