Skip to content
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 25 additions & 0 deletions airflow-core/docs/howto/usage-cli.rst
Original file line number Diff line number Diff line change
Expand Up @@ -221,6 +221,31 @@ By default, ``db clean`` will archive purged rows in tables of the form ``_airfl

When you encounter an error without using ``--skip-archive``, ``_airflow_deleted__<table>__<timestamp>`` would still exist in the DB. You can use ``db drop-archived`` command to manually drop these tables.

Detecting cleanup failures
^^^^^^^^^^^^^^^^^^^^^^^^^^

By default, ``db clean`` suppresses per-table errors (such as a database ``statement_timeout``
being exceeded on a very large table) and exits with code 0 even if one or more tables were not
cleaned. A WARNING is always emitted in the logs listing which tables were skipped due to errors.

To make the command exit with a non-zero code whenever any table cleanup fails — useful when
``airflow db clean`` is invoked from a DAG task and you want the task to turn red on failure —
pass ``--error-on-cleanup-failure``:

.. code-block:: bash

airflow db clean \
--clean-before-timestamp "$(date -u -d '21 days ago' '+%Y-%m-%dT%H:%M:%S+00:00')" \
--yes \
--error-on-cleanup-failure

.. tip::

On large deployments where the archival ``CREATE TABLE … AS SELECT`` step itself can time
out, combining ``--error-on-cleanup-failure`` with ``--skip-archive`` is recommended.
``--skip-archive`` deletes rows directly without the intermediate archive table, making the
operation both faster and less likely to hit ``statement_timeout``.

Export the purged records from the archive tables
-------------------------------------------------
The ``db export-archived`` command exports the contents of the archived tables, created by the ``db clean`` command,
Expand Down
1 change: 1 addition & 0 deletions airflow-core/newsfragments/65239.bugfix.rst
Comment thread
hkc-8010 marked this conversation as resolved.
Outdated
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Add ``--error-on-cleanup-failure`` flag to ``airflow db clean`` so that per-table cleanup errors, which were previously suppressed and caused the command to exit 0 silently, can now surface as a non-zero exit code. A warning summary listing failed tables is always emitted regardless of the flag.
11 changes: 11 additions & 0 deletions airflow-core/src/airflow/cli/cli_config.py
Original file line number Diff line number Diff line change
Expand Up @@ -541,6 +541,16 @@ def string_lower_type(val):
"Lower values reduce long-running locks but increase the number of batches."
),
)
# Opt-in flag for ``airflow db clean``: when present, per-table cleanup errors
# (which are otherwise suppressed) cause a non-zero exit code so automated
# invocations (e.g. from a DAG task) can detect the failure.
# NOTE: the stray review-UI lines that were embedded inside this call (a
# SyntaxError) have been removed; the code is otherwise unchanged.
ARG_DB_ERROR_ON_CLEANUP_FAILURE = Arg(
    ("--error-on-cleanup-failure",),
    help=(
        "If set, the command will exit with a non-zero exit code if any table cleanup encountered "
        "an error. By default errors are suppressed and the command exits 0 even if some tables "
        "were not cleaned. Recommended when running airflow db clean from a DAG or automated "
        "workflow so that failures are surfaced rather than silently skipped."
    ),
    action="store_true",
)
ARG_DAG_IDS = Arg(
("--dag-ids",),
default=None,
Expand Down Expand Up @@ -1603,6 +1613,7 @@ class GroupCommand(NamedTuple):
ARG_DB_BATCH_SIZE,
ARG_DAG_IDS,
ARG_EXCLUDE_DAG_IDS,
ARG_DB_ERROR_ON_CLEANUP_FAILURE,
),
),
ActionCommand(
Expand Down
1 change: 1 addition & 0 deletions airflow-core/src/airflow/cli/commands/db_command.py
Original file line number Diff line number Diff line change
Expand Up @@ -316,6 +316,7 @@ def cleanup_tables(args):
batch_size=args.batch_size,
dag_ids=args.dag_ids,
exclude_dag_ids=args.exclude_dag_ids,
error_on_cleanup_failure=args.error_on_cleanup_failure,
)


