From 50a4f5a4301e559429fa189795fe066934df5cfa Mon Sep 17 00:00:00 2001 From: David Blain Date: Fri, 27 Mar 2026 15:43:13 +0100 Subject: [PATCH 01/11] refactor: Add support for non-deferred pagination mode in GenericTransfer --- .../common/sql/operators/generic_transfer.py | 36 +++++++++--- .../common/sql/operators/generic_transfer.pyi | 2 + .../sql/operators/test_generic_transfer.py | 55 ++++++++++++++++++- 3 files changed, 83 insertions(+), 10 deletions(-) diff --git a/providers/common/sql/src/airflow/providers/common/sql/operators/generic_transfer.py b/providers/common/sql/src/airflow/providers/common/sql/operators/generic_transfer.py index 043ec22a5c8c7..053af41488173 100644 --- a/providers/common/sql/src/airflow/providers/common/sql/operators/generic_transfer.py +++ b/providers/common/sql/src/airflow/providers/common/sql/operators/generic_transfer.py @@ -21,7 +21,7 @@ from functools import cached_property from typing import TYPE_CHECKING, Any -from airflow.providers.common.compat.sdk import AirflowException, BaseHook, BaseOperator +from airflow.providers.common.compat.sdk import AirflowException, BaseHook, BaseOperator, conf from airflow.providers.common.sql.hooks.sql import DbApiHook from airflow.providers.common.sql.triggers.sql import SQLExecuteQueryTrigger @@ -55,6 +55,7 @@ class GenericTransfer(BaseOperator): :param insert_args: extra params for `insert_rows` method. :param page_size: number of records to be read in paginated mode (optional). :param paginated_sql_statement_clause: SQL statement clause to be used for pagination (optional). + :param deferrable: Run operator in the deferrable mode """ template_fields: Sequence[str] = ( @@ -90,6 +91,7 @@ def __init__( insert_args: dict | None = None, page_size: int | None = None, paginated_sql_statement_clause: str | None = None, + deferrable: bool = conf.getboolean("operators", "default_deferrable", fallback=False), **kwargs, ) -> None: super().__init__(**kwargs) @@ -104,6 +106,7 @@ def __init__( self.insert_args = insert_args or {} self.page_size = page_size self.paginated_sql_statement_clause = paginated_sql_statement_clause or "{} LIMIT {} OFFSET {}" + self.deferrable = deferrable @classmethod def get_hook(cls, conn_id: str, hook_params: dict | None = None) -> DbApiHook: @@ -159,14 +162,29 @@ def execute(self, context: Context): self.destination_hook.run(self.preoperator) if self.page_size and isinstance(self.sql, str): - self.defer( - trigger=SQLExecuteQueryTrigger( - conn_id=self.source_conn_id, - hook_params=self.source_hook_params, - sql=self.get_paginated_sql(0), - ), - method_name=self.execute_complete.__name__, - ) + if self.deferrable: + self.defer( + trigger=SQLExecuteQueryTrigger( + conn_id=self.source_conn_id, + hook_params=self.source_hook_params, + sql=self.get_paginated_sql(0), + ), + method_name=self.execute_complete.__name__, + ) + else: + offset = 0 + while True: + paginated_sql = self.get_paginated_sql(offset) + self.log.info("Executing: \n %s", paginated_sql) + if rows := self.source_hook.get_records(paginated_sql): + self._insert_rows(rows=rows, context=context) + offset += self.page_size + else: + self.log.info( + "No more rows to fetch into %s; ending transfer.", + self.destination_table, + ) + break else: if isinstance(self.sql, str): self.sql = [self.sql] diff --git a/providers/common/sql/src/airflow/providers/common/sql/operators/generic_transfer.pyi b/providers/common/sql/src/airflow/providers/common/sql/operators/generic_transfer.pyi index 3194e3877fdc2..d609251f90ddf 100644 --- a/providers/common/sql/src/airflow/providers/common/sql/operators/generic_transfer.pyi +++ b/providers/common/sql/src/airflow/providers/common/sql/operators/generic_transfer.pyi @@ -55,6 +55,7 @@ class GenericTransfer(BaseOperator): preoperator: Incomplete insert_args: Incomplete page_size: Incomplete + deferrable: bool def __init__( self, *, @@ -68,6 +69,7 @@ class GenericTransfer(BaseOperator): preoperator: str | list[str] | None = None, insert_args: dict | None = None, page_size: int | None = None, + deferrable: bool = False, **kwargs, ) -> None: ... @classmethod diff --git a/providers/common/sql/tests/unit/common/sql/operators/test_generic_transfer.py b/providers/common/sql/tests/unit/common/sql/operators/test_generic_transfer.py index 1406542df4ea5..da6a773ea4b36 100644 --- a/providers/common/sql/tests/unit/common/sql/operators/test_generic_transfer.py +++ b/providers/common/sql/tests/unit/common/sql/operators/test_generic_transfer.py @@ -25,6 +25,7 @@ import pytest from more_itertools import flatten +from sqlalchemy.orm import deferred from airflow.exceptions import AirflowProviderDeprecationWarning from airflow.models.connection import Connection @@ -382,7 +383,50 @@ def test_non_paginated_read_for_multiple_sql_statements_with_rows_processor(self "table": "NEW_HR.EMPLOYEES", } - def test_paginated_read(self): + def test_non_deferred_paginated_read(self): + """ + Test that GenericTransfer paginates eagerly (non-deferred) when page_size is set and deferrable is False. + """ + with mock.patch(f"{BASEHOOK_PATCH_PATH}.get_connection", side_effect=self.get_connection): + with mock.patch(f"{BASEHOOK_PATCH_PATH}.get_hook", side_effect=self.get_hook): + operator = GenericTransfer( + task_id="transfer_table", + source_conn_id="my_source_conn_id", + destination_conn_id="my_destination_conn_id", + sql="SELECT * FROM HR.EMPLOYEES", + destination_table="NEW_HR.EMPLOYEES", + page_size=1000, # Enable pagination + insert_args=INSERT_ARGS, + execution_timeout=timedelta(hours=1), + deferrable=False, # Explicitly non-deferred + ) + + operator.execute(context=mock_context(task=operator)) + + assert self.mocked_source_hook.get_records.call_count == 3 + assert ( + self.mocked_source_hook.get_records.call_args_list[0].args[0] + == "SELECT * FROM HR.EMPLOYEES LIMIT 1000 OFFSET 0" + ) + assert ( + self.mocked_source_hook.get_records.call_args_list[1].args[0] + == "SELECT * FROM HR.EMPLOYEES LIMIT 1000 OFFSET 1000" + ) + assert ( + self.mocked_source_hook.get_records.call_args_list[2].args[0] + == "SELECT * FROM HR.EMPLOYEES LIMIT 1000 OFFSET 2000" + ) + assert self.mocked_destination_hook.insert_rows.call_count == 2 + assert self.mocked_destination_hook.insert_rows.call_args_list[0].kwargs == { + **INSERT_ARGS, + **{"rows": [[1, 2], [11, 12], [3, 4], [13, 14]], "table": "NEW_HR.EMPLOYEES"}, + } + assert self.mocked_destination_hook.insert_rows.call_args_list[1].kwargs == { + **INSERT_ARGS, + **{"rows": [[3, 4], [13, 14]], "table": "NEW_HR.EMPLOYEES"}, + } + + def test_deferred_paginated_read(self): """ This unit test is based on the example described in the medium article: https://medium.com/apache-airflow/transfering-data-from-sap-hana-to-mssql-using-the-airflow-generictransfer-d29f147a9f1f @@ -399,6 +443,7 @@ def test_paginated_read(self): page_size=1000, # Fetch data in chunks of 1000 rows for pagination insert_args=INSERT_ARGS, execution_timeout=timedelta(hours=1), + deferrable=True, ) results, events = execute_operator(operator) @@ -414,6 +459,14 @@ def test_paginated_read(self): self.mocked_source_hook.get_records.call_args_list[0].args[0] == "SELECT * FROM HR.EMPLOYEES LIMIT 1000 OFFSET 0" ) + assert ( + self.mocked_source_hook.get_records.call_args_list[1].args[0] + == "SELECT * FROM HR.EMPLOYEES LIMIT 1000 OFFSET 1000" + ) + assert ( + self.mocked_source_hook.get_records.call_args_list[2].args[0] + == "SELECT * FROM HR.EMPLOYEES LIMIT 1000 OFFSET 2000" + ) assert self.mocked_destination_hook.insert_rows.call_count == 2 assert self.mocked_destination_hook.insert_rows.call_args_list[0].kwargs == { **INSERT_ARGS, From 4c4c9778fc954ffef699245fd6a9dd6bdd987d6b Mon Sep 17 00:00:00 2001 From: David Blain Date: Fri, 27 Mar 2026 19:56:38 +0100 Subject: [PATCH 02/11] refactor: Removed unused sqlalchemy import --- .../sql/tests/unit/common/sql/operators/test_generic_transfer.py | 1 - 1 file changed, 1 deletion(-) diff --git a/providers/common/sql/tests/unit/common/sql/operators/test_generic_transfer.py b/providers/common/sql/tests/unit/common/sql/operators/test_generic_transfer.py index da6a773ea4b36..36865140c9c43 100644 --- a/providers/common/sql/tests/unit/common/sql/operators/test_generic_transfer.py +++ b/providers/common/sql/tests/unit/common/sql/operators/test_generic_transfer.py @@ -25,7 +25,6 @@ import pytest from more_itertools import flatten -from sqlalchemy.orm import deferred from airflow.exceptions import AirflowProviderDeprecationWarning from airflow.models.connection import Connection From 8cfaa3edc3c9001548901a6facf081dab92fcb2d Mon Sep 17 00:00:00 2001 From: David Blain Date: Fri, 3 Apr 2026 16:47:35 +0200 Subject: [PATCH 03/11] refactor: Add paginated_sql_statement_clause to GenericTransfer stub --- .../airflow/providers/common/sql/operators/generic_transfer.pyi | 2 ++ 1 file changed, 2 insertions(+) diff --git a/providers/common/sql/src/airflow/providers/common/sql/operators/generic_transfer.pyi b/providers/common/sql/src/airflow/providers/common/sql/operators/generic_transfer.pyi index d609251f90ddf..a7a69bb452b65 100644 --- a/providers/common/sql/src/airflow/providers/common/sql/operators/generic_transfer.pyi +++ b/providers/common/sql/src/airflow/providers/common/sql/operators/generic_transfer.pyi @@ -55,6 +55,7 @@ class GenericTransfer(BaseOperator): preoperator: Incomplete insert_args: Incomplete page_size: Incomplete + paginated_sql_statement_clause: Incomplete deferrable: bool def __init__( self, @@ -69,6 +70,7 @@ class GenericTransfer(BaseOperator): preoperator: str | list[str] | None = None, insert_args: dict | None = None, page_size: int | None = None, + paginated_sql_statement_clause: str | None = None, deferrable: bool = False, **kwargs, ) -> None: ... From e6d72bf0d731f3af7b3e1849e908b1529256e224 Mon Sep 17 00:00:00 2001 From: David Blain Date: Tue, 28 Apr 2026 20:26:16 +0200 Subject: [PATCH 04/11] refactor: Updated changelog for sql providers --- providers/common/sql/docs/changelog.rst | 20 +++++++------------- 1 file changed, 7 insertions(+), 13 deletions(-) diff --git a/providers/common/sql/docs/changelog.rst b/providers/common/sql/docs/changelog.rst index 18c8971328b59..62651bd5490b7 100644 --- a/providers/common/sql/docs/changelog.rst +++ b/providers/common/sql/docs/changelog.rst @@ -25,24 +25,18 @@ Changelog --------- -1.34.0 +1.36.0 ...... -Features -~~~~~~~~ - -* ``Simplify default rows limit return result (#64183)`` - -Bug Fixes -~~~~~~~~~ +.. warning:: + **Breaking Change:** The default execution mode for paginated (``page_size`` + string SQL) GenericTransfer tasks has changed. Previously, these tasks always ran in deferred mode (using deferrable execution). Starting with this release, they now run synchronously by default unless you explicitly opt in to deferrable mode. -* ``Removed logging of rows length in SQLInsertRowsOperator to avoid crash on non materialized rows (#63346)`` -* ``Fix provider YAML validation for common SQL analytics operator (#63393)`` + This is a silent behavior change for any existing DAG using paginated GenericTransfer. If you want to restore the old behavior (always defer execution): -Misc -~~~~ + 1. Pass ``deferrable=True`` to each affected GenericTransfer task, **or** + 2. Set the global config option ``[operators] default_deferrable = true`` to make all operators deferrable by default. -* ``Add Python 3.14 Support (#63520)`` + Review your DAGs and configuration if you rely on deferred execution for paginated GenericTransfer tasks. .. Below changes are excluded from the changelog. Move them to appropriate section above if needed. Do not delete the lines(!): From c9850eec8d7906a4fd9a1ef6a2c2def488ccc6bb Mon Sep 17 00:00:00 2001 From: David Blain Date: Tue, 28 Apr 2026 20:26:54 +0200 Subject: [PATCH 05/11] refactor: Don't pass deferrable explicitly in test_non_deferred_paginated_read --- .../sql/tests/unit/common/sql/operators/test_generic_transfer.py | 1 - 1 file changed, 1 deletion(-) diff --git a/providers/common/sql/tests/unit/common/sql/operators/test_generic_transfer.py b/providers/common/sql/tests/unit/common/sql/operators/test_generic_transfer.py index 36865140c9c43..ee27e4f2a01d2 100644 --- a/providers/common/sql/tests/unit/common/sql/operators/test_generic_transfer.py +++ b/providers/common/sql/tests/unit/common/sql/operators/test_generic_transfer.py @@ -397,7 +397,6 @@ def test_non_deferred_paginated_read(self): page_size=1000, # Enable pagination insert_args=INSERT_ARGS, execution_timeout=timedelta(hours=1), - deferrable=False, # Explicitly non-deferred ) operator.execute(context=mock_context(task=operator)) From ee0a9b244cb8b057606ef605874600eda92f22b9 Mon Sep 17 00:00:00 2001 From: David Blain Date: Wed, 29 Apr 2026 20:47:37 +0200 Subject: [PATCH 06/11] refactor: Updated changelog.rst --- providers/common/sql/docs/changelog.rst | 2 -- 1 file changed, 2 deletions(-) diff --git a/providers/common/sql/docs/changelog.rst b/providers/common/sql/docs/changelog.rst index abfd3fb13e96f..1a561615be722 100644 --- a/providers/common/sql/docs/changelog.rst +++ b/providers/common/sql/docs/changelog.rst @@ -40,8 +40,6 @@ Changelog .. Below changes are excluded from the changelog. Move them to appropriate section above if needed. Do not delete the lines(!): - * ``Fix flaky test_run_no_log in DbApiHook by verifying caplog text instead of length. (#63860)`` - * ``Add *.iml to .gitignore in all distributions (#63636)`` 1.35.0 ...... From 0c87892738ce7b7b8d56649bfd0070321fc596fc Mon Sep 17 00:00:00 2001 From: David Blain Date: Sun, 3 May 2026 11:13:04 +0200 Subject: [PATCH 07/11] refactor: Removed version from changelog as it has to be defined by release manager --- providers/common/sql/docs/changelog.rst | 3 --- 1 file changed, 3 deletions(-) diff --git a/providers/common/sql/docs/changelog.rst b/providers/common/sql/docs/changelog.rst index 1a561615be722..8ced2c43ad869 100644 --- a/providers/common/sql/docs/changelog.rst +++ b/providers/common/sql/docs/changelog.rst @@ -25,9 +25,6 @@ Changelog --------- -1.36.0 -...... - .. warning:: **Breaking Change:** The default execution mode for paginated (``page_size`` + string SQL) GenericTransfer tasks has changed. Previously, these tasks always ran in deferred mode (using deferrable execution). Starting with this release, they now run synchronously by default unless you explicitly opt in to deferrable mode. From 06b6c45a6ac3acbcfe7a3e848a4b9de3721c16f4 Mon Sep 17 00:00:00 2001 From: David Blain Date: Sun, 3 May 2026 11:14:52 +0200 Subject: [PATCH 08/11] refactor: Improved documentation for deferrable parameter --- .../airflow/providers/common/sql/operators/generic_transfer.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/providers/common/sql/src/airflow/providers/common/sql/operators/generic_transfer.py b/providers/common/sql/src/airflow/providers/common/sql/operators/generic_transfer.py index 053af41488173..9302a20b1d158 100644 --- a/providers/common/sql/src/airflow/providers/common/sql/operators/generic_transfer.py +++ b/providers/common/sql/src/airflow/providers/common/sql/operators/generic_transfer.py @@ -55,7 +55,8 @@ class GenericTransfer(BaseOperator): :param insert_args: extra params for `insert_rows` method. :param page_size: number of records to be read in paginated mode (optional). :param paginated_sql_statement_clause: SQL statement clause to be used for pagination (optional). - :param deferrable: Run operator in the deferrable mode + :param deferrable: Run operator in deferrable mode (only effective in paginated mode, i.e. + when `page_size` is set and `sql` is a string). """ template_fields: Sequence[str] = ( From c7625f039cb0785a7568e3807cfbf0ca1e06708e Mon Sep 17 00:00:00 2001 From: David Blain Date: Sun, 3 May 2026 11:16:11 +0200 Subject: [PATCH 09/11] refactor: Also added offset logging statement in non-deferred branch --- .../airflow/providers/common/sql/operators/generic_transfer.py | 1 + 1 file changed, 1 insertion(+) diff --git a/providers/common/sql/src/airflow/providers/common/sql/operators/generic_transfer.py b/providers/common/sql/src/airflow/providers/common/sql/operators/generic_transfer.py index 9302a20b1d158..5a4975a1f6ca8 100644 --- a/providers/common/sql/src/airflow/providers/common/sql/operators/generic_transfer.py +++ b/providers/common/sql/src/airflow/providers/common/sql/operators/generic_transfer.py @@ -180,6 +180,7 @@ def execute(self, context: Context): if rows := self.source_hook.get_records(paginated_sql): self._insert_rows(rows=rows, context=context) offset += self.page_size + self.log.info("Offset increased to %d", offset) else: self.log.info( "No more rows to fetch into %s; ending transfer.", From 853b21f2ced66b68c5371a3ff3496ee9c7dbc822 Mon Sep 17 00:00:00 2001 From: David Blain Date: Sun, 3 May 2026 11:18:46 +0200 Subject: [PATCH 10/11] refactor: Fast exit while loop if length of rows is smaller than page size --- .../airflow/providers/common/sql/operators/generic_transfer.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/providers/common/sql/src/airflow/providers/common/sql/operators/generic_transfer.py b/providers/common/sql/src/airflow/providers/common/sql/operators/generic_transfer.py index 5a4975a1f6ca8..3fd7b330c44b7 100644 --- a/providers/common/sql/src/airflow/providers/common/sql/operators/generic_transfer.py +++ b/providers/common/sql/src/airflow/providers/common/sql/operators/generic_transfer.py @@ -179,6 +179,8 @@ def execute(self, context: Context): self.log.info("Executing: \n %s", paginated_sql) if rows := self.source_hook.get_records(paginated_sql): self._insert_rows(rows=rows, context=context) + if len(rows) < self.page_size: + break offset += self.page_size self.log.info("Offset increased to %d", offset) else: From 5188167a879ad76cd318a9b946609b3b439dfd54 Mon Sep 17 00:00:00 2001 From: David Blain Date: Mon, 4 May 2026 10:20:42 +0200 Subject: [PATCH 11/11] refactor: Fixed test_non_deferred_paginated_read --- .../unit/common/sql/operators/test_generic_transfer.py | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) diff --git a/providers/common/sql/tests/unit/common/sql/operators/test_generic_transfer.py b/providers/common/sql/tests/unit/common/sql/operators/test_generic_transfer.py index ee27e4f2a01d2..a268d232701e4 100644 --- a/providers/common/sql/tests/unit/common/sql/operators/test_generic_transfer.py +++ b/providers/common/sql/tests/unit/common/sql/operators/test_generic_transfer.py @@ -385,6 +385,7 @@ def test_non_paginated_read_for_multiple_sql_statements_with_rows_processor(self def test_non_deferred_paginated_read(self): """ Test that GenericTransfer paginates eagerly (non-deferred) when page_size is set and deferrable is False. + It stops early when fewer rows than page_size are returned (no need for an extra empty-page fetch). """ with mock.patch(f"{BASEHOOK_PATCH_PATH}.get_connection", side_effect=self.get_connection): with mock.patch(f"{BASEHOOK_PATCH_PATH}.get_hook", side_effect=self.get_hook): @@ -394,7 +395,7 @@ def test_non_deferred_paginated_read(self): destination_conn_id="my_destination_conn_id", sql="SELECT * FROM HR.EMPLOYEES", destination_table="NEW_HR.EMPLOYEES", - page_size=1000, # Enable pagination + page_size=2, insert_args=INSERT_ARGS, execution_timeout=timedelta(hours=1), ) @@ -404,15 +405,15 @@ def test_non_deferred_paginated_read(self): assert self.mocked_source_hook.get_records.call_count == 3 assert ( self.mocked_source_hook.get_records.call_args_list[0].args[0] - == "SELECT * FROM HR.EMPLOYEES LIMIT 1000 OFFSET 0" + == "SELECT * FROM HR.EMPLOYEES LIMIT 2 OFFSET 0" ) assert ( self.mocked_source_hook.get_records.call_args_list[1].args[0] - == "SELECT * FROM HR.EMPLOYEES LIMIT 1000 OFFSET 1000" + == "SELECT * FROM HR.EMPLOYEES LIMIT 2 OFFSET 2" ) assert ( self.mocked_source_hook.get_records.call_args_list[2].args[0] - == "SELECT * FROM HR.EMPLOYEES LIMIT 1000 OFFSET 2000" + == "SELECT * FROM HR.EMPLOYEES LIMIT 2 OFFSET 4" ) assert self.mocked_destination_hook.insert_rows.call_count == 2 assert self.mocked_destination_hook.insert_rows.call_args_list[0].kwargs == {