diff --git a/airflow-core/docs/howto/usage-cli.rst b/airflow-core/docs/howto/usage-cli.rst
index b80faf89b3ec0..eba8f02d390f4 100644
--- a/airflow-core/docs/howto/usage-cli.rst
+++ b/airflow-core/docs/howto/usage-cli.rst
@@ -221,6 +221,34 @@ By default, ``db clean`` will archive purged rows in tables of the form ``_airfl
When you encounter an error without using ``--skip-archive``, ``_airflow_deleted__
__`` would still exist in the DB. You can use ``db drop-archived`` command to manually drop these tables.
+Detecting cleanup failures
+^^^^^^^^^^^^^^^^^^^^^^^^^^
+
+By default, ``db clean`` suppresses per-table errors (such as a database ``statement_timeout``
+being exceeded on a very large table) and exits with code 0 even if one or more tables were not
+cleaned. A WARNING is emitted in the logs listing which tables were skipped due to errors.
+
+To make the command exit with a non-zero code whenever any table cleanup fails — useful when
+``airflow db clean`` is invoked from a DAG task and you want the task to turn red on failure —
+pass ``--error-on-cleanup-failure``:
+
+.. code-block:: bash
+
+ airflow db clean \
+ --clean-before-timestamp "$(date -u -d '21 days ago' '+%Y-%m-%dT%H:%M:%S+00:00')" \
+ --yes \
+ --error-on-cleanup-failure
+
+When ``--error-on-cleanup-failure`` is set, the raised ``RuntimeError`` includes the list of
+tables that failed cleanup, so the command still surfaces which tables were not cleaned.
+
+.. tip::
+
+ On large deployments where the archival ``CREATE TABLE … AS SELECT`` step itself can time
+ out, combining ``--error-on-cleanup-failure`` with ``--skip-archive`` is recommended.
+ ``--skip-archive`` deletes rows directly without the intermediate archive table, making the
+ operation both faster and less likely to hit ``statement_timeout``.
+
Export the purged records from the archive tables
-------------------------------------------------
The ``db export-archived`` command exports the contents of the archived tables, created by the ``db clean`` command,
diff --git a/airflow-core/src/airflow/cli/cli_config.py b/airflow-core/src/airflow/cli/cli_config.py
index a09851e5e24ff..990325de74bde 100644
--- a/airflow-core/src/airflow/cli/cli_config.py
+++ b/airflow-core/src/airflow/cli/cli_config.py
@@ -541,6 +541,11 @@ def string_lower_type(val):
"Lower values reduce long-running locks but increase the number of batches."
),
)
+ARG_DB_ERROR_ON_CLEANUP_FAILURE = Arg(
+ ("--error-on-cleanup-failure",),
+ help="Command will exit with a non-zero exit code if any table cleanup failed. By default, errors are suppressed and the command exits 0.",
+ action="store_true",
+)
ARG_DAG_IDS = Arg(
("--dag-ids",),
default=None,
@@ -1603,6 +1608,7 @@ class GroupCommand(NamedTuple):
ARG_DB_BATCH_SIZE,
ARG_DAG_IDS,
ARG_EXCLUDE_DAG_IDS,
+ ARG_DB_ERROR_ON_CLEANUP_FAILURE,
),
),
ActionCommand(
diff --git a/airflow-core/src/airflow/cli/commands/db_command.py b/airflow-core/src/airflow/cli/commands/db_command.py
index 253e71d56f39d..4638746f568f4 100644
--- a/airflow-core/src/airflow/cli/commands/db_command.py
+++ b/airflow-core/src/airflow/cli/commands/db_command.py
@@ -349,6 +349,7 @@ def cleanup_tables(args):
batch_size=args.batch_size,
dag_ids=args.dag_ids,
exclude_dag_ids=args.exclude_dag_ids,
+ error_on_cleanup_failure=args.error_on_cleanup_failure,
)
diff --git a/airflow-core/src/airflow/utils/db_cleanup.py b/airflow-core/src/airflow/utils/db_cleanup.py
index e6b5283669b86..0c605b8d6bd7f 100644
--- a/airflow-core/src/airflow/utils/db_cleanup.py
+++ b/airflow-core/src/airflow/utils/db_cleanup.py
@@ -26,8 +26,10 @@
import csv
import logging
import os
+from collections.abc import Generator
from contextlib import contextmanager
from dataclasses import dataclass
+from types import SimpleNamespace
from typing import TYPE_CHECKING, Any
from sqlalchemy import and_, column, func, inspect, select, table, text
@@ -475,11 +477,22 @@ def _print_config(*, configs: dict[str, _TableConfig]) -> None:
@contextmanager
-def _suppress_with_logging(table: str, session: Session):
- """Suppresses errors but logs them."""
+def _suppress_with_logging(table: str, session: Session) -> Generator[SimpleNamespace, None, None]:
+ """
+ Suppress per-table cleanup errors, log them, and expose failure state to the caller.
+
+ Yields a :class:`~types.SimpleNamespace` with a single attribute ``failed`` (bool).
+ When an :class:`~sqlalchemy.exc.OperationalError` or
+ :class:`~sqlalchemy.exc.ProgrammingError` is raised inside the ``with`` block the
+ exception is swallowed, ``ctx.failed`` is set to ``True``, a WARNING is emitted for
+ the table, and the session is rolled back. The caller can inspect ``ctx.failed``
+ after the block to decide whether to surface the error upstream.
+ """
+ ctx = SimpleNamespace(failed=False)
try:
- yield
+ yield ctx
except (OperationalError, ProgrammingError):
+ ctx.failed = True
logger.warning("Encountered error when attempting to clean table '%s'. ", table)
logger.debug("Traceback for table '%s'", table, exc_info=True)
if session.is_active:
@@ -554,6 +567,7 @@ def run_cleanup(
skip_archive: bool = False,
session: Session = NEW_SESSION,
batch_size: int | None = None,
+ error_on_cleanup_failure: bool = False,
) -> None:
"""
Purges old records in airflow metadata database.
@@ -577,6 +591,9 @@ def run_cleanup(
:param skip_archive: Set to True if you don't want the purged rows preserved in an archive table.
:param session: Session representing connection to the metadata database.
:param batch_size: Maximum number of rows to delete or archive in a single transaction.
+ :param error_on_cleanup_failure: If True, raise a RuntimeError after processing all tables
+ if any per-table cleanup encountered an error. By default, errors are suppressed, a warning
+ summary is logged, and the command exits 0 even if some tables were not cleaned.
"""
clean_before_timestamp = timezone.coerce_datetime(clean_before_timestamp)
@@ -597,10 +614,11 @@ def run_cleanup(
exclude_dag_ids=exclude_dag_ids,
)
existing_tables = reflect_tables(tables=None, session=session).tables
+ failed_tables: list[str] = []
for table_name, table_config in effective_config_dict.items():
if table_name in existing_tables:
- with _suppress_with_logging(table_name, session):
+ with _suppress_with_logging(table_name, session) as ctx:
_cleanup_table(
clean_before_timestamp=clean_before_timestamp,
dag_ids=dag_ids,
@@ -612,10 +630,22 @@ def run_cleanup(
session=session,
batch_size=batch_size,
)
- session.commit()
+ if ctx.failed:
+ failed_tables.append(table_name)
else:
logger.warning("Table %s not found. Skipping.", table_name)
+ if failed_tables:
+ if error_on_cleanup_failure:
+ raise RuntimeError(
+ f"airflow db clean encountered errors on the following tables and did not clean them: "
+ f"{failed_tables}. Check the logs above for details."
+ )
+ logger.warning(
+ "The following tables were not cleaned due to errors: %s. Check the logs above for details.",
+ failed_tables,
+ )
+
@provide_session
def export_archived_records(
diff --git a/airflow-core/tests/unit/cli/commands/test_db_command.py b/airflow-core/tests/unit/cli/commands/test_db_command.py
index a994bf9fb51cb..6c2cb44a628da 100644
--- a/airflow-core/tests/unit/cli/commands/test_db_command.py
+++ b/airflow-core/tests/unit/cli/commands/test_db_command.py
@@ -793,6 +793,7 @@ def test_date_timezone_omitted(self, run_cleanup_mock, timezone):
confirm=False,
skip_archive=False,
batch_size=None,
+ error_on_cleanup_failure=False,
)
@pytest.mark.parametrize("timezone", ["UTC", "Europe/Berlin", "America/Los_Angeles"])
@@ -816,6 +817,7 @@ def test_date_timezone_supplied(self, run_cleanup_mock, timezone):
confirm=False,
skip_archive=False,
batch_size=None,
+ error_on_cleanup_failure=False,
)
@pytest.mark.parametrize(("confirm_arg", "expected"), [(["-y"], False), ([], True)])
@@ -845,6 +847,7 @@ def test_confirm(self, run_cleanup_mock, confirm_arg, expected):
confirm=expected,
skip_archive=False,
batch_size=None,
+ error_on_cleanup_failure=False,
)
@pytest.mark.parametrize(("extra_arg", "expected"), [(["--skip-archive"], True), ([], False)])
@@ -874,6 +877,7 @@ def test_skip_archive(self, run_cleanup_mock, extra_arg, expected):
confirm=True,
skip_archive=expected,
batch_size=None,
+ error_on_cleanup_failure=False,
)
@pytest.mark.parametrize(("dry_run_arg", "expected"), [(["--dry-run"], True), ([], False)])
@@ -903,6 +907,7 @@ def test_dry_run(self, run_cleanup_mock, dry_run_arg, expected):
confirm=True,
skip_archive=False,
batch_size=None,
+ error_on_cleanup_failure=False,
)
@pytest.mark.parametrize(
@@ -934,6 +939,7 @@ def test_tables(self, run_cleanup_mock, extra_args, expected):
confirm=True,
skip_archive=False,
batch_size=None,
+ error_on_cleanup_failure=False,
)
@pytest.mark.parametrize(("extra_args", "expected"), [(["--verbose"], True), ([], False)])
@@ -963,6 +969,7 @@ def test_verbose(self, run_cleanup_mock, extra_args, expected):
confirm=True,
skip_archive=False,
batch_size=None,
+ error_on_cleanup_failure=False,
)
@pytest.mark.parametrize(("extra_args", "expected"), [(["--batch-size", "1234"], 1234), ([], None)])
@@ -992,6 +999,7 @@ def test_batch_size(self, run_cleanup_mock, extra_args, expected):
confirm=True,
skip_archive=False,
batch_size=expected,
+ error_on_cleanup_failure=False,
)
@pytest.mark.parametrize(
@@ -1023,6 +1031,7 @@ def test_dag_ids(self, run_cleanup_mock, extra_args, expected):
confirm=True,
skip_archive=False,
batch_size=None,
+ error_on_cleanup_failure=False,
)
@pytest.mark.parametrize(
@@ -1054,6 +1063,37 @@ def test_exclude_dag_ids(self, run_cleanup_mock, extra_args, expected):
confirm=True,
skip_archive=False,
batch_size=None,
+ error_on_cleanup_failure=False,
+ )
+
+ @pytest.mark.parametrize(
+ ("extra_args", "expected"), [(["--error-on-cleanup-failure"], True), ([], False)]
+ )
+ @patch("airflow.cli.commands.db_command.run_cleanup")
+ def test_error_on_cleanup_failure(self, run_cleanup_mock, extra_args, expected):
+ """When --error-on-cleanup-failure is passed, error_on_cleanup_failure should be True."""
+ args = self.parser.parse_args(
+ [
+ "db",
+ "clean",
+ "--clean-before-timestamp",
+ "2021-01-01",
+ *extra_args,
+ ]
+ )
+ db_command.cleanup_tables(args)
+
+ run_cleanup_mock.assert_called_once_with(
+ table_names=None,
+ dry_run=False,
+ dag_ids=None,
+ exclude_dag_ids=None,
+ clean_before_timestamp=pendulum.parse("2021-01-01 00:00:00Z"),
+ verbose=False,
+ confirm=True,
+ skip_archive=False,
+ batch_size=None,
+ error_on_cleanup_failure=expected,
)
@patch("airflow.cli.commands.db_command.export_archived_records")
diff --git a/airflow-core/tests/unit/utils/test_db_cleanup.py b/airflow-core/tests/unit/utils/test_db_cleanup.py
index 2ddca75ec57c4..b0d7bb50dc0e9 100644
--- a/airflow-core/tests/unit/utils/test_db_cleanup.py
+++ b/airflow-core/tests/unit/utils/test_db_cleanup.py
@@ -145,6 +145,25 @@ def test_run_cleanup_batch_size_propagation(self, cleanup_table_mock):
cleanup_table_mock.assert_called_once()
assert cleanup_table_mock.call_args.kwargs["batch_size"] == 1234
+ @patch("airflow.utils.db_cleanup.reflect_tables")
+ @patch("airflow.utils.db_cleanup._cleanup_table")
+ def test_run_cleanup_does_not_commit_after_cleanup_table(self, cleanup_table_mock, reflect_tables_mock):
+ """run_cleanup should not add an extra commit after _cleanup_table handles its own transaction."""
+ reflect_tables_mock.return_value.tables = {"log": object()}
+ session = MagicMock()
+
+ run_cleanup(
+ clean_before_timestamp=None,
+ table_names=["log"],
+ dry_run=False,
+ verbose=False,
+ confirm=False,
+ session=session,
+ )
+
+ cleanup_table_mock.assert_called_once()
+ session.commit.assert_not_called()
+
@pytest.mark.parametrize(
"table_names",
[
@@ -549,19 +568,25 @@ def test_no_models_missing(self):
assert set(all_models) - exclusion_list.union(config_dict) == set()
assert exclusion_list.isdisjoint(config_dict)
- def test_no_failure_warnings(self, caplog):
+ def test_no_failure_warnings(self):
"""
Ensure every table we have configured (and that is present in the db) can be cleaned successfully.
For example, this checks that the recency column is actually a column.
"""
- run_cleanup(clean_before_timestamp=timezone.utcnow(), dry_run=True)
- assert "Encountered error when attempting to clean table" not in caplog.text
+ with patch("airflow.utils.db_cleanup.logger") as mock_logger:
+ run_cleanup(clean_before_timestamp=timezone.utcnow(), dry_run=True)
+ for call in mock_logger.warning.call_args_list:
+ assert "Encountered error when attempting to clean table" not in str(call)
# Lets check we have the right error message just in case
- caplog.clear()
- with patch("airflow.utils.db_cleanup._cleanup_table", side_effect=OperationalError("oops", {}, None)):
+ with (
+ patch("airflow.utils.db_cleanup.logger") as mock_logger,
+ patch("airflow.utils.db_cleanup._cleanup_table", side_effect=OperationalError("oops", {}, None)),
+ ):
run_cleanup(clean_before_timestamp=timezone.utcnow(), table_names=["task_instance"], dry_run=True)
- assert "Encountered error when attempting to clean table" in caplog.text
+ mock_logger.warning.assert_any_call(
+ "Encountered error when attempting to clean table '%s'. ", "task_instance"
+ )
@pytest.mark.parametrize(
"drop_archive",
@@ -741,6 +766,83 @@ def test_drop_archived_tables(self, mock_input, confirm_mock, inspect_mock, capl
else:
confirm_mock.assert_not_called()
+ @patch(
+ "airflow.utils.db_cleanup._cleanup_table",
+ side_effect=OperationalError("", {}, Exception("mock db error")),
+ )
+ def test_error_on_cleanup_failure_raises_when_flag_set(self, cleanup_table_mock):
+ """When error_on_cleanup_failure=True and a table fails, RuntimeError should be raised."""
+ with patch("airflow.utils.db_cleanup.logger") as mock_logger:
+ with pytest.raises(RuntimeError, match="airflow db clean encountered errors"):
+ run_cleanup(
+ clean_before_timestamp=None,
+ table_names=["log"],
+ dry_run=False,
+ verbose=False,
+ confirm=False,
+ error_on_cleanup_failure=True,
+ )
+
+ mock_logger.warning.assert_any_call(
+ "Encountered error when attempting to clean table '%s'. ", "log"
+ )
+ assert (
+ "The following tables were not cleaned due to errors: %s. Check the logs above for details.",
+ ["log"],
+ ) not in [call.args for call in mock_logger.warning.call_args_list]
+
+ @patch(
+ "airflow.utils.db_cleanup._cleanup_table",
+ side_effect=OperationalError("", {}, Exception("mock db error")),
+ )
+ def test_error_on_cleanup_failure_no_raise_by_default(self, cleanup_table_mock):
+ """When error_on_cleanup_failure=False (default) and a table fails, no exception is raised."""
+ with patch("airflow.utils.db_cleanup.logger") as mock_logger:
+ run_cleanup(
+ clean_before_timestamp=None,
+ table_names=["log"],
+ dry_run=False,
+ verbose=False,
+ confirm=False,
+ error_on_cleanup_failure=False,
+ )
+ mock_logger.warning.assert_any_call(
+ "The following tables were not cleaned due to errors: %s. Check the logs above for details.",
+ ["log"],
+ )
+
+ @patch(
+ "airflow.utils.db_cleanup._cleanup_table",
+ side_effect=OperationalError("", {}, Exception("mock db error")),
+ )
+ def test_error_on_cleanup_failure_lists_failed_tables_in_warning(self, cleanup_table_mock):
+ """A warning naming the failed tables is emitted when error_on_cleanup_failure is not set."""
+ with patch("airflow.utils.db_cleanup.logger") as mock_logger:
+ run_cleanup(
+ clean_before_timestamp=None,
+ table_names=["log"],
+ dry_run=False,
+ verbose=False,
+ confirm=False,
+ )
+ mock_logger.warning.assert_any_call(
+ "The following tables were not cleaned due to errors: %s. Check the logs above for details.",
+ ["log"],
+ )
+
+ @patch("airflow.utils.db_cleanup._cleanup_table")
+ def test_error_on_cleanup_failure_propagated_from_run_cleanup(self, cleanup_table_mock):
+ """Ensure error_on_cleanup_failure is accepted by run_cleanup without errors when no failures occur."""
+ run_cleanup(
+ clean_before_timestamp=None,
+ table_names=["log"],
+ dry_run=False,
+ verbose=False,
+ confirm=False,
+ error_on_cleanup_failure=True,
+ )
+ cleanup_table_mock.assert_called_once()
+
def create_tis(base_date, num_tis, run_type=DagRunType.SCHEDULED):
from tests_common.test_utils.taskinstance import create_task_instance