From b0bbc3d9c344f87a16ccc81fc079e17b4012985e Mon Sep 17 00:00:00 2001 From: Hemkumar Chheda Date: Tue, 14 Apr 2026 23:40:58 +0530 Subject: [PATCH 1/6] fix(db_cleanup): add --error-on-cleanup-failure flag to airflow db clean airflow db clean suppresses all per-table cleanup errors via _suppress_with_logging() and exits 0 even when tables could not be cleaned. This makes it impossible to detect silent failures in automated DAG-based maintenance workflows, which can lead to unchecked table growth and eventual migration failures on upgrade. This commit adds an opt-in --error-on-cleanup-failure flag that causes run_cleanup() to raise AirflowException (and the CLI to exit 1) if any table cleanup encountered an error. Default behaviour is unchanged. Additionally, a warning listing all tables that were not cleaned is now always emitted when failures occur, even without the flag, improving observability without requiring any opt-in. Changes: - airflow/utils/db_cleanup.py: update _suppress_with_logging to track whether an exception was suppressed via a SimpleNamespace context object; collect failed table names in run_cleanup(); emit a warning summary and optionally raise AirflowException. - airflow/cli/cli_config.py: add ARG_DB_ERROR_ON_CLEANUP_FAILURE and wire it into the db clean ActionCommand args list. - airflow/cli/commands/db_command.py: forward error_on_cleanup_failure from CLI args to run_cleanup(). - tests/utils/test_db_cleanup.py: add unit tests covering the new flag and the warning summary behaviour. 
Made-with: Cursor --- airflow-core/src/airflow/cli/cli_config.py | 11 ++++ .../src/airflow/cli/commands/db_command.py | 1 + airflow-core/src/airflow/utils/db_cleanup.py | 28 ++++++++-- .../tests/unit/utils/test_db_cleanup.py | 52 +++++++++++++++++++ 4 files changed, 89 insertions(+), 3 deletions(-) diff --git a/airflow-core/src/airflow/cli/cli_config.py b/airflow-core/src/airflow/cli/cli_config.py index 6b94dac4c258d..3b072c39ab4b5 100644 --- a/airflow-core/src/airflow/cli/cli_config.py +++ b/airflow-core/src/airflow/cli/cli_config.py @@ -541,6 +541,16 @@ def string_lower_type(val): "Lower values reduce long-running locks but increase the number of batches." ), ) +ARG_DB_ERROR_ON_CLEANUP_FAILURE = Arg( + ("--error-on-cleanup-failure",), + help=( + "If set, the command will exit with a non-zero exit code if any table cleanup encountered " + "an error. By default errors are suppressed and the command exits 0 even if some tables " + "were not cleaned. Recommended when running airflow db clean from a DAG or automated " + "workflow so that failures are surfaced rather than silently skipped." 
+ ), + action="store_true", +) ARG_DAG_IDS = Arg( ("--dag-ids",), default=None, @@ -1603,6 +1613,7 @@ class GroupCommand(NamedTuple): ARG_DB_BATCH_SIZE, ARG_DAG_IDS, ARG_EXCLUDE_DAG_IDS, + ARG_DB_ERROR_ON_CLEANUP_FAILURE, ), ), ActionCommand( diff --git a/airflow-core/src/airflow/cli/commands/db_command.py b/airflow-core/src/airflow/cli/commands/db_command.py index b47d949e84f13..546a5db20eed8 100644 --- a/airflow-core/src/airflow/cli/commands/db_command.py +++ b/airflow-core/src/airflow/cli/commands/db_command.py @@ -316,6 +316,7 @@ def cleanup_tables(args): batch_size=args.batch_size, dag_ids=args.dag_ids, exclude_dag_ids=args.exclude_dag_ids, + error_on_cleanup_failure=args.error_on_cleanup_failure, ) diff --git a/airflow-core/src/airflow/utils/db_cleanup.py b/airflow-core/src/airflow/utils/db_cleanup.py index e6b5283669b86..0b4dad848733d 100644 --- a/airflow-core/src/airflow/utils/db_cleanup.py +++ b/airflow-core/src/airflow/utils/db_cleanup.py @@ -28,6 +28,7 @@ import os from contextlib import contextmanager from dataclasses import dataclass +from types import SimpleNamespace from typing import TYPE_CHECKING, Any from sqlalchemy import and_, column, func, inspect, select, table, text @@ -476,10 +477,12 @@ def _print_config(*, configs: dict[str, _TableConfig]) -> None: @contextmanager def _suppress_with_logging(table: str, session: Session): - """Suppresses errors but logs them.""" + """Suppresses errors but logs them, and tracks whether an exception was suppressed.""" + ctx = SimpleNamespace(failed=False) try: - yield + yield ctx except (OperationalError, ProgrammingError): + ctx.failed = True logger.warning("Encountered error when attempting to clean table '%s'. 
", table) logger.debug("Traceback for table '%s'", table, exc_info=True) if session.is_active: @@ -554,6 +557,7 @@ def run_cleanup( skip_archive: bool = False, session: Session = NEW_SESSION, batch_size: int | None = None, + error_on_cleanup_failure: bool = False, ) -> None: """ Purges old records in airflow metadata database. @@ -577,6 +581,9 @@ def run_cleanup( :param skip_archive: Set to True if you don't want the purged rows preserved in an archive table. :param session: Session representing connection to the metadata database. :param batch_size: Maximum number of rows to delete or archive in a single transaction. + :param error_on_cleanup_failure: If True, raise an AirflowException after processing all tables + if any per-table cleanup encountered an error. By default errors are suppressed and the + command exits 0 even if some tables were not cleaned. """ clean_before_timestamp = timezone.coerce_datetime(clean_before_timestamp) @@ -597,10 +604,11 @@ def run_cleanup( exclude_dag_ids=exclude_dag_ids, ) existing_tables = reflect_tables(tables=None, session=session).tables + failed_tables: list[str] = [] for table_name, table_config in effective_config_dict.items(): if table_name in existing_tables: - with _suppress_with_logging(table_name, session): + with _suppress_with_logging(table_name, session) as ctx: _cleanup_table( clean_before_timestamp=clean_before_timestamp, dag_ids=dag_ids, @@ -613,9 +621,23 @@ def run_cleanup( batch_size=batch_size, ) session.commit() + if ctx.failed: + failed_tables.append(table_name) else: logger.warning("Table %s not found. Skipping.", table_name) + if failed_tables: + logger.warning( + "The following tables were not cleaned due to errors: %s. " + "Check the logs above for details.", + failed_tables, + ) + if error_on_cleanup_failure: + raise AirflowException( + f"airflow db clean encountered errors on the following tables and did not clean them: " + f"{failed_tables}. Check the logs above for details." 
+ ) + @provide_session def export_archived_records( diff --git a/airflow-core/tests/unit/utils/test_db_cleanup.py b/airflow-core/tests/unit/utils/test_db_cleanup.py index 2ddca75ec57c4..ec30cbd317c50 100644 --- a/airflow-core/tests/unit/utils/test_db_cleanup.py +++ b/airflow-core/tests/unit/utils/test_db_cleanup.py @@ -741,6 +741,58 @@ def test_drop_archived_tables(self, mock_input, confirm_mock, inspect_mock, capl else: confirm_mock.assert_not_called() + @patch("airflow.utils.db_cleanup._cleanup_table", side_effect=OperationalError("", {}, None)) + def test_error_on_cleanup_failure_raises_when_flag_set(self, cleanup_table_mock, caplog): + """When error_on_cleanup_failure=True and a table fails, AirflowException should be raised.""" + with pytest.raises(AirflowException, match="airflow db clean encountered errors"): + run_cleanup( + clean_before_timestamp=None, + table_names=["log"], + dry_run=False, + verbose=False, + confirm=False, + error_on_cleanup_failure=True, + ) + + @patch("airflow.utils.db_cleanup._cleanup_table", side_effect=OperationalError("", {}, None)) + def test_error_on_cleanup_failure_no_raise_by_default(self, cleanup_table_mock, caplog): + """When error_on_cleanup_failure=False (default) and a table fails, no exception is raised.""" + run_cleanup( + clean_before_timestamp=None, + table_names=["log"], + dry_run=False, + verbose=False, + confirm=False, + error_on_cleanup_failure=False, + ) + assert "The following tables were not cleaned due to errors" in caplog.text + + @patch("airflow.utils.db_cleanup._cleanup_table", side_effect=OperationalError("", {}, None)) + def test_error_on_cleanup_failure_lists_failed_tables_in_warning(self, cleanup_table_mock, caplog): + """A warning naming the failed tables should always be emitted, regardless of the flag.""" + run_cleanup( + clean_before_timestamp=None, + table_names=["log"], + dry_run=False, + verbose=False, + confirm=False, + ) + assert "log" in caplog.text + assert "The following tables were not 
cleaned due to errors" in caplog.text + + @patch("airflow.utils.db_cleanup._cleanup_table") + def test_error_on_cleanup_failure_propagated_from_run_cleanup(self, cleanup_table_mock): + """Ensure error_on_cleanup_failure is accepted by run_cleanup without errors when no failures occur.""" + run_cleanup( + clean_before_timestamp=None, + table_names=["log"], + dry_run=False, + verbose=False, + confirm=False, + error_on_cleanup_failure=True, + ) + cleanup_table_mock.assert_called_once() + def create_tis(base_date, num_tis, run_type=DagRunType.SCHEDULED): from tests_common.test_utils.taskinstance import create_task_instance From ecd0697d61dd4b830aaa87b0843255bbc7172196 Mon Sep 17 00:00:00 2001 From: Hemkumar Chheda Date: Tue, 14 Apr 2026 23:56:28 +0530 Subject: [PATCH 2/6] fix(db_cleanup): improve type annotations and add docs for --error-on-cleanup-failure - Use collections.abc.Generator for the _suppress_with_logging return type annotation (ruff UP035 compliant) instead of typing.Generator. - Expand the _suppress_with_logging docstring to describe the yielded SimpleNamespace context object and the failure-tracking behaviour. - Add a new "Detecting cleanup failures" section to docs/howto/usage-cli.rst documenting the --error-on-cleanup-failure flag and the --skip-archive recommendation for large tables. Made-with: Cursor --- airflow-core/docs/howto/usage-cli.rst | 25 ++++++++++++++++++++ airflow-core/src/airflow/utils/db_cleanup.py | 14 +++++++++-- 2 files changed, 37 insertions(+), 2 deletions(-) diff --git a/airflow-core/docs/howto/usage-cli.rst b/airflow-core/docs/howto/usage-cli.rst index b80faf89b3ec0..b8fa890d25749 100644 --- a/airflow-core/docs/howto/usage-cli.rst +++ b/airflow-core/docs/howto/usage-cli.rst @@ -221,6 +221,31 @@ By default, ``db clean`` will archive purged rows in tables of the form ``_airfl When you encounter an error without using ``--skip-archive``, ``_airflow_deleted____`` would still exist in the DB. 
You can use ``db drop-archived`` command to manually drop these tables. +Detecting cleanup failures +^^^^^^^^^^^^^^^^^^^^^^^^^^ + +By default, ``db clean`` suppresses per-table errors (such as a database ``statement_timeout`` +being exceeded on a very large table) and exits with code 0 even if one or more tables were not +cleaned. A WARNING is always emitted in the logs listing which tables were skipped due to errors. + +To make the command exit with a non-zero code whenever any table cleanup fails — useful when +``airflow db clean`` is invoked from a DAG task and you want the task to turn red on failure — +pass ``--error-on-cleanup-failure``: + +.. code-block:: bash + + airflow db clean \ + --clean-before-timestamp "$(date -u -d '21 days ago' '+%Y-%m-%dT%H:%M:%S+00:00')" \ + --yes \ + --error-on-cleanup-failure + +.. tip:: + + On large deployments where the archival ``CREATE TABLE … AS SELECT`` step itself can time + out, combining ``--error-on-cleanup-failure`` with ``--skip-archive`` is recommended. + ``--skip-archive`` deletes rows directly without the intermediate archive table, making the + operation both faster and less likely to hit ``statement_timeout``. 
+ Export the purged records from the archive tables ------------------------------------------------- The ``db export-archived`` command exports the contents of the archived tables, created by the ``db clean`` command, diff --git a/airflow-core/src/airflow/utils/db_cleanup.py b/airflow-core/src/airflow/utils/db_cleanup.py index 0b4dad848733d..beac98467ff77 100644 --- a/airflow-core/src/airflow/utils/db_cleanup.py +++ b/airflow-core/src/airflow/utils/db_cleanup.py @@ -26,6 +26,7 @@ import csv import logging import os +from collections.abc import Generator from contextlib import contextmanager from dataclasses import dataclass from types import SimpleNamespace @@ -476,8 +477,17 @@ def _print_config(*, configs: dict[str, _TableConfig]) -> None: @contextmanager -def _suppress_with_logging(table: str, session: Session): - """Suppresses errors but logs them, and tracks whether an exception was suppressed.""" +def _suppress_with_logging(table: str, session: Session) -> Generator[SimpleNamespace, None, None]: + """ + Suppress per-table cleanup errors, log them, and expose failure state to the caller. + + Yields a :class:`~types.SimpleNamespace` with a single attribute ``failed`` (bool). + When an :class:`~sqlalchemy.exc.OperationalError` or + :class:`~sqlalchemy.exc.ProgrammingError` is raised inside the ``with`` block the + exception is swallowed, ``ctx.failed`` is set to ``True``, a WARNING is emitted for + the table, and the session is rolled back. The caller can inspect ``ctx.failed`` + after the block to decide whether to surface the error upstream. + """ ctx = SimpleNamespace(failed=False) try: yield ctx From 712de819ee704ff534855ed150eb2281fed044ef Mon Sep 17 00:00:00 2001 From: Hemkumar Chheda Date: Wed, 15 Apr 2026 09:46:46 +0530 Subject: [PATCH 3/6] fix(db_cleanup): address CI failures on PR #65239 - Fix mypy arg-type errors: OperationalError third argument must be a BaseException, not None. 
Replace OperationalError("", {}, None) with OperationalError("", {}, Exception("mock db error")) in three new tests in test_db_cleanup.py. - Fix ruff ISC violation: collapse implicit string concatenation in the run_cleanup() warning call into a single string literal. - Update existing CLI tests in test_db_command.py to include the new error_on_cleanup_failure=False kwarg in all ten assert_called_once_with assertions. - Add test_error_on_cleanup_failure to test_db_command.py to verify the --error-on-cleanup-failure flag is correctly forwarded to run_cleanup. Made-with: Cursor --- airflow-core/src/airflow/utils/db_cleanup.py | 3 +- .../unit/cli/commands/test_db_command.py | 40 +++++++++++++++++++ .../tests/unit/utils/test_db_cleanup.py | 6 +-- 3 files changed, 44 insertions(+), 5 deletions(-) diff --git a/airflow-core/src/airflow/utils/db_cleanup.py b/airflow-core/src/airflow/utils/db_cleanup.py index beac98467ff77..e4fe91ece2cd6 100644 --- a/airflow-core/src/airflow/utils/db_cleanup.py +++ b/airflow-core/src/airflow/utils/db_cleanup.py @@ -638,8 +638,7 @@ def run_cleanup( if failed_tables: logger.warning( - "The following tables were not cleaned due to errors: %s. " - "Check the logs above for details.", + "The following tables were not cleaned due to errors: %s. 
Check the logs above for details.", failed_tables, ) if error_on_cleanup_failure: diff --git a/airflow-core/tests/unit/cli/commands/test_db_command.py b/airflow-core/tests/unit/cli/commands/test_db_command.py index cd00e0838df8b..52c2b4cc37659 100644 --- a/airflow-core/tests/unit/cli/commands/test_db_command.py +++ b/airflow-core/tests/unit/cli/commands/test_db_command.py @@ -733,6 +733,7 @@ def test_date_timezone_omitted(self, run_cleanup_mock, timezone): confirm=False, skip_archive=False, batch_size=None, + error_on_cleanup_failure=False, ) @pytest.mark.parametrize("timezone", ["UTC", "Europe/Berlin", "America/Los_Angeles"]) @@ -756,6 +757,7 @@ def test_date_timezone_supplied(self, run_cleanup_mock, timezone): confirm=False, skip_archive=False, batch_size=None, + error_on_cleanup_failure=False, ) @pytest.mark.parametrize(("confirm_arg", "expected"), [(["-y"], False), ([], True)]) @@ -785,6 +787,7 @@ def test_confirm(self, run_cleanup_mock, confirm_arg, expected): confirm=expected, skip_archive=False, batch_size=None, + error_on_cleanup_failure=False, ) @pytest.mark.parametrize(("extra_arg", "expected"), [(["--skip-archive"], True), ([], False)]) @@ -814,6 +817,7 @@ def test_skip_archive(self, run_cleanup_mock, extra_arg, expected): confirm=True, skip_archive=expected, batch_size=None, + error_on_cleanup_failure=False, ) @pytest.mark.parametrize(("dry_run_arg", "expected"), [(["--dry-run"], True), ([], False)]) @@ -843,6 +847,7 @@ def test_dry_run(self, run_cleanup_mock, dry_run_arg, expected): confirm=True, skip_archive=False, batch_size=None, + error_on_cleanup_failure=False, ) @pytest.mark.parametrize( @@ -874,6 +879,7 @@ def test_tables(self, run_cleanup_mock, extra_args, expected): confirm=True, skip_archive=False, batch_size=None, + error_on_cleanup_failure=False, ) @pytest.mark.parametrize(("extra_args", "expected"), [(["--verbose"], True), ([], False)]) @@ -903,6 +909,7 @@ def test_verbose(self, run_cleanup_mock, extra_args, expected): confirm=True, 
skip_archive=False, batch_size=None, + error_on_cleanup_failure=False, ) @pytest.mark.parametrize(("extra_args", "expected"), [(["--batch-size", "1234"], 1234), ([], None)]) @@ -932,6 +939,7 @@ def test_batch_size(self, run_cleanup_mock, extra_args, expected): confirm=True, skip_archive=False, batch_size=expected, + error_on_cleanup_failure=False, ) @pytest.mark.parametrize( @@ -963,6 +971,7 @@ def test_dag_ids(self, run_cleanup_mock, extra_args, expected): confirm=True, skip_archive=False, batch_size=None, + error_on_cleanup_failure=False, ) @pytest.mark.parametrize( @@ -994,6 +1003,37 @@ def test_exclude_dag_ids(self, run_cleanup_mock, extra_args, expected): confirm=True, skip_archive=False, batch_size=None, + error_on_cleanup_failure=False, + ) + + @pytest.mark.parametrize( + ("extra_args", "expected"), [(["--error-on-cleanup-failure"], True), ([], False)] + ) + @patch("airflow.cli.commands.db_command.run_cleanup") + def test_error_on_cleanup_failure(self, run_cleanup_mock, extra_args, expected): + """When --error-on-cleanup-failure is passed, error_on_cleanup_failure should be True.""" + args = self.parser.parse_args( + [ + "db", + "clean", + "--clean-before-timestamp", + "2021-01-01", + *extra_args, + ] + ) + db_command.cleanup_tables(args) + + run_cleanup_mock.assert_called_once_with( + table_names=None, + dry_run=False, + dag_ids=None, + exclude_dag_ids=None, + clean_before_timestamp=pendulum.parse("2021-01-01 00:00:00Z"), + verbose=False, + confirm=True, + skip_archive=False, + batch_size=None, + error_on_cleanup_failure=expected, ) @patch("airflow.cli.commands.db_command.export_archived_records") diff --git a/airflow-core/tests/unit/utils/test_db_cleanup.py b/airflow-core/tests/unit/utils/test_db_cleanup.py index ec30cbd317c50..1c3289e9f7bdc 100644 --- a/airflow-core/tests/unit/utils/test_db_cleanup.py +++ b/airflow-core/tests/unit/utils/test_db_cleanup.py @@ -741,7 +741,7 @@ def test_drop_archived_tables(self, mock_input, confirm_mock, inspect_mock, capl 
else: confirm_mock.assert_not_called() - @patch("airflow.utils.db_cleanup._cleanup_table", side_effect=OperationalError("", {}, None)) + @patch("airflow.utils.db_cleanup._cleanup_table", side_effect=OperationalError("", {}, Exception("mock db error"))) def test_error_on_cleanup_failure_raises_when_flag_set(self, cleanup_table_mock, caplog): """When error_on_cleanup_failure=True and a table fails, AirflowException should be raised.""" with pytest.raises(AirflowException, match="airflow db clean encountered errors"): @@ -754,7 +754,7 @@ def test_error_on_cleanup_failure_raises_when_flag_set(self, cleanup_table_mock, error_on_cleanup_failure=True, ) - @patch("airflow.utils.db_cleanup._cleanup_table", side_effect=OperationalError("", {}, None)) + @patch("airflow.utils.db_cleanup._cleanup_table", side_effect=OperationalError("", {}, Exception("mock db error"))) def test_error_on_cleanup_failure_no_raise_by_default(self, cleanup_table_mock, caplog): """When error_on_cleanup_failure=False (default) and a table fails, no exception is raised.""" run_cleanup( @@ -767,7 +767,7 @@ def test_error_on_cleanup_failure_no_raise_by_default(self, cleanup_table_mock, ) assert "The following tables were not cleaned due to errors" in caplog.text - @patch("airflow.utils.db_cleanup._cleanup_table", side_effect=OperationalError("", {}, None)) + @patch("airflow.utils.db_cleanup._cleanup_table", side_effect=OperationalError("", {}, Exception("mock db error"))) def test_error_on_cleanup_failure_lists_failed_tables_in_warning(self, cleanup_table_mock, caplog): """A warning naming the failed tables should always be emitted, regardless of the flag.""" run_cleanup( From 4634136ec166ebe1e7cbf386a57290c22b0b6a56 Mon Sep 17 00:00:00 2001 From: Hemkumar Chheda Date: Wed, 15 Apr 2026 10:39:19 +0530 Subject: [PATCH 4/6] fix(db_cleanup): add --error-on-cleanup-failure flag documentation and tests - Introduce a new news fragment detailing the addition of the ``--error-on-cleanup-failure`` flag to the 
``airflow db clean`` command, allowing for better error handling during table cleanup. - Update unit tests in `test_db_cleanup.py` to ensure proper functionality of the new flag, including checks for raised exceptions and warning messages for failed tables. - Adjust the known exceptions list to reflect changes in `db_cleanup.py`. --- airflow-core/newsfragments/65239.bugfix.rst | 1 + airflow-core/tests/unit/utils/test_db_cleanup.py | 15 ++++++++++++--- scripts/ci/prek/known_airflow_exceptions.txt | 2 +- 3 files changed, 14 insertions(+), 4 deletions(-) create mode 100644 airflow-core/newsfragments/65239.bugfix.rst diff --git a/airflow-core/newsfragments/65239.bugfix.rst b/airflow-core/newsfragments/65239.bugfix.rst new file mode 100644 index 0000000000000..5a9a086e0b587 --- /dev/null +++ b/airflow-core/newsfragments/65239.bugfix.rst @@ -0,0 +1 @@ +Add ``--error-on-cleanup-failure`` flag to ``airflow db clean`` so that per-table cleanup errors, which were previously suppressed and caused the command to exit 0 silently, can now surface as a non-zero exit code. A warning summary listing failed tables is always emitted regardless of the flag. 
diff --git a/airflow-core/tests/unit/utils/test_db_cleanup.py b/airflow-core/tests/unit/utils/test_db_cleanup.py index 1c3289e9f7bdc..6c16c1b2753ac 100644 --- a/airflow-core/tests/unit/utils/test_db_cleanup.py +++ b/airflow-core/tests/unit/utils/test_db_cleanup.py @@ -741,7 +741,10 @@ def test_drop_archived_tables(self, mock_input, confirm_mock, inspect_mock, capl else: confirm_mock.assert_not_called() - @patch("airflow.utils.db_cleanup._cleanup_table", side_effect=OperationalError("", {}, Exception("mock db error"))) + @patch( + "airflow.utils.db_cleanup._cleanup_table", + side_effect=OperationalError("", {}, Exception("mock db error")), + ) def test_error_on_cleanup_failure_raises_when_flag_set(self, cleanup_table_mock, caplog): """When error_on_cleanup_failure=True and a table fails, AirflowException should be raised.""" with pytest.raises(AirflowException, match="airflow db clean encountered errors"): @@ -754,7 +757,10 @@ def test_error_on_cleanup_failure_raises_when_flag_set(self, cleanup_table_mock, error_on_cleanup_failure=True, ) - @patch("airflow.utils.db_cleanup._cleanup_table", side_effect=OperationalError("", {}, Exception("mock db error"))) + @patch( + "airflow.utils.db_cleanup._cleanup_table", + side_effect=OperationalError("", {}, Exception("mock db error")), + ) def test_error_on_cleanup_failure_no_raise_by_default(self, cleanup_table_mock, caplog): """When error_on_cleanup_failure=False (default) and a table fails, no exception is raised.""" run_cleanup( @@ -767,7 +773,10 @@ def test_error_on_cleanup_failure_no_raise_by_default(self, cleanup_table_mock, ) assert "The following tables were not cleaned due to errors" in caplog.text - @patch("airflow.utils.db_cleanup._cleanup_table", side_effect=OperationalError("", {}, Exception("mock db error"))) + @patch( + "airflow.utils.db_cleanup._cleanup_table", + side_effect=OperationalError("", {}, Exception("mock db error")), + ) def test_error_on_cleanup_failure_lists_failed_tables_in_warning(self, 
cleanup_table_mock, caplog): """A warning naming the failed tables should always be emitted, regardless of the flag.""" run_cleanup( diff --git a/scripts/ci/prek/known_airflow_exceptions.txt b/scripts/ci/prek/known_airflow_exceptions.txt index 1a870c169d602..d7dfe1225e7c1 100644 --- a/scripts/ci/prek/known_airflow_exceptions.txt +++ b/scripts/ci/prek/known_airflow_exceptions.txt @@ -18,7 +18,7 @@ airflow-core/src/airflow/serialization/serialized_objects.py::2 airflow-core/src/airflow/ti_deps/deps/valid_state_dep.py::1 airflow-core/src/airflow/utils/cli.py::4 airflow-core/src/airflow/utils/db.py::1 -airflow-core/src/airflow/utils/db_cleanup.py::1 +airflow-core/src/airflow/utils/db_cleanup.py::2 airflow-core/src/airflow/utils/db_manager.py::3 airflow-core/src/airflow/utils/dot_renderer.py::3 airflow-core/src/airflow/utils/helpers.py::4 From 4b811f6c38e3ab1bfaa021eec8d2cc71b01f8c09 Mon Sep 17 00:00:00 2001 From: Hemkumar Chheda Date: Sat, 18 Apr 2026 10:59:32 +0530 Subject: [PATCH 5/6] fix(db_cleanup): update error handling for table cleanup failures - Change the behavior of the `--error-on-cleanup-failure` flag to raise a RuntimeError instead of an AirflowException when table cleanup encounters errors. - Update the documentation and help text for the flag to clarify its functionality. - Emit the summary warning listing the failed tables only when the flag is not set; when the flag is set, the RuntimeError message carries the same list of tables instead. - Modify unit tests in `test_db_cleanup.py` to reflect the new error handling and verify the correct logging behavior. This update improves error visibility during automated workflows by ensuring that cleanup failures are properly reported. 
--- airflow-core/newsfragments/65239.bugfix.rst | 1 - airflow-core/src/airflow/cli/cli_config.py | 7 +- airflow-core/src/airflow/utils/db_cleanup.py | 12 +-- .../tests/unit/utils/test_db_cleanup.py | 73 +++++++++++-------- scripts/ci/prek/known_airflow_exceptions.txt | 2 +- 5 files changed, 51 insertions(+), 44 deletions(-) delete mode 100644 airflow-core/newsfragments/65239.bugfix.rst diff --git a/airflow-core/newsfragments/65239.bugfix.rst b/airflow-core/newsfragments/65239.bugfix.rst deleted file mode 100644 index 5a9a086e0b587..0000000000000 --- a/airflow-core/newsfragments/65239.bugfix.rst +++ /dev/null @@ -1 +0,0 @@ -Add ``--error-on-cleanup-failure`` flag to ``airflow db clean`` so that per-table cleanup errors, which were previously suppressed and caused the command to exit 0 silently, can now surface as a non-zero exit code. A warning summary listing failed tables is always emitted regardless of the flag. diff --git a/airflow-core/src/airflow/cli/cli_config.py b/airflow-core/src/airflow/cli/cli_config.py index 3b072c39ab4b5..964b8469e3249 100644 --- a/airflow-core/src/airflow/cli/cli_config.py +++ b/airflow-core/src/airflow/cli/cli_config.py @@ -543,12 +543,7 @@ def string_lower_type(val): ) ARG_DB_ERROR_ON_CLEANUP_FAILURE = Arg( ("--error-on-cleanup-failure",), - help=( - "If set, the command will exit with a non-zero exit code if any table cleanup encountered " - "an error. By default errors are suppressed and the command exits 0 even if some tables " - "were not cleaned. Recommended when running airflow db clean from a DAG or automated " - "workflow so that failures are surfaced rather than silently skipped." - ), + help="Command will exit with a non-zero exit code if any table cleanup failed. 
By default errors are suppressed and the command exits 0.", action="store_true", ) ARG_DAG_IDS = Arg( diff --git a/airflow-core/src/airflow/utils/db_cleanup.py b/airflow-core/src/airflow/utils/db_cleanup.py index e4fe91ece2cd6..cd132f60b373e 100644 --- a/airflow-core/src/airflow/utils/db_cleanup.py +++ b/airflow-core/src/airflow/utils/db_cleanup.py @@ -591,7 +591,7 @@ def run_cleanup( :param skip_archive: Set to True if you don't want the purged rows preserved in an archive table. :param session: Session representing connection to the metadata database. :param batch_size: Maximum number of rows to delete or archive in a single transaction. - :param error_on_cleanup_failure: If True, raise an AirflowException after processing all tables + :param error_on_cleanup_failure: If True, raise a RuntimeError after processing all tables if any per-table cleanup encountered an error. By default errors are suppressed and the command exits 0 even if some tables were not cleaned. """ @@ -637,15 +637,15 @@ def run_cleanup( logger.warning("Table %s not found. Skipping.", table_name) if failed_tables: - logger.warning( - "The following tables were not cleaned due to errors: %s. Check the logs above for details.", - failed_tables, - ) if error_on_cleanup_failure: - raise AirflowException( + raise RuntimeError( f"airflow db clean encountered errors on the following tables and did not clean them: " f"{failed_tables}. Check the logs above for details." ) + logger.warning( + "The following tables were not cleaned due to errors: %s. 
Check the logs above for details.", + failed_tables, + ) @provide_session diff --git a/airflow-core/tests/unit/utils/test_db_cleanup.py b/airflow-core/tests/unit/utils/test_db_cleanup.py index 6c16c1b2753ac..e778fe4cc88ac 100644 --- a/airflow-core/tests/unit/utils/test_db_cleanup.py +++ b/airflow-core/tests/unit/utils/test_db_cleanup.py @@ -549,19 +549,25 @@ def test_no_models_missing(self): assert set(all_models) - exclusion_list.union(config_dict) == set() assert exclusion_list.isdisjoint(config_dict) - def test_no_failure_warnings(self, caplog): + def test_no_failure_warnings(self): """ Ensure every table we have configured (and that is present in the db) can be cleaned successfully. For example, this checks that the recency column is actually a column. """ - run_cleanup(clean_before_timestamp=timezone.utcnow(), dry_run=True) - assert "Encountered error when attempting to clean table" not in caplog.text + with patch("airflow.utils.db_cleanup.logger") as mock_logger: + run_cleanup(clean_before_timestamp=timezone.utcnow(), dry_run=True) + for call in mock_logger.warning.call_args_list: + assert "Encountered error when attempting to clean table" not in str(call) # Lets check we have the right error message just in case - caplog.clear() - with patch("airflow.utils.db_cleanup._cleanup_table", side_effect=OperationalError("oops", {}, None)): + with ( + patch("airflow.utils.db_cleanup.logger") as mock_logger, + patch("airflow.utils.db_cleanup._cleanup_table", side_effect=OperationalError("oops", {}, None)), + ): run_cleanup(clean_before_timestamp=timezone.utcnow(), table_names=["task_instance"], dry_run=True) - assert "Encountered error when attempting to clean table" in caplog.text + mock_logger.warning.assert_any_call( + "Encountered error when attempting to clean table '%s'. 
", "task_instance" + ) @pytest.mark.parametrize( "drop_archive", @@ -745,9 +751,9 @@ def test_drop_archived_tables(self, mock_input, confirm_mock, inspect_mock, capl "airflow.utils.db_cleanup._cleanup_table", side_effect=OperationalError("", {}, Exception("mock db error")), ) - def test_error_on_cleanup_failure_raises_when_flag_set(self, cleanup_table_mock, caplog): - """When error_on_cleanup_failure=True and a table fails, AirflowException should be raised.""" - with pytest.raises(AirflowException, match="airflow db clean encountered errors"): + def test_error_on_cleanup_failure_raises_when_flag_set(self, cleanup_table_mock): + """When error_on_cleanup_failure=True and a table fails, RuntimeError should be raised.""" + with pytest.raises(RuntimeError, match="airflow db clean encountered errors"): run_cleanup( clean_before_timestamp=None, table_names=["log"], @@ -761,33 +767,40 @@ def test_error_on_cleanup_failure_raises_when_flag_set(self, cleanup_table_mock, "airflow.utils.db_cleanup._cleanup_table", side_effect=OperationalError("", {}, Exception("mock db error")), ) - def test_error_on_cleanup_failure_no_raise_by_default(self, cleanup_table_mock, caplog): + def test_error_on_cleanup_failure_no_raise_by_default(self, cleanup_table_mock): """When error_on_cleanup_failure=False (default) and a table fails, no exception is raised.""" - run_cleanup( - clean_before_timestamp=None, - table_names=["log"], - dry_run=False, - verbose=False, - confirm=False, - error_on_cleanup_failure=False, - ) - assert "The following tables were not cleaned due to errors" in caplog.text + with patch("airflow.utils.db_cleanup.logger") as mock_logger: + run_cleanup( + clean_before_timestamp=None, + table_names=["log"], + dry_run=False, + verbose=False, + confirm=False, + error_on_cleanup_failure=False, + ) + mock_logger.warning.assert_any_call( + "The following tables were not cleaned due to errors: %s. 
Check the logs above for details.", + ["log"], + ) @patch( "airflow.utils.db_cleanup._cleanup_table", side_effect=OperationalError("", {}, Exception("mock db error")), ) - def test_error_on_cleanup_failure_lists_failed_tables_in_warning(self, cleanup_table_mock, caplog): - """A warning naming the failed tables should always be emitted, regardless of the flag.""" - run_cleanup( - clean_before_timestamp=None, - table_names=["log"], - dry_run=False, - verbose=False, - confirm=False, - ) - assert "log" in caplog.text - assert "The following tables were not cleaned due to errors" in caplog.text + def test_error_on_cleanup_failure_lists_failed_tables_in_warning(self, cleanup_table_mock): + """A warning naming the failed tables is emitted when error_on_cleanup_failure is not set.""" + with patch("airflow.utils.db_cleanup.logger") as mock_logger: + run_cleanup( + clean_before_timestamp=None, + table_names=["log"], + dry_run=False, + verbose=False, + confirm=False, + ) + mock_logger.warning.assert_any_call( + "The following tables were not cleaned due to errors: %s. 
Check the logs above for details.", + ["log"], + ) @patch("airflow.utils.db_cleanup._cleanup_table") def test_error_on_cleanup_failure_propagated_from_run_cleanup(self, cleanup_table_mock): diff --git a/scripts/ci/prek/known_airflow_exceptions.txt b/scripts/ci/prek/known_airflow_exceptions.txt index d7dfe1225e7c1..1a870c169d602 100644 --- a/scripts/ci/prek/known_airflow_exceptions.txt +++ b/scripts/ci/prek/known_airflow_exceptions.txt @@ -18,7 +18,7 @@ airflow-core/src/airflow/serialization/serialized_objects.py::2 airflow-core/src/airflow/ti_deps/deps/valid_state_dep.py::1 airflow-core/src/airflow/utils/cli.py::4 airflow-core/src/airflow/utils/db.py::1 -airflow-core/src/airflow/utils/db_cleanup.py::2 +airflow-core/src/airflow/utils/db_cleanup.py::1 airflow-core/src/airflow/utils/db_manager.py::3 airflow-core/src/airflow/utils/dot_renderer.py::3 airflow-core/src/airflow/utils/helpers.py::4 From 1f2730d41d608c76abb302e61c48be1aaf0d2edf Mon Sep 17 00:00:00 2001 From: Hemkumar Chheda Date: Fri, 8 May 2026 12:14:12 +0530 Subject: [PATCH 6/6] fix(db_cleanup): remove redundant cleanup commit --- airflow-core/docs/howto/usage-cli.rst | 5 ++- airflow-core/src/airflow/utils/db_cleanup.py | 5 +-- .../tests/unit/utils/test_db_cleanup.py | 44 +++++++++++++++---- 3 files changed, 42 insertions(+), 12 deletions(-) diff --git a/airflow-core/docs/howto/usage-cli.rst b/airflow-core/docs/howto/usage-cli.rst index b8fa890d25749..eba8f02d390f4 100644 --- a/airflow-core/docs/howto/usage-cli.rst +++ b/airflow-core/docs/howto/usage-cli.rst @@ -226,7 +226,7 @@ Detecting cleanup failures By default, ``db clean`` suppresses per-table errors (such as a database ``statement_timeout`` being exceeded on a very large table) and exits with code 0 even if one or more tables were not -cleaned. A WARNING is always emitted in the logs listing which tables were skipped due to errors. +cleaned. A WARNING is emitted in the logs listing which tables were skipped due to errors. 
To make the command exit with a non-zero code whenever any table cleanup fails — useful when ``airflow db clean`` is invoked from a DAG task and you want the task to turn red on failure — @@ -239,6 +239,9 @@ pass ``--error-on-cleanup-failure``: --yes \ --error-on-cleanup-failure +When ``--error-on-cleanup-failure`` is set, the raised ``RuntimeError`` includes the list of +tables that failed cleanup, so the command still surfaces which tables were not cleaned. + .. tip:: On large deployments where the archival ``CREATE TABLE … AS SELECT`` step itself can time diff --git a/airflow-core/src/airflow/utils/db_cleanup.py b/airflow-core/src/airflow/utils/db_cleanup.py index cd132f60b373e..0c605b8d6bd7f 100644 --- a/airflow-core/src/airflow/utils/db_cleanup.py +++ b/airflow-core/src/airflow/utils/db_cleanup.py @@ -592,8 +592,8 @@ def run_cleanup( :param session: Session representing connection to the metadata database. :param batch_size: Maximum number of rows to delete or archive in a single transaction. :param error_on_cleanup_failure: If True, raise a RuntimeError after processing all tables - if any per-table cleanup encountered an error. By default errors are suppressed and the - command exits 0 even if some tables were not cleaned. + if any per-table cleanup encountered an error. By default errors are suppressed, a warning + summary is logged, and the command exits 0 even if some tables were not cleaned. 
""" clean_before_timestamp = timezone.coerce_datetime(clean_before_timestamp) @@ -630,7 +630,6 @@ def run_cleanup( session=session, batch_size=batch_size, ) - session.commit() if ctx.failed: failed_tables.append(table_name) else: diff --git a/airflow-core/tests/unit/utils/test_db_cleanup.py b/airflow-core/tests/unit/utils/test_db_cleanup.py index e778fe4cc88ac..b0d7bb50dc0e9 100644 --- a/airflow-core/tests/unit/utils/test_db_cleanup.py +++ b/airflow-core/tests/unit/utils/test_db_cleanup.py @@ -145,6 +145,25 @@ def test_run_cleanup_batch_size_propagation(self, cleanup_table_mock): cleanup_table_mock.assert_called_once() assert cleanup_table_mock.call_args.kwargs["batch_size"] == 1234 + @patch("airflow.utils.db_cleanup.reflect_tables") + @patch("airflow.utils.db_cleanup._cleanup_table") + def test_run_cleanup_does_not_commit_after_cleanup_table(self, cleanup_table_mock, reflect_tables_mock): + """run_cleanup should not add an extra commit after _cleanup_table handles its own transaction.""" + reflect_tables_mock.return_value.tables = {"log": object()} + session = MagicMock() + + run_cleanup( + clean_before_timestamp=None, + table_names=["log"], + dry_run=False, + verbose=False, + confirm=False, + session=session, + ) + + cleanup_table_mock.assert_called_once() + session.commit.assert_not_called() + @pytest.mark.parametrize( "table_names", [ @@ -753,15 +772,24 @@ def test_drop_archived_tables(self, mock_input, confirm_mock, inspect_mock, capl ) def test_error_on_cleanup_failure_raises_when_flag_set(self, cleanup_table_mock): """When error_on_cleanup_failure=True and a table fails, RuntimeError should be raised.""" - with pytest.raises(RuntimeError, match="airflow db clean encountered errors"): - run_cleanup( - clean_before_timestamp=None, - table_names=["log"], - dry_run=False, - verbose=False, - confirm=False, - error_on_cleanup_failure=True, + with patch("airflow.utils.db_cleanup.logger") as mock_logger: + with pytest.raises(RuntimeError, match="airflow db clean 
encountered errors"): + run_cleanup( + clean_before_timestamp=None, + table_names=["log"], + dry_run=False, + verbose=False, + confirm=False, + error_on_cleanup_failure=True, + ) + + mock_logger.warning.assert_any_call( + "Encountered error when attempting to clean table '%s'. ", "log" ) + assert ( + "The following tables were not cleaned due to errors: %s. Check the logs above for details.", + ["log"], + ) not in [call.args for call in mock_logger.warning.call_args_list] @patch( "airflow.utils.db_cleanup._cleanup_table",