18 changes: 18 additions & 0 deletions airflow-core/src/airflow/config_templates/config.yml
Original file line number Diff line number Diff line change
@@ -2743,6 +2743,24 @@ scheduler:
type: integer
default: "20"
see_also: ":ref:`scheduler:ha:tunables`"
dag_cache_size:
description: |
Size of the LRU cache for SerializedDAG objects in the scheduler.
Set to 0 to use an unbounded dict with no eviction.
The cache is keyed by Dag version ID.
version_added: 3.3.0
type: integer
example: ~
default: "1024"

Member

This default might be too low. The limit counts Dag versions across all dags, and having more than 1024 dags is common, so this trades the memory issue for repeated DB reads.

It's configurable anyway, but a bigger default may be better suited.

dag_cache_ttl:
description: |
Time-to-live in seconds for cached SerializedDAG objects in the scheduler.
After this time, cached DAGs will be re-fetched from the database on next access.
Set to 0 to disable TTL, so entries will only be evicted by the LRU policy.
version_added: 3.3.0
type: integer
example: ~
default: "10800"
partition_mapper_max_downstream_keys:
description: |
Maximum number of downstream partition keys produced by a single
19 changes: 18 additions & 1 deletion airflow-core/src/airflow/jobs/scheduler_job_runner.py
@@ -347,8 +347,25 @@ def __init__(

if log:
self._log = log
dag_cache_size = conf.getint("scheduler", "dag_cache_size", fallback=1024)
dag_cache_ttl_config = conf.getint("scheduler", "dag_cache_ttl", fallback=10800)
Comment on lines +350 to +351

Member

There is already a default in the conf, so I wouldn't add a fallback as well.

Suggested change
- dag_cache_size = conf.getint("scheduler", "dag_cache_size", fallback=1024)
- dag_cache_ttl_config = conf.getint("scheduler", "dag_cache_ttl", fallback=10800)
+ dag_cache_size = conf.getint("scheduler", "dag_cache_size")
+ dag_cache_ttl_config = conf.getint("scheduler", "dag_cache_ttl")


self.scheduler_dag_bag = DBDagBag(load_op_links=False)
if dag_cache_size < 0:

Member

Minor inconsistency: API server uses cache_size <= 0 → unbounded; scheduler uses < 0 → warn+0, == 0 → unbounded. Both end at "0 = unbounded," so behavior matches, just expressed differently.
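
The equivalence described here can be checked with a small sketch. It is illustrative only: `api_server_style` and `scheduler_style` are hypothetical helper names, not functions in Airflow, and they model just the size handling, with 0 standing for "unbounded".

```python
import logging

log = logging.getLogger(__name__)


def api_server_style(cache_size: int) -> int:
    """Hypothetical helper: treat any non-positive size as 0 (unbounded)."""
    return cache_size if cache_size > 0 else 0


def scheduler_style(cache_size: int) -> int:
    """Hypothetical helper: warn on negative sizes, then fall back to 0 (unbounded)."""
    if cache_size < 0:
        log.warning("dag_cache_size must be >= 0, using unbounded dict")
        return 0
    return cache_size


# Both styles agree on the effective size for every input; only the warning differs.
for size in (-5, -1, 0, 1, 1024):
    assert api_server_style(size) == scheduler_style(size)
```

The only observable difference in this model is that the scheduler-style path logs a warning for negative values.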

self.log.warning("scheduler dag_cache_size must be >= 0, using unbounded dict")
dag_cache_size = 0

if dag_cache_ttl_config < 0:
self.log.warning("scheduler dag_cache_ttl must be >= 0, disabling TTL")
dag_cache_ttl_config = 0

dag_cache_ttl = dag_cache_ttl_config if dag_cache_ttl_config > 0 else None

self.scheduler_dag_bag = DBDagBag(
load_op_links=False,
cache_size=dag_cache_size,
cache_ttl=dag_cache_ttl,
Comment thread
Mady356 marked this conversation as resolved.
stats_prefix="scheduler.dag_bag",
Comment on lines +363 to +367

Member

This enables the bounded LRU/TTL cache for the scheduler, but the scheduler's access pattern is the one case where a count-based cap backfires, so I think the approach needs a rethink before merge.

#60804 deliberately left the scheduler on the unbounded dict (its description: "The scheduler continues using a plain unbounded dict with zero lock overhead") and enabled the bounded cache for the API server only, because the access profiles differ:

  • get_dag_for_run() is called for essentially every running DagRun the scheduler processes.
  • DagRun.get_running_dag_runs_to_examine() orders by last_scheduling_decision (least-recently-scheduled first), so across consecutive loops the scheduler round-robins through all running runs. The per-loop lru_cache() wrappers only dedupe within a single loop; the persistent cross-loop cache is this DBDagBag.

A cyclic sweep over N distinct dag_version_ids against an LRU/TTL cache of maxsize M < N is the sequential-flooding case: each key is evicted just before its next access, so the hit rate collapses toward zero once N > M. Every miss then pays session.get(DagVersion, ..., joinedload(serialized_dag)) plus a full SerializedDAG deserialization on the scheduler hot path. The deployments that hit this OOM are the large ones where active versions exceed 1024, so as written the default puts them straight into that regime. (Same concern Pierre raised on the default, but it's really about the eviction mechanism, not just the number.)
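
The sequential-flooding effect is easy to reproduce in isolation. The sketch below is a minimal simulation under stated assumptions: it uses a plain `OrderedDict`-based LRU rather than the actual `DBDagBag` cache, integer keys stand in for `dag_version_id` values, and `lru_hit_rate` is a hypothetical name.

```python
from collections import OrderedDict


def lru_hit_rate(num_keys: int, maxsize: int, sweeps: int = 10) -> float:
    """Hit rate of a plain LRU cache under a repeated cyclic sweep over num_keys keys."""
    cache = OrderedDict()
    hits = lookups = 0
    for _ in range(sweeps):
        for key in range(num_keys):
            lookups += 1
            if key in cache:
                hits += 1
                cache.move_to_end(key)  # mark as most recently used
            else:
                cache[key] = None
                if len(cache) > maxsize:
                    cache.popitem(last=False)  # evict the least recently used key
    return hits / lookups


# Working set fits: only the first sweep misses.
print(lru_hit_rate(num_keys=1024, maxsize=1024))  # 0.9
# Working set is one key too large: every lookup misses.
print(lru_hit_rate(num_keys=1025, maxsize=1024))  # 0.0
```

Going from 1024 to 1025 keys drops the hit rate from 0.9 to 0.0 in this model, which is the cliff the comment describes.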

The leak here is superseded dag_version_ids accumulating: once a version stops being referenced by running runs, it's never looked up again. TTL eviction targets exactly those, while the refresh-on-revalidation write-back in _get_dag keeps the hot active set resident regardless of its size. So TTL-driven eviction (default dag_cache_size=0, or a safety-valve cap well above realistic active-version counts, with the TTL doing the real bounding) fixes the reported growth without the thrash. Note too that a count cap bounds cardinality, not bytes -- 1024 large serialized DAGs can still be hundreds of MB, whereas memray measured bytes retained.
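
A rough sketch of TTL-only eviction shows why it separates the two populations. Everything here is an assumption made for illustration: `TTLOnlyCache` is a hypothetical class, not `DBDagBag`, and it models the write-back that keeps active entries fresh as a simple refresh on every access, with an injected clock so the run is deterministic.

```python
class TTLOnlyCache:
    """Hypothetical unbounded cache; entries expire ttl seconds after their last refresh."""

    def __init__(self, ttl, clock):
        self._ttl = ttl
        self._clock = clock  # injected so the example is deterministic
        self._entries = {}  # key -> (value, refreshed_at)

    def put(self, key, value):
        self._entries[key] = (value, self._clock())

    def get(self, key):
        entry = self._entries.get(key)
        if entry is None:
            return None
        value, refreshed_at = entry
        now = self._clock()
        if now - refreshed_at >= self._ttl:
            del self._entries[key]  # expired but not yet swept
            return None
        # Refresh on access, so keys that are still in use never age out.
        self._entries[key] = (value, now)
        return value

    def expire(self):
        """Drop every entry that has not been refreshed within the TTL."""
        now = self._clock()
        self._entries = {k: (v, t) for k, (v, t) in self._entries.items() if now - t < self._ttl}

    def __len__(self):
        return len(self._entries)


now = [0.0]
cache = TTLOnlyCache(ttl=100.0, clock=lambda: now[0])
cache.put("superseded-version", "dag-v1")
active = [f"active-{i}" for i in range(2000)]
for key in active:
    cache.put(key, "dag")

# Simulate several scheduler loops: only the active versions are looked up.
for _ in range(5):
    now[0] += 40.0
    for key in active:
        assert cache.get(key) is not None
    cache.expire()

print(len(cache))  # 2000: the superseded version is gone, the active set is intact
```

The active set here is larger than 1024 and still never misses, because nothing is evicted by count.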

Minor, worth noting in the description: enabling the cache also flips the scheduler from nullcontext to a real RLock per _get_dag, which #60804 explicitly chose to avoid. Cheap when uncontended, so not blocking, but it reverses a documented decision.
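
For readers unfamiliar with the pattern, the lock switch looks roughly like the following. `TinyBag` is a hypothetical stand-in that shows only the lock choice, assuming the bag picks its lock once at construction time; it is not the `DBDagBag` code.

```python
import contextlib
import threading


class TinyBag:
    """Hypothetical stand-in showing the lock choice only, not DBDagBag itself."""

    def __init__(self, use_cache: bool) -> None:
        # A real re-entrant lock when caching is on, a no-op context manager otherwise.
        self._lock = threading.RLock() if use_cache else contextlib.nullcontext()
        self._dags = {}

    def get(self, key):
        with self._lock:  # costs nothing beyond a context-manager call when it is a nullcontext
            return self._dags.get(key)


unlocked = TinyBag(use_cache=False)
locked = TinyBag(use_cache=True)
print(hasattr(unlocked._lock, "acquire"), hasattr(locked._lock, "acquire"))
```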

@aeroyorch aeroyorch Jun 27, 2026

Contributor

Hi @kaxil. I agree with your analysis for the scheduler use case. Before opening #69001, I went back and read through the design decisions in #60804 because I expected we should first discuss the right approach before implementing (or not) anything.

One thing I wasn't completely sure about from that discussion is whether the intended solution for the scheduler was simply to rely on num_runs and periodically restart the scheduler. If that's the recommended mitigation, I think it would be worth documenting somewhere, since it's not immediately obvious.

If we do want to introduce cache eviction for the scheduler, defaulting dag_cache_size to 0 (unbounded) and relying only on TTL eviction seems like a conservative choice. That addresses the leak of superseded dag_version_id while avoiding the LRU thrashing concerns you described for large deployments.

Regarding the RLock overhead mentioned in #60804, I'm not sure whether the performance impact is significant enough in practice to justify keeping an unbounded dict in the scheduler forever.

Author

That makes a lot of sense and I understand why a low count-based LRU default is risky for the scheduler: if the active dag_version_id working set is larger than the cache size, the scheduler can end up thrashing and repeatedly paying the DB read/deserialization cost.

My current thinking is to revise this so scheduler eviction is TTL-driven by default instead of size-cap driven. Concretely, that would mean defaulting scheduler dag_cache_size to 0, keeping dag_cache_ttl set, and updating DBDagBag so a non-zero TTL can evict entries even when there is no count-based maxsize. That should target the superseded dag_version_id growth without evicting the active cyclic working set.

I’d also remove the redundant conf fallbacks and add tests for the non-positive cache size / TTL path.

Does that direction sound reasonable before I rework the PR?

)

# Set of (dag_id, asset_name, asset_uri) tuples for trigger policies that
# are permanently unreachable for the rollup window's cardinality — the
20 changes: 11 additions & 9 deletions airflow-core/src/airflow/models/dagbag.py
@@ -63,8 +63,7 @@ class DBDagBag:
Internal class for retrieving dags from the database.

Optionally supports LRU+TTL caching when cache_size is provided.
The scheduler uses this without caching, while the API server can
enable caching via configuration.
Callers can enable bounded caching by passing cache_size and cache_ttl.

:meta private:
"""
@@ -74,15 +73,18 @@ def __init__(
load_op_links: bool = True,
cache_size: int | None = None,
cache_ttl: int | None = None,
stats_prefix: str = "api_server.dag_bag",
Comment thread
Mady356 marked this conversation as resolved.
) -> None:
"""
Initialize DBDagBag.

:param load_op_links: Should the extra operator link be loaded when de-serializing the DAG?
:param cache_size: Size of LRU cache. If None or 0, uses unbounded dict (no eviction).
:param cache_ttl: Time-to-live for cache entries in seconds. If None or 0, no TTL (LRU only).
:param stats_prefix: Prefix for cache-related metrics emitted by this DBDagBag.
"""
self.load_op_links = load_op_links
self._stats_prefix = stats_prefix
self._dags: MutableMapping[UUID | str, _CacheEntry] = {}
self._use_cache = False

@@ -111,7 +113,7 @@ def _read_dag(self, serdag: SerializedDagModel) -> SerializedDAG | None:
self._dags[serdag.dag_version_id] = _CacheEntry(dag, serdag.dag_hash, time.monotonic())
cache_size = len(self._dags)
if self._use_cache:
stats.gauge("api_server.dag_bag.cache_size", cache_size, rate=0.1)
stats.gauge(f"{self._stats_prefix}.cache_size", cache_size, rate=0.1)

Contributor

Changing the calls to f"{self._stats_prefix}.cache_*" makes check_metrics_synced_with_the_registry see {_stats_prefix}.cache_hit, {_stats_prefix}.cache_miss, etc. as missing from the registry. The runtime metric split makes sense, but the metric names need to stay representable to the registry checker, or the checker/registry needs to support this pattern.
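
To see why dynamically built names are a problem for a static check, consider the sketch below. It assumes the checker reads metric names from source without running it; how `check_metrics_synced_with_the_registry` actually works is not shown in this PR, and `literal_metric_names` is a hypothetical function written only for this illustration.

```python
import ast

SOURCE = '''
stats.incr("api_server.dag_bag.cache_hit")
stats.incr(f"{self._stats_prefix}.cache_miss")
'''


def literal_metric_names(source: str) -> tuple[list[str], int]:
    """Return (names given as plain string literals, number of dynamically built names)."""
    literal, dynamic = [], 0
    for node in ast.walk(ast.parse(source)):
        if not (isinstance(node, ast.Call) and isinstance(node.func, ast.Attribute)):
            continue
        if node.func.attr not in {"incr", "gauge"} or not node.args:
            continue
        first = node.args[0]
        if isinstance(first, ast.Constant) and isinstance(first.value, str):
            literal.append(first.value)
        else:
            dynamic += 1  # e.g. an f-string (ast.JoinedStr): no fixed name to look up
    return literal, dynamic


names, dynamic_count = literal_metric_names(SOURCE)
print(names)          # ['api_server.dag_bag.cache_hit']
print(dynamic_count)  # 1
```

Under that assumption, the f-string call yields no name that can be matched against a registry entry such as `scheduler.dag_bag.cache_miss`, which is the mismatch this comment points out.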

return dag

@staticmethod
@@ -134,7 +136,7 @@ def _get_dag(self, version_id: UUID | str, session: Session) -> SerializedDAG |
# cannot have gone stale yet -- serve it without touching the DB.
if now - cached.last_validated < self._revalidation_interval:
if self._use_cache:
stats.incr("api_server.dag_bag.cache_hit")
stats.incr(f"{self._stats_prefix}.cache_hit")
return cached.dag
# Past the window: a version may have been updated in place (same dag_version_id, new
# content + new dag_hash) by SerializedDagModel.write_dag, so confirm the cached copy
@@ -149,7 +151,7 @@ def _get_dag(self, version_id: UUID | str, session: Session) -> SerializedDAG |
if current is not None and current.dag_hash == cached.dag_hash:
self._dags[version_id] = current._replace(last_validated=now)
if self._use_cache:
stats.incr("api_server.dag_bag.cache_hit")
stats.incr(f"{self._stats_prefix}.cache_hit")
return cached.dag
# Stale (updated in place) or the version no longer exists: drop and reload below.
with self._lock:
@@ -169,9 +171,9 @@ def _get_dag(self, version_id: UUID | str, session: Session) -> SerializedDAG |
if self._use_cache:
with self._lock:
if (cached := self._dags.get(version_id)) is not None:
stats.incr("api_server.dag_bag.cache_hit")
stats.incr(f"{self._stats_prefix}.cache_hit")
return cached.dag
stats.incr("api_server.dag_bag.cache_miss")
stats.incr(f"{self._stats_prefix}.cache_miss")
return self._read_dag(serdag)

def get_dag(self, version_id: UUID | str, session: Session) -> SerializedDAG | None:
@@ -203,8 +205,8 @@ def clear_cache(self) -> int:
self._dags.clear()

if self._use_cache:
stats.incr("api_server.dag_bag.cache_clear")
stats.gauge("api_server.dag_bag.cache_size", 0)
stats.incr(f"{self._stats_prefix}.cache_clear")
stats.gauge(f"{self._stats_prefix}.cache_size", 0)
return count

@staticmethod
18 changes: 18 additions & 0 deletions airflow-core/tests/unit/jobs/test_scheduler_job.py
@@ -408,6 +408,24 @@ def test_heartrate(self, heartrate):
_ = SchedulerJobRunner(job=scheduler_job, executors=[self.null_exec])
assert scheduler_job.heartrate == heartrate

@patch("airflow.jobs.scheduler_job_runner.DBDagBag")
def test_scheduler_dag_bag_uses_scheduler_cache_config(self, mock_db_dag_bag):

Member

A test is missing for the negative cache size case (< 0), which should log a warning and fall back to the unbounded dict.

with conf_vars(
{
("scheduler", "dag_cache_size"): "123",
("scheduler", "dag_cache_ttl"): "456",
}
):
scheduler_job = Job()
SchedulerJobRunner(job=scheduler_job, executors=[self.null_exec])

mock_db_dag_bag.assert_called_once_with(
load_op_links=False,
cache_size=123,
cache_ttl=456,
stats_prefix="scheduler.dag_bag",
)

def test_no_orphan_process_will_be_left(self):
current_process = psutil.Process()
old_children = current_process.children(recursive=True)
Original file line number Diff line number Diff line change
@@ -345,6 +345,24 @@ metrics:
legacy_name: "-"
name_variables: []

- name: "scheduler.dag_bag.cache_hit"
description: "Number of cache hits when retrieving SerializedDAG from DBDagBag in the scheduler"
type: "counter"
legacy_name: "-"
name_variables: []

- name: "scheduler.dag_bag.cache_miss"
description: "Number of cache misses when retrieving SerializedDAG from DBDagBag in the scheduler"
type: "counter"
legacy_name: "-"
name_variables: []

- name: "scheduler.dag_bag.cache_clear"
description: "Number of times the DBDagBag cache was cleared in the scheduler"
type: "counter"
legacy_name: "-"
name_variables: []

- name: "connection_test.success"
description: "Number of worker-dispatched connection tests that completed successfully."
type: "counter"
@@ -379,6 +397,12 @@ metrics:
legacy_name: "-"
name_variables: []

- name: "scheduler.dag_bag.cache_size"
description: "Number of SerializedDAG objects currently cached in DBDagBag in the scheduler"
type: "gauge"
legacy_name: "-"
name_variables: []

- name: "connection_test.active"
description: "Number of connection tests currently in flight (``queued`` + ``running``), sampled by the
scheduler each tick."