
AIP-103: Adding periodic task state garbage collection and retention support#66463

Open
amoghrajesh wants to merge 2 commits into apache:main from astronomer:aip-103-4-garbage-collection-and-cleanup

Conversation

@amoghrajesh (Contributor) commented May 6, 2026

closes: #66459

What?

Task state rows live as long as their parent DAG run. In deployments that don't run airflow db cleanup — or where task state should expire sooner than the DAG run — rows accumulate indefinitely. This PR adds an explicit retention mechanism independent of DAG run cleanup. To perform effective cleanup, the following is needed:

  1. Time-based garbage collection: delete task_state rows older than N days
  2. Early expiry: a per-key override, settable per state row, for short-lived keys like job IDs
  3. Asset state orphan cleanup: when an asset is removed from all DAGs, its asset_active entry is deleted, but its asset_state rows silently stay behind

Proposed change

  • expires_at column on task_state: updated_at alone can't distinguish a 7-day key from a 30-day key. NULL means fall back to the global default_retention_days; set means delete after this timestamp regardless of updated_at. Setting default_retention_days = 0 disables time-based cleanup entirely (expires_at cleanup still runs). See the retention-decision sketch after this list.
  • BaseStateBackend.cleanup() no-op default — custom backends override this to implement their own retention policy. The backend reads [state_store] default_retention_days from config itself since the AIP says "the backend is responsible for enforcing the retention policy."
  • New config options under [state_store]: default_retention_days = 30 (task_state only — does not affect asset_state) and clear_on_success = False.
  • MetastoreStateBackend.cleanup() runs two passes for task_state: rows past updated_at + default_retention_days cutoff, and rows with expires_at < now().
  • airflow state-store cleanup CLI command — calls get_state_backend().cleanup(). Operators schedule this via cron or a maintenance DAG. Supports --dry-run.
  • Asset state orphan cleanup moved into the scheduler's _update_asset_orphanage() — runs in the same pass as asset deregistration, which is when the orphans are created. This is the right home since it is an internal consistency operation, not a user-facing data lifecycle decision.
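
To make the interplay between expires_at and the global default concrete, here is a minimal sketch of the per-row retention decision. Illustrative only: should_delete is not a function in this PR, which expresses the same rules as bulk DELETEs instead.

from datetime import datetime, timedelta, timezone


def should_delete(
    updated_at: datetime,
    expires_at: datetime | None,
    default_retention_days: int,
    now: datetime | None = None,
) -> bool:
    """Decide whether a single task_state row is eligible for deletion.

    A set expires_at wins regardless of updated_at; NULL falls back to the
    global default, and a default of 0 disables time-based cleanup.
    """
    now = now or datetime.now(timezone.utc)
    if expires_at is not None:
        return expires_at < now
    if default_retention_days == 0:
        return False
    return updated_at < now - timedelta(days=default_retention_days)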

Why a CLI command instead of the scheduler?

Running cleanup as a periodic task inside the scheduler was considered, but it raises performance concerns for the scheduler, since cleanup does not come without a time cost.

A dedicated CLI keeps the separation clean: each deployment can schedule it wherever it makes sense.
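
For illustration, the command can be wrapped in a small maintenance DAG. A minimal sketch, assuming Airflow 3 import paths for the standard provider's BashOperator; the DAG id and schedule are made up for this example, not part of the PR:

from datetime import datetime

from airflow.providers.standard.operators.bash import BashOperator
from airflow.sdk import DAG

# Hypothetical maintenance DAG that runs the new CLI command once a day.
with DAG(
    dag_id="state_store_cleanup",
    schedule="@daily",
    start_date=datetime(2026, 1, 1),
    catchup=False,
):
    BashOperator(
        task_id="cleanup",
        bash_command="airflow state-store cleanup",
    )

A plain cron entry invoking the same command works just as well; the only requirement is that something runs it periodically.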

User implications / backcompat

New config options under [state_store] with safe defaults — no action needed to maintain existing behaviour. The expires_at column is nullable; existing rows get NULL (global default retention applies).

Testing

Test setup

  1. Started breeze with this configuration:
     export AIRFLOW__STATE_STORE__DEFAULT_RETENTION_DAYS=1
  2. Created a DAG run and pushed in 3 task states for it.

Testing for updated_at / global cleanup

  1. Ran this query:
UPDATE task_state 
SET updated_at = '2026-05-04 00:00:00+00:00' 
WHERE key = 'job_id_2';
  2. Ran airflow state-store cleanup:
[Breeze:3.10.20] root@8872db171dd2:/opt/airflow$ airflow state-store cleanup
2026-05-07T11:59:42.135506Z [info     ] setup plugin alembic.autogenerate.schemas [alembic.runtime.plugins] loc=plugins.py:37
2026-05-07T11:59:42.135615Z [info     ] setup plugin alembic.autogenerate.tables [alembic.runtime.plugins] loc=plugins.py:37
2026-05-07T11:59:42.135660Z [info     ] setup plugin alembic.autogenerate.types [alembic.runtime.plugins] loc=plugins.py:37
2026-05-07T11:59:42.135722Z [info     ] setup plugin alembic.autogenerate.constraints [alembic.runtime.plugins] loc=plugins.py:37
2026-05-07T11:59:42.135779Z [info     ] setup plugin alembic.autogenerate.defaults [alembic.runtime.plugins] loc=plugins.py:37
2026-05-07T11:59:42.135820Z [info     ] setup plugin alembic.autogenerate.comments [alembic.runtime.plugins] loc=plugins.py:37
2026-05-07T11:59:42.202654Z [info     ] Running state store cleanup    [airflow.cli.commands.state_store_command] loc=state_store_command.py:35
2026-05-07T11:59:42.224488Z [info     ] Deleted stale task_state rows  [airflow.state.metastore] loc=metastore.py:287 older_than=datetime.datetime(2026, 5, 6, 11, 59, 42, 202838, tzinfo=Timezone('UTC')) rows_deleted=1
2026-05-07T11:59:42.225427Z [info     ] Deleted expired task_state rows [airflow.state.metastore] loc=metastore.py:297 rows_deleted=0
2026-05-07T11:59:42.227473Z [info     ] State store cleanup complete   [airflow.cli.commands.state_store_command] loc=state_store_command.py:37

Testing for expires_at

  1. Ran this query:
UPDATE task_state 
SET expires_at = '2026-05-06 08:45:00+00:00' 
WHERE key = 'job_id';
  2. Ran the airflow state-store cleanup command (note that the row with expires_at set is cleared even though its updated_at has not yet reached the cleanup interval):
[Breeze:3.10.20] root@8872db171dd2:/opt/airflow$ airflow state-store cleanup
2026-05-07T12:00:17.664269Z [info     ] setup plugin alembic.autogenerate.schemas [alembic.runtime.plugins] loc=plugins.py:37
2026-05-07T12:00:17.664388Z [info     ] setup plugin alembic.autogenerate.tables [alembic.runtime.plugins] loc=plugins.py:37
2026-05-07T12:00:17.664450Z [info     ] setup plugin alembic.autogenerate.types [alembic.runtime.plugins] loc=plugins.py:37
2026-05-07T12:00:17.664494Z [info     ] setup plugin alembic.autogenerate.constraints [alembic.runtime.plugins] loc=plugins.py:37
2026-05-07T12:00:17.664533Z [info     ] setup plugin alembic.autogenerate.defaults [alembic.runtime.plugins] loc=plugins.py:37
2026-05-07T12:00:17.664581Z [info     ] setup plugin alembic.autogenerate.comments [alembic.runtime.plugins] loc=plugins.py:37
2026-05-07T12:00:17.721927Z [info     ] Running state store cleanup    [airflow.cli.commands.state_store_command] loc=state_store_command.py:35
2026-05-07T12:00:17.730377Z [info     ] Deleted stale task_state rows  [airflow.state.metastore] loc=metastore.py:287 older_than=datetime.datetime(2026, 5, 6, 12, 0, 17, 722029, tzinfo=Timezone('UTC')) rows_deleted=0
2026-05-07T12:00:17.731136Z [info     ] Deleted expired task_state rows [airflow.state.metastore] loc=metastore.py:297 rows_deleted=1
2026-05-07T12:00:17.733545Z [info     ] State store cleanup complete   [airflow.cli.commands.state_store_command] loc=state_store_command.py:37

Dry run:

[Breeze:3.10.20] root@8872db171dd2:/opt/airflow$ airflow state-store cleanup --dry-run
2026-05-07T12:13:34.018675Z [info     ] setup plugin alembic.autogenerate.schemas [alembic.runtime.plugins] loc=plugins.py:37
2026-05-07T12:13:34.018817Z [info     ] setup plugin alembic.autogenerate.tables [alembic.runtime.plugins] loc=plugins.py:37
2026-05-07T12:13:34.018870Z [info     ] setup plugin alembic.autogenerate.types [alembic.runtime.plugins] loc=plugins.py:37
2026-05-07T12:13:34.018914Z [info     ] setup plugin alembic.autogenerate.constraints [alembic.runtime.plugins] loc=plugins.py:37
2026-05-07T12:13:34.018995Z [info     ] setup plugin alembic.autogenerate.defaults [alembic.runtime.plugins] loc=plugins.py:37
2026-05-07T12:13:34.019080Z [info     ] setup plugin alembic.autogenerate.comments [alembic.runtime.plugins] loc=plugins.py:37
Would delete 2 task state row(s):

  Older than retention period (1):
    DAG 'my_dag', run 'manual__2026-05-07T11:46:58.479591+00:00', task 't1', key 'job_id_2'

  Per-key expiry reached (1):
    DAG 'my_dag', run 'manual__2026-05-07T11:46:58.479591+00:00', task 't1', key 'job_id'

What's next


Was generative AI tooling used to co-author this PR?
  • Yes (please specify the tool below)

  • Read the Pull Request Guidelines for more information. Note: commit author/co-author name and email in commits become permanently public when merged.
  • For fundamental code changes, an Airflow Improvement Proposal (AIP) is needed.
  • When adding dependency, check compliance with the ASF 3rd Party License Policy.
  • For significant user-facing changes create newsfragment: {pr_number}.significant.rst, in airflow-core/newsfragments. You can add this file in a follow-up commit after the PR is created so you know the PR number.

@boring-cyborg bot added the area:ConfigTemplates, area:db-migrations, and area:Scheduler labels May 6, 2026
@amoghrajesh amoghrajesh self-assigned this May 6, 2026
@amoghrajesh amoghrajesh moved this from Backlog to In progress in AIP-103: Task State Management May 6, 2026
@amoghrajesh amoghrajesh added this to the Airflow 3.3.0 milestone May 6, 2026
default: "airflow.state.metastore.MetastoreStateBackend"
default_retention_days:
description: |
Number of days to retain task state rows after their last update.
Member:

will it also affect asset state?

Contributor (author):

default_retention_days only applies to task_state rows. asset_state has no time-based GC — it is cleared explicitly (as per AIP) or via orphan cleanup when the asset is deregistered. Updated the config description to make this clear.

    Reads ``[state_store] default_retention_days`` from config and delegates to the
    configured state backend. Runs on the interval set by ``[state_store] state_cleanup_interval``.
    """
    retention_days = conf.getint("state_store", "default_retention_days")
Member:

if default_retention_days is set to 0, then we'll delete everything older than now; that doesn't look right 🤔

Contributor (author):

Oops, fixed it! retention_days=0 now disables time-based cleanup entirely. Refactored so that MetastoreStateBackend.cleanup() reads default_retention_days from config directly rather than receiving it as an argument; that felt cleaner, since the backend is responsible for enforcing its own retention policy. The scheduler just calls cleanup() with no args.
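
The resulting shape is roughly the following. A sketch based on this discussion: the class and method names follow the PR description, but the body is illustrative, not copied from the PR.

from airflow.configuration import conf


class BaseStateBackend:
    def cleanup(self) -> None:
        """No-op by default; custom backends override this to enforce their own retention policy."""


class MetastoreStateBackend(BaseStateBackend):
    def cleanup(self) -> None:
        # The backend owns its retention policy, so it reads config itself
        # instead of receiving retention_days as an argument.
        retention_days = conf.getint("state_store", "default_retention_days")
        # retention_days == 0 disables the time-based pass entirely;
        # the expires_at pass still runs (see the two-pass delete below).
        ...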

@jason810496 (Member) left a comment:

Would it be better to introduce batching / pagination for the task state garbage collection?

Comment on lines +276 to +290
older_than = now - timedelta(days=retention_days) if retention_days > 0 else None
with create_session() as session:
    if older_than:
        session.execute(delete(TaskStateModel).where(TaskStateModel.updated_at < older_than))
    session.execute(
        delete(TaskStateModel).where(
            TaskStateModel.expires_at.isnot(None),
            TaskStateModel.expires_at < now,
        )
    )
    active_asset_ids = select(AssetModel.id).join(
        AssetActive, (AssetActive.name == AssetModel.name) & (AssetActive.uri == AssetModel.uri)
    )
    session.execute(delete(AssetStateModel).where(AssetStateModel.asset_id.not_in(active_asset_ids)))

@jason810496 (Member) commented May 7, 2026:

If it's a valid assumption that users might produce a large number of state records within the state_cleanup_interval window, then I have some concerns about the single-pass delete transaction here.

I just double-checked the concern with Claude:


Yes, this is a real problem. Several compounding issues:

  1. Missing indexes — every cleanup will be a full table scan

The two predicates the cleanup filters on are not indexed:

  • task_state.updated_at — no index (only task_state_pkey on (dag_run_id,
    task_id, map_index, key) and idx_task_state_lookup on (dag_id, run_id,
    task_id, map_index))
  • task_state.expires_at — no index (just added in this PR)

So both DELETE WHERE updated_at < cutoff and DELETE WHERE expires_at < now()
do full sequential scans. On a deployment with millions of rows that's minutes
of scanning every 24h, plus the locks held for the whole duration.

  2. No batching / no LIMIT

Compare to airflow db cleanup (utils/db_cleanup.py:217), which deletes in
configurable batches and commits between them. The new path runs three plain
bulk DELETEs in a single session. Long-running bulk DELETE means:

  • Row locks held for the duration (writers calling task_state.set() upserts on
    matching rows block — they queue behind the cleanup transaction).
  • On Postgres: massive WAL churn, autovacuum can't keep up, table bloat.
  • On MySQL/InnoDB at REPEATABLE READ (Airflow's default): next-key/gap locks
    make conflicts even more likely.
  3. All three DELETEs share one transaction

with create_session() as session: opens one session; each session.execute()
runs inside it; commit happens at exit. If pass 1 takes 90s, the locks from
pass 1 are held while pass 2 and pass 3 run. A failure in pass 3 rolls back
passes 1 and 2 (cleanup makes no forward progress at all).

  4. Scheduler main loop is blocked

_cleanup_expired_task_state is registered via call_regular_interval, which is
synchronous in the scheduler loop. Same pattern as
_remove_unreferenced_triggers and _update_asset_orphanage — but those have
small cardinality. task_state is user-driven and unbounded (the AIP encourages
users to write a lot of it). With a multi-minute cleanup the scheduler is not
scheduling for those minutes.

Contributor (author):

Thanks, good catches.
I will address all of them except the last one, since it no longer applies from the scheduler's perspective: this is a CLI command now.

Contributor (author):

Handled it in: cdc4237

@amoghrajesh amoghrajesh force-pushed the aip-103-4-garbage-collection-and-cleanup branch from 082d92d to 7dc826d Compare May 7, 2026 12:24
@amoghrajesh amoghrajesh force-pushed the aip-103-4-garbage-collection-and-cleanup branch from 7dc826d to b644ce6 Compare May 7, 2026 12:29
STATE_STORE_COMMANDS = (
    ActionCommand(
        name="cleanup",
        help="Remove expired task state rows via the configured state backend",
Member:

Nit:

Suggested change:
- help="Remove expired task state rows via the configured state backend",
+ help="Remove expired stored state via the configured state backend",

Comment on lines +69 to +70
# even if updated_at is recent. NULL means no early expiry — the row is still cleaned
# up by the global updated_at + default_retention_days check. Populated via
Member:

This is confusing IMO -- an expires_at of None should mean it never expires.

We can pre-compute the expires_at value at update time by reading the default_retention config then (i.e. cleanup becomes a simpler `SELECT where expires_at < Now()`).

This possibly also removes the need for an index on updated_at.
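
A sketch of that write-time computation, under the reviewer's proposed semantics (the function and parameter names are hypothetical, not from the PR):

from datetime import datetime, timedelta, timezone

from airflow.configuration import conf


def compute_expires_at(per_key_ttl: timedelta | None = None) -> datetime | None:
    """Pre-compute expires_at when the row is written.

    Cleanup then reduces to a single range predicate on expires_at,
    and NULL genuinely means 'never expires'.
    """
    now = datetime.now(timezone.utc)
    if per_key_ttl is not None:
        return now + per_key_ttl
    retention_days = conf.getint("state_store", "default_retention_days")
    return now + timedelta(days=retention_days) if retention_days > 0 else None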

Comment on lines +279 to +284
pk_cols = (
    TaskStateModel.dag_run_id,
    TaskStateModel.task_id,
    TaskStateModel.map_index,
    TaskStateModel.key,
)
Member:

Given how this is used, it might be time to add a single-column id PK (either integer or uuid). A sketch of what that could look like is below.
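
This sketch assumes SQLAlchemy 2.0's built-in Uuid type; column names follow the PR, but types, lengths, and defaults are assumptions:

import uuid

from sqlalchemy import Column, Integer, String, UniqueConstraint, Uuid
from sqlalchemy.orm import declarative_base

Base = declarative_base()


class TaskStateModel(Base):
    __tablename__ = "task_state"

    # Single-column surrogate key; batched deletes can then target
    # "WHERE id IN (...)" instead of a composite tuple comparison.
    id = Column(Uuid, primary_key=True, default=uuid.uuid4)
    dag_run_id = Column(Integer, nullable=False)
    task_id = Column(String(250), nullable=False)
    map_index = Column(Integer, nullable=False, default=-1)
    key = Column(String(512), nullable=False)

    # The old composite primary key survives as a uniqueness guarantee.
    __table_args__ = (UniqueConstraint("dag_run_id", "task_id", "map_index", "key"),)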

def _delete_batched(where_clause) -> int:
    total = 0
    while True:
        with create_session() as session:
Member:

I don't think this should be a new session object each time around the loop, but instead one session object that is explicitly session.commit()ed after each batch.
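
A sketch of that pattern: one session, a bounded batch of primary keys per pass, and an explicit commit between batches. BATCH_SIZE and the tuple-IN delete are assumptions (the latter needs a database with row-value support); pk_cols is the tuple quoted earlier in this review.

from sqlalchemy import delete, select, tuple_

from airflow.utils.session import create_session

BATCH_SIZE = 5000  # assumed constant; not a config option in this PR


def _delete_batched(where_clause) -> int:
    """Delete matching task_state rows in batches, committing between batches
    so row locks are released and concurrent writers can make progress."""
    total = 0
    with create_session() as session:
        while True:
            pks = [
                tuple(row)
                for row in session.execute(
                    select(*pk_cols).where(where_clause).limit(BATCH_SIZE)
                )
            ]
            if not pks:
                break
            session.execute(delete(TaskStateModel).where(tuple_(*pk_cols).in_(pks)))
            session.commit()  # release locks before the next batch
            total += len(pks)
    return total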