Expand Down
39 changes: 35 additions & 4 deletions airflow-core/src/airflow/utils/db_cleanup.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,10 @@
import csv
import logging
import os
from collections.abc import Generator
from contextlib import contextmanager
from dataclasses import dataclass
from types import SimpleNamespace
from typing import TYPE_CHECKING, Any

from sqlalchemy import and_, column, func, inspect, select, table, text
Expand Down Expand Up @@ -475,11 +477,22 @@ def _print_config(*, configs: dict[str, _TableConfig]) -> None:


@contextmanager
def _suppress_with_logging(table: str, session: Session):
"""Suppresses errors but logs them."""
def _suppress_with_logging(table: str, session: Session) -> Generator[SimpleNamespace, None, None]:
"""
Suppress per-table cleanup errors, log them, and expose failure state to the caller.

Yields a :class:`~types.SimpleNamespace` with a single attribute ``failed`` (bool).
When an :class:`~sqlalchemy.exc.OperationalError` or
:class:`~sqlalchemy.exc.ProgrammingError` is raised inside the ``with`` block the
exception is swallowed, ``ctx.failed`` is set to ``True``, a WARNING is emitted for
the table, and the session is rolled back. The caller can inspect ``ctx.failed``
after the block to decide whether to surface the error upstream.
"""
ctx = SimpleNamespace(failed=False)
try:
yield
yield ctx
except (OperationalError, ProgrammingError):
ctx.failed = True
logger.warning("Encountered error when attempting to clean table '%s'. ", table)
logger.debug("Traceback for table '%s'", table, exc_info=True)
if session.is_active:
Expand Down Expand Up @@ -554,6 +567,7 @@ def run_cleanup(
skip_archive: bool = False,
session: Session = NEW_SESSION,
batch_size: int | None = None,
error_on_cleanup_failure: bool = False,
) -> None:
"""
Purges old records in airflow metadata database.
Expand All @@ -577,6 +591,9 @@ def run_cleanup(
:param skip_archive: Set to True if you don't want the purged rows preserved in an archive table.
:param session: Session representing connection to the metadata database.
:param batch_size: Maximum number of rows to delete or archive in a single transaction.
:param error_on_cleanup_failure: If True, raise an AirflowException after processing all tables
if any per-table cleanup encountered an error. By default errors are suppressed and the
command exits 0 even if some tables were not cleaned.
"""
clean_before_timestamp = timezone.coerce_datetime(clean_before_timestamp)

Expand All @@ -597,10 +614,11 @@ def run_cleanup(
exclude_dag_ids=exclude_dag_ids,
)
existing_tables = reflect_tables(tables=None, session=session).tables
failed_tables: list[str] = []

for table_name, table_config in effective_config_dict.items():
if table_name in existing_tables:
with _suppress_with_logging(table_name, session):
with _suppress_with_logging(table_name, session) as ctx:
_cleanup_table(
clean_before_timestamp=clean_before_timestamp,
dag_ids=dag_ids,
Expand All @@ -613,9 +631,22 @@ def run_cleanup(
batch_size=batch_size,
)
session.commit()
Comment thread
jscheffl marked this conversation as resolved.
Outdated
if ctx.failed:
failed_tables.append(table_name)
else:
logger.warning("Table %s not found. Skipping.", table_name)

if failed_tables:
logger.warning(
Comment thread
hkc-8010 marked this conversation as resolved.
Outdated
"The following tables were not cleaned due to errors: %s. Check the logs above for details.",
failed_tables,
Comment thread
jscheffl marked this conversation as resolved.
)
Comment thread
jscheffl marked this conversation as resolved.
if error_on_cleanup_failure:
raise AirflowException(
Comment thread
hkc-8010 marked this conversation as resolved.
Outdated
f"airflow db clean encountered errors on the following tables and did not clean them: "
f"{failed_tables}. Check the logs above for details."
)


@provide_session
def export_archived_records(
Expand Down
40 changes: 40 additions & 0 deletions airflow-core/tests/unit/cli/commands/test_db_command.py
Original file line number Diff line number Diff line change
Expand Up @@ -733,6 +733,7 @@ def test_date_timezone_omitted(self, run_cleanup_mock, timezone):
confirm=False,
skip_archive=False,
batch_size=None,
error_on_cleanup_failure=False,
)

@pytest.mark.parametrize("timezone", ["UTC", "Europe/Berlin", "America/Los_Angeles"])
Expand All @@ -756,6 +757,7 @@ def test_date_timezone_supplied(self, run_cleanup_mock, timezone):
confirm=False,
skip_archive=False,
batch_size=None,
error_on_cleanup_failure=False,
)

