-
Notifications
You must be signed in to change notification settings - Fork 17k
Fix SQLite pragma teardown in DB migrations #64838
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from all commits
e4975bf
df68dcc
069734e
c4cdfcf
6273f3a
0831db4
ddc8bfe
9cfa896
4d81786
a832aca
17f7557
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -19,13 +19,23 @@ | |
| import contextlib | ||
| from contextlib import contextmanager | ||
|
|
||
| import sqlalchemy as sa | ||
| from alembic import op as alembic_op | ||
| from sqlalchemy import text | ||
|
|
||
|
|
||
| def get_dialect_name(op) -> str: | ||
| conn = op.get_bind() | ||
| return conn.dialect.name if conn is not None else op.get_context().dialect.name | ||
|
|
||
|
|
||
| @contextmanager | ||
| def disable_sqlite_fkeys(op): | ||
| if op.get_bind().dialect.name == "sqlite": | ||
| op.execute("PRAGMA foreign_keys=off") | ||
| yield op | ||
| op.execute("PRAGMA foreign_keys=on") | ||
| if get_dialect_name(op) == "sqlite": | ||
| with contextlib.ExitStack() as exit_stack: | ||
| op.execute("PRAGMA foreign_keys=off") | ||
| exit_stack.callback(op.execute, "PRAGMA foreign_keys=on") | ||
| yield op | ||
| else: | ||
| yield op | ||
|
|
||
|
|
@@ -56,8 +66,196 @@ def mysql_drop_foreignkey_if_exists(constraint_name, table_name, op): | |
|
|
||
|
|
||
| def ignore_sqlite_value_error(): | ||
| from alembic import op | ||
|
|
||
| if op.get_bind().dialect.name == "sqlite": | ||
| if get_dialect_name(alembic_op) == "sqlite": | ||
| return contextlib.suppress(ValueError) | ||
| return contextlib.nullcontext() | ||
|
|
||
|
|
||
| def create_index_if_not_exists(op, index_name, table_name, columns, unique=False) -> None: | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. is this still used somewhere? |
||
| """ | ||
| Create an index if it does not already exist. | ||
|
|
||
| MySQL does not support CREATE INDEX IF NOT EXISTS, so a stored procedure is used. | ||
|
iting0321 marked this conversation as resolved.
|
||
| PostgreSQL and SQLite support it natively. | ||
| """ | ||
| dialect_name = get_dialect_name(op) | ||
|
|
||
| if dialect_name == "mysql": | ||
| unique_kw = "UNIQUE " if unique else "" | ||
| col_list = ", ".join(f"`{c}`" for c in columns) | ||
| op.execute( | ||
| text(f""" | ||
| DROP PROCEDURE IF EXISTS CreateIndexIfNotExists; | ||
| CREATE PROCEDURE CreateIndexIfNotExists() | ||
| BEGIN | ||
| IF NOT EXISTS ( | ||
| SELECT 1 | ||
| FROM information_schema.STATISTICS | ||
| WHERE | ||
| TABLE_SCHEMA = DATABASE() AND | ||
| TABLE_NAME = '{table_name}' AND | ||
| INDEX_NAME = '{index_name}' | ||
| ) THEN | ||
| CREATE {unique_kw}INDEX `{index_name}` ON `{table_name}` ({col_list}); | ||
| END IF; | ||
| END; | ||
| CALL CreateIndexIfNotExists(); | ||
| DROP PROCEDURE IF EXISTS CreateIndexIfNotExists; | ||
| """) | ||
| ) | ||
| else: | ||
| op.create_index(index_name, table_name, columns, unique=unique, if_not_exists=True) | ||
|
|
||
|
|
||
| def drop_index_if_exists(op, index_name, table_name) -> None: | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. is this still used somewhere? |
||
| """ | ||
| Drop an index if it exists. | ||
|
|
||
| Works in both online and offline mode by using raw SQL for PostgreSQL and MySQL. | ||
| SQLite and PostgreSQL support DROP INDEX IF EXISTS natively. | ||
| MySQL requires a stored procedure since it does not support IF EXISTS for DROP INDEX. | ||
| """ | ||
| dialect_name = get_dialect_name(op) | ||
|
|
||
| if dialect_name == "mysql": | ||
| op.execute( | ||
| text(f""" | ||
| DROP PROCEDURE IF EXISTS DropIndexIfExists; | ||
| CREATE PROCEDURE DropIndexIfExists() | ||
| BEGIN | ||
| IF EXISTS ( | ||
| SELECT 1 | ||
| FROM information_schema.STATISTICS | ||
| WHERE | ||
| TABLE_SCHEMA = DATABASE() AND | ||
| TABLE_NAME = '{table_name}' AND | ||
| INDEX_NAME = '{index_name}' | ||
| ) THEN | ||
| DROP INDEX `{index_name}` ON `{table_name}`; | ||
| END IF; | ||
| END; | ||
| CALL DropIndexIfExists(); | ||
| DROP PROCEDURE DropIndexIfExists; | ||
| """) | ||
|
iting0321 marked this conversation as resolved.
|
||
| ) | ||
| else: | ||
| # PostgreSQL and SQLite both support DROP INDEX IF EXISTS | ||
| op.drop_index(index_name, table_name=table_name, if_exists=True) | ||
|
|
||
|
|
||
| def drop_unique_constraints_on_columns(op, table_name, columns) -> None: | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. is this still used somewhere? |
||
| """ | ||
| Drop all unique constraints covering any of the given columns, regardless of constraint name. | ||
|
|
||
| Works in both online and offline mode by using raw SQL for PostgreSQL and MySQL. | ||
| SQLite falls back to batch mode and requires a live connection. | ||
| """ | ||
| dialect_name = get_dialect_name(op) | ||
|
iting0321 marked this conversation as resolved.
|
||
|
|
||
| if dialect_name == "postgresql": | ||
| cols_array = ", ".join(f"'{c}'" for c in columns) | ||
| op.execute( | ||
|
Comment on lines
+155
to
+157
|
||
| text(f""" | ||
| DO $$ | ||
| DECLARE r record; | ||
| BEGIN | ||
| FOR r IN | ||
| SELECT DISTINCT tc.constraint_name | ||
| FROM information_schema.table_constraints tc | ||
| JOIN information_schema.key_column_usage kcu | ||
| ON tc.constraint_name = kcu.constraint_name | ||
| AND tc.table_schema = kcu.table_schema | ||
| WHERE tc.table_name = '{table_name}' | ||
| AND tc.constraint_type = 'UNIQUE' | ||
| AND kcu.column_name = ANY(ARRAY[{cols_array}]::text[]) | ||
| LOOP | ||
| EXECUTE 'ALTER TABLE ' || quote_ident('{table_name}') || ' DROP CONSTRAINT IF EXISTS ' | ||
| || quote_ident(r.constraint_name); | ||
| END LOOP; | ||
| END $$ | ||
| """) | ||
| ) | ||
| elif dialect_name == "mysql": | ||
| cols_in = ", ".join(f"'{c}'" for c in columns) | ||
| op.execute( | ||
| text(f""" | ||
| DROP PROCEDURE IF EXISTS DropUniqueOnColumns; | ||
| CREATE PROCEDURE DropUniqueOnColumns() | ||
| BEGIN | ||
| DECLARE done INT DEFAULT FALSE; | ||
| DECLARE v_name VARCHAR(255); | ||
| DECLARE cur CURSOR FOR | ||
| SELECT DISTINCT kcu.CONSTRAINT_NAME | ||
|
iting0321 marked this conversation as resolved.
|
||
| FROM information_schema.KEY_COLUMN_USAGE kcu | ||
| JOIN information_schema.TABLE_CONSTRAINTS tc | ||
| ON kcu.CONSTRAINT_NAME = tc.CONSTRAINT_NAME | ||
| AND kcu.TABLE_SCHEMA = tc.TABLE_SCHEMA | ||
| AND kcu.TABLE_NAME = tc.TABLE_NAME | ||
| WHERE kcu.TABLE_NAME = '{table_name}' | ||
| AND kcu.TABLE_SCHEMA = DATABASE() | ||
| AND tc.CONSTRAINT_TYPE = 'UNIQUE' | ||
| AND kcu.COLUMN_NAME IN ({cols_in}); | ||
| DECLARE CONTINUE HANDLER FOR NOT FOUND SET done = TRUE; | ||
| OPEN cur; | ||
| drop_loop: LOOP | ||
| FETCH cur INTO v_name; | ||
| IF done THEN LEAVE drop_loop; END IF; | ||
| SET @stmt = CONCAT('ALTER TABLE `{table_name}` DROP INDEX `', v_name, '`'); | ||
| PREPARE s FROM @stmt; | ||
| EXECUTE s; | ||
| DEALLOCATE PREPARE s; | ||
| END LOOP; | ||
| CLOSE cur; | ||
| END; | ||
| CALL DropUniqueOnColumns(); | ||
| DROP PROCEDURE DropUniqueOnColumns; | ||
| """) | ||
| ) | ||
| else: | ||
| # SQLite — batch mode rewrites the table; requires a live connection | ||
| with op.batch_alter_table(table_name, schema=None) as batch_op: | ||
| for uq in sa.inspect(op.get_bind()).get_unique_constraints(table_name): | ||
| if any(col in uq["column_names"] for col in columns): | ||
| batch_op.drop_constraint(uq["name"], type_="unique") | ||
|
|
||
|
|
||
| def drop_unique_constraint_if_exists(op, table_name, constraint_name) -> None: | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. is this still used somewhere? |
||
| """ | ||
| Drop a unique constraint by name if it exists. | ||
|
|
||
| Works in both online and offline mode by using raw SQL for PostgreSQL and MySQL. | ||
| SQLite falls back to batch mode and requires a live connection. | ||
| """ | ||
| dialect_name = get_dialect_name(op) | ||
|
|
||
| if dialect_name == "postgresql": | ||
| op.execute(text(f'ALTER TABLE "{table_name}" DROP CONSTRAINT IF EXISTS "{constraint_name}"')) | ||
| elif dialect_name == "mysql": | ||
| op.execute( | ||
| text(f""" | ||
|
iting0321 marked this conversation as resolved.
|
||
| DROP PROCEDURE IF EXISTS DropUniqueIfExists; | ||
| CREATE PROCEDURE DropUniqueIfExists() | ||
| BEGIN | ||
| IF EXISTS ( | ||
| SELECT 1 | ||
| FROM information_schema.TABLE_CONSTRAINTS | ||
| WHERE | ||
| CONSTRAINT_SCHEMA = DATABASE() AND | ||
| TABLE_NAME = '{table_name}' AND | ||
| CONSTRAINT_NAME = '{constraint_name}' AND | ||
| CONSTRAINT_TYPE = 'UNIQUE' | ||
| ) THEN | ||
| ALTER TABLE `{table_name}` DROP INDEX `{constraint_name}`; | ||
| ELSE | ||
| SELECT 1; | ||
| END IF; | ||
| END; | ||
| CALL DropUniqueIfExists(); | ||
| DROP PROCEDURE DropUniqueIfExists; | ||
| """) | ||
| ) | ||
| else: | ||
| # SQLite — batch mode rewrites the table; requires a live connection | ||
| with op.batch_alter_table(table_name, schema=None) as batch_op: | ||
| with contextlib.suppress(ValueError): | ||
| batch_op.drop_constraint(constraint_name, type_="unique") | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -30,9 +30,8 @@ | |
|
|
||
| import sqlalchemy as sa | ||
| from alembic import op | ||
| from sqlalchemy import text | ||
|
|
||
| from airflow.migrations.utils import mysql_drop_foreignkey_if_exists | ||
| from airflow.migrations.utils import disable_sqlite_fkeys, mysql_drop_foreignkey_if_exists | ||
|
|
||
| # revision identifiers, used by Alembic. | ||
| revision = "05234396c6fc" | ||
|
|
@@ -103,19 +102,18 @@ def _drop_fkey_if_exists(table, constraint_name): | |
| conn = op.get_bind() | ||
| dialect_name = conn.dialect.name | ||
|
|
||
| if dialect_name == "sqlite": | ||
| # SQLite requires foreign key constraints to be disabled during batch operations | ||
| conn.execute(text("PRAGMA foreign_keys=OFF")) | ||
| try: | ||
| with op.batch_alter_table(table, schema=None) as batch_op: | ||
| batch_op.drop_constraint(op.f(constraint_name), type_="foreignkey") | ||
| except ValueError: | ||
| pass | ||
| conn.execute(text("PRAGMA foreign_keys=ON")) | ||
| elif dialect_name == "mysql": | ||
| if dialect_name == "mysql": | ||
| mysql_drop_foreignkey_if_exists(constraint_name, table, op) | ||
| else: | ||
| elif dialect_name == "postgresql": | ||
| op.execute(f"ALTER TABLE {table} DROP CONSTRAINT IF EXISTS {constraint_name}") | ||
| else: | ||
| # SQLite requires foreign key constraints to be disabled during batch operations. | ||
| with disable_sqlite_fkeys(op): | ||
|
iting0321 marked this conversation as resolved.
|
||
| try: | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. should we use |
||
| with op.batch_alter_table(table, schema=None) as batch_op: | ||
| batch_op.drop_constraint(op.f(constraint_name), type_="foreignkey") | ||
| except ValueError: | ||
| pass | ||
|
|
||
|
|
||
| # original table name to new table name | ||
|
|
||
Uh oh!
There was an error while loading. Please reload this page.