@pytest.mark.parametrize(("confirm_arg", "expected"), [(["-y"], False), ([], True)])
Expand Down Expand Up @@ -785,6 +787,7 @@ def test_confirm(self, run_cleanup_mock, confirm_arg, expected):
confirm=expected,
skip_archive=False,
batch_size=None,
error_on_cleanup_failure=False,
)

@pytest.mark.parametrize(("extra_arg", "expected"), [(["--skip-archive"], True), ([], False)])
Expand Down Expand Up @@ -814,6 +817,7 @@ def test_skip_archive(self, run_cleanup_mock, extra_arg, expected):
confirm=True,
skip_archive=expected,
batch_size=None,
error_on_cleanup_failure=False,
)

@pytest.mark.parametrize(("dry_run_arg", "expected"), [(["--dry-run"], True), ([], False)])
Expand Down Expand Up @@ -843,6 +847,7 @@ def test_dry_run(self, run_cleanup_mock, dry_run_arg, expected):
confirm=True,
skip_archive=False,
batch_size=None,
error_on_cleanup_failure=False,
)

@pytest.mark.parametrize(
Expand Down Expand Up @@ -874,6 +879,7 @@ def test_tables(self, run_cleanup_mock, extra_args, expected):
confirm=True,
skip_archive=False,
batch_size=None,
error_on_cleanup_failure=False,
)

@pytest.mark.parametrize(("extra_args", "expected"), [(["--verbose"], True), ([], False)])
Expand Down Expand Up @@ -903,6 +909,7 @@ def test_verbose(self, run_cleanup_mock, extra_args, expected):
confirm=True,
skip_archive=False,
batch_size=None,
error_on_cleanup_failure=False,
)

@pytest.mark.parametrize(("extra_args", "expected"), [(["--batch-size", "1234"], 1234), ([], None)])
Expand Down Expand Up @@ -932,6 +939,7 @@ def test_batch_size(self, run_cleanup_mock, extra_args, expected):
confirm=True,
skip_archive=False,
batch_size=expected,
error_on_cleanup_failure=False,
)

@pytest.mark.parametrize(
Expand Down Expand Up @@ -963,6 +971,7 @@ def test_dag_ids(self, run_cleanup_mock, extra_args, expected):
confirm=True,
skip_archive=False,
batch_size=None,
error_on_cleanup_failure=False,
)

@pytest.mark.parametrize(
Expand Down Expand Up @@ -994,6 +1003,37 @@ def test_exclude_dag_ids(self, run_cleanup_mock, extra_args, expected):
confirm=True,
skip_archive=False,
batch_size=None,
error_on_cleanup_failure=False,
)

@pytest.mark.parametrize(
("extra_args", "expected"), [(["--error-on-cleanup-failure"], True), ([], False)]
)
@patch("airflow.cli.commands.db_command.run_cleanup")
def test_error_on_cleanup_failure(self, run_cleanup_mock, extra_args, expected):
"""When --error-on-cleanup-failure is passed, error_on_cleanup_failure should be True."""
args = self.parser.parse_args(
[
"db",
"clean",
"--clean-before-timestamp",
"2021-01-01",
*extra_args,
]
)
db_command.cleanup_tables(args)

run_cleanup_mock.assert_called_once_with(
table_names=None,
dry_run=False,
dag_ids=None,
exclude_dag_ids=None,
clean_before_timestamp=pendulum.parse("2021-01-01 00:00:00Z"),
verbose=False,
confirm=True,
skip_archive=False,
batch_size=None,
error_on_cleanup_failure=expected,
)

@patch("airflow.cli.commands.db_command.export_archived_records")
Expand Down
61 changes: 61 additions & 0 deletions airflow-core/tests/unit/utils/test_db_cleanup.py
Original file line number Diff line number Diff line change
Expand Up @@ -741,6 +741,67 @@ def test_drop_archived_tables(self, mock_input, confirm_mock, inspect_mock, capl
else:
confirm_mock.assert_not_called()

@patch(
"airflow.utils.db_cleanup._cleanup_table",
side_effect=OperationalError("", {}, Exception("mock db error")),
)
def test_error_on_cleanup_failure_raises_when_flag_set(self, cleanup_table_mock, caplog):
Comment thread
hkc-8010 marked this conversation as resolved.
Outdated
"""When error_on_cleanup_failure=True and a table fails, AirflowException should be raised."""
with pytest.raises(AirflowException, match="airflow db clean encountered errors"):
run_cleanup(
clean_before_timestamp=None,
table_names=["log"],
dry_run=False,
verbose=False,
confirm=False,
error_on_cleanup_failure=True,
)

@patch(
"airflow.utils.db_cleanup._cleanup_table",
side_effect=OperationalError("", {}, Exception("mock db error")),
)
def test_error_on_cleanup_failure_no_raise_by_default(self, cleanup_table_mock, caplog):
"""When error_on_cleanup_failure=False (default) and a table fails, no exception is raised."""
run_cleanup(
clean_before_timestamp=None,
table_names=["log"],
dry_run=False,
verbose=False,
confirm=False,
error_on_cleanup_failure=False,
)
assert "The following tables were not cleaned due to errors" in caplog.text

@patch(
"airflow.utils.db_cleanup._cleanup_table",
side_effect=OperationalError("", {}, Exception("mock db error")),
)
def test_error_on_cleanup_failure_lists_failed_tables_in_warning(self, cleanup_table_mock, caplog):
"""A warning naming the failed tables should always be emitted, regardless of the flag."""
run_cleanup(
clean_before_timestamp=None,
table_names=["log"],
dry_run=False,
verbose=False,
confirm=False,
)
assert "log" in caplog.text
assert "The following tables were not cleaned due to errors" in caplog.text

@patch("airflow.utils.db_cleanup._cleanup_table")
def test_error_on_cleanup_failure_propagated_from_run_cleanup(self, cleanup_table_mock):
"""Ensure error_on_cleanup_failure is accepted by run_cleanup without errors when no failures occur."""
run_cleanup(
clean_before_timestamp=None,
table_names=["log"],
dry_run=False,
verbose=False,
confirm=False,
error_on_cleanup_failure=True,
)
cleanup_table_mock.assert_called_once()


def create_tis(base_date, num_tis, run_type=DagRunType.SCHEDULED):
from tests_common.test_utils.taskinstance import create_task_instance
Expand Down
2 changes: 1 addition & 1 deletion scripts/ci/prek/known_airflow_exceptions.txt
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ airflow-core/src/airflow/serialization/serialized_objects.py::2
airflow-core/src/airflow/ti_deps/deps/valid_state_dep.py::1
airflow-core/src/airflow/utils/cli.py::4
airflow-core/src/airflow/utils/db.py::1
airflow-core/src/airflow/utils/db_cleanup.py::1
airflow-core/src/airflow/utils/db_cleanup.py::2
airflow-core/src/airflow/utils/db_manager.py::3
airflow-core/src/airflow/utils/dot_renderer.py::3
airflow-core/src/airflow/utils/helpers.py::4
Expand Down
Loading