From 1adf575346f7d5a1db651da779d959af5f6b0099 Mon Sep 17 00:00:00 2001 From: Maadhavan Gupta Date: Fri, 26 Jun 2026 01:42:11 +0000 Subject: [PATCH 1/3] Use bounded DBDagBag cache in scheduler --- .../src/airflow/config_templates/config.yml | 18 ++++++++++++++++++ .../src/airflow/jobs/scheduler_job_runner.py | 18 +++++++++++++++++- airflow-core/src/airflow/models/dagbag.py | 3 +-- .../tests/unit/jobs/test_scheduler_job.py | 17 +++++++++++++++++ 4 files changed, 53 insertions(+), 3 deletions(-) diff --git a/airflow-core/src/airflow/config_templates/config.yml b/airflow-core/src/airflow/config_templates/config.yml index 39423755a6841..5ee762050ef22 100644 --- a/airflow-core/src/airflow/config_templates/config.yml +++ b/airflow-core/src/airflow/config_templates/config.yml @@ -2743,6 +2743,24 @@ scheduler: type: integer default: "20" see_also: ":ref:`scheduler:ha:tunables`" + dag_cache_size: + description: | + Size of the LRU cache for SerializedDAG objects in the scheduler. + Set to 0 to use an unbounded dict with no eviction. + The cache is keyed by Dag version ID. + version_added: 3.3.0 + type: integer + example: ~ + default: "1024" + dag_cache_ttl: + description: | + Time-to-live in seconds for cached SerializedDAG objects in the scheduler. + After this time, cached DAGs will be re-fetched from the database on next access. + Set to 0 to disable TTL, so entries will only be evicted by the LRU policy. + version_added: 3.3.0 + type: integer + example: ~ + default: "3600" partition_mapper_max_downstream_keys: description: | Maximum number of downstream partition keys produced by a single diff --git a/airflow-core/src/airflow/jobs/scheduler_job_runner.py b/airflow-core/src/airflow/jobs/scheduler_job_runner.py index df2abe2aabd06..9e0705615bc84 100644 --- a/airflow-core/src/airflow/jobs/scheduler_job_runner.py +++ b/airflow-core/src/airflow/jobs/scheduler_job_runner.py @@ -347,8 +347,24 @@ def __init__( if log: self._log = log + dag_cache_size = conf.getint("scheduler", "dag_cache_size", fallback=1024) + dag_cache_ttl_config = conf.getint("scheduler", "dag_cache_ttl", fallback=3600) - self.scheduler_dag_bag = DBDagBag(load_op_links=False) + if dag_cache_size < 0: + self.log.warning("scheduler dag_cache_size must be >= 0, using unbounded dict") + dag_cache_size = 0 + + if dag_cache_ttl_config < 0: + self.log.warning("scheduler dag_cache_ttl must be >= 0, disabling TTL") + dag_cache_ttl_config = 0 + + dag_cache_ttl = dag_cache_ttl_config if dag_cache_ttl_config > 0 else None + + self.scheduler_dag_bag = DBDagBag( + load_op_links=False, + cache_size=dag_cache_size, + cache_ttl=dag_cache_ttl, + ) # Set of (dag_id, asset_name, asset_uri) tuples for trigger policies that # are permanently unreachable for the rollup window's cardinality — the diff --git a/airflow-core/src/airflow/models/dagbag.py b/airflow-core/src/airflow/models/dagbag.py index c4bd8eceea102..0eefb670d8d2e 100644 --- a/airflow-core/src/airflow/models/dagbag.py +++ b/airflow-core/src/airflow/models/dagbag.py @@ -63,8 +63,7 @@ class DBDagBag: Internal class for retrieving dags from the database. Optionally supports LRU+TTL caching when cache_size is provided. - The scheduler uses this without caching, while the API server can - enable caching via configuration. + Callers can enable bounded caching by passing cache_size and cache_ttl. :meta private: """ diff --git a/airflow-core/tests/unit/jobs/test_scheduler_job.py b/airflow-core/tests/unit/jobs/test_scheduler_job.py index e2d5977eb86f7..019649fba066c 100644 --- a/airflow-core/tests/unit/jobs/test_scheduler_job.py +++ b/airflow-core/tests/unit/jobs/test_scheduler_job.py @@ -408,6 +408,23 @@ def test_heartrate(self, heartrate): _ = SchedulerJobRunner(job=scheduler_job, executors=[self.null_exec]) assert scheduler_job.heartrate == heartrate + @patch("airflow.jobs.scheduler_job_runner.DBDagBag") + def test_scheduler_dag_bag_uses_scheduler_cache_config(self, mock_db_dag_bag): + with conf_vars( + { + ("scheduler", "dag_cache_size"): "123", + ("scheduler", "dag_cache_ttl"): "456", + } + ): + scheduler_job = Job() + SchedulerJobRunner(job=scheduler_job, executors=[self.null_exec]) + + mock_db_dag_bag.assert_called_once_with( + load_op_links=False, + cache_size=123, + cache_ttl=456, + ) + def test_no_orphan_process_will_be_left(self): current_process = psutil.Process() old_children = current_process.children(recursive=True) From 4029da4f97a4d14759143ac0658d883576e1e282 Mon Sep 17 00:00:00 2001 From: Maadhavan Gupta Date: Fri, 26 Jun 2026 08:24:27 +0000 Subject: [PATCH 2/3] Add scheduler DAG bag cache metrics --- .../src/airflow/config_templates/config.yml | 2 +- .../src/airflow/jobs/scheduler_job_runner.py | 3 ++- airflow-core/src/airflow/models/dagbag.py | 17 +++++++------ .../tests/unit/jobs/test_scheduler_job.py | 1 + .../metrics/metrics_template.yaml | 24 +++++++++++++++++++ 5 files changed, 38 insertions(+), 9 deletions(-) diff --git a/airflow-core/src/airflow/config_templates/config.yml b/airflow-core/src/airflow/config_templates/config.yml index 5ee762050ef22..961e017775cfe 100644 --- a/airflow-core/src/airflow/config_templates/config.yml +++ b/airflow-core/src/airflow/config_templates/config.yml @@ -2760,7 +2760,7 @@ scheduler: version_added: 3.3.0 type: integer example: ~ - default: "3600" + default: "10800" partition_mapper_max_downstream_keys: description: | Maximum number of downstream partition keys produced by a single diff --git a/airflow-core/src/airflow/jobs/scheduler_job_runner.py b/airflow-core/src/airflow/jobs/scheduler_job_runner.py index 9e0705615bc84..3a8abf8ed72a0 100644 --- a/airflow-core/src/airflow/jobs/scheduler_job_runner.py +++ b/airflow-core/src/airflow/jobs/scheduler_job_runner.py @@ -348,7 +348,7 @@ def __init__( if log: self._log = log dag_cache_size = conf.getint("scheduler", "dag_cache_size", fallback=1024) - dag_cache_ttl_config = conf.getint("scheduler", "dag_cache_ttl", fallback=3600) + dag_cache_ttl_config = conf.getint("scheduler", "dag_cache_ttl", fallback=10800) if dag_cache_size < 0: self.log.warning("scheduler dag_cache_size must be >= 0, using unbounded dict") @@ -364,6 +364,7 @@ def __init__( load_op_links=False, cache_size=dag_cache_size, cache_ttl=dag_cache_ttl, + stats_prefix="scheduler.dag_bag", ) # Set of (dag_id, asset_name, asset_uri) tuples for trigger policies that diff --git a/airflow-core/src/airflow/models/dagbag.py b/airflow-core/src/airflow/models/dagbag.py index 0eefb670d8d2e..fa104a87c49f1 100644 --- a/airflow-core/src/airflow/models/dagbag.py +++ b/airflow-core/src/airflow/models/dagbag.py @@ -73,6 +73,7 @@ def __init__( load_op_links: bool = True, cache_size: int | None = None, cache_ttl: int | None = None, + stats_prefix: str = "api_server.dag_bag", ) -> None: """ Initialize DBDagBag. @@ -80,8 +81,10 @@ def __init__( :param load_op_links: Should the extra operator link be loaded when de-serializing the DAG? :param cache_size: Size of LRU cache. If None or 0, uses unbounded dict (no eviction). :param cache_ttl: Time-to-live for cache entries in seconds. If None or 0, no TTL (LRU only). + :param stats_prefix: Prefix for cache-related metrics emitted by this DBDagBag. """ self.load_op_links = load_op_links + self._stats_prefix = stats_prefix self._dags: MutableMapping[UUID | str, _CacheEntry] = {} self._use_cache = False @@ -110,7 +113,7 @@ def _read_dag(self, serdag: SerializedDagModel) -> SerializedDAG | None: self._dags[serdag.dag_version_id] = _CacheEntry(dag, serdag.dag_hash, time.monotonic()) cache_size = len(self._dags) if self._use_cache: - stats.gauge("api_server.dag_bag.cache_size", cache_size, rate=0.1) + stats.gauge(f"{self._stats_prefix}.cache_size", cache_size, rate=0.1) return dag @staticmethod @@ -133,7 +136,7 @@ def _get_dag(self, version_id: UUID | str, session: Session) -> SerializedDAG | # cannot have gone stale yet -- serve it without touching the DB. if now - cached.last_validated < self._revalidation_interval: if self._use_cache: - stats.incr("api_server.dag_bag.cache_hit") + stats.incr(f"{self._stats_prefix}.cache_hit") return cached.dag # Past the window: a version may have been updated in place (same dag_version_id, new # content + new dag_hash) by SerializedDagModel.write_dag, so confirm the cached copy @@ -148,7 +151,7 @@ def _get_dag(self, version_id: UUID | str, session: Session) -> SerializedDAG | if current is not None and current.dag_hash == cached.dag_hash: self._dags[version_id] = current._replace(last_validated=now) if self._use_cache: - stats.incr("api_server.dag_bag.cache_hit") + stats.incr(f"{self._stats_prefix}.cache_hit") return cached.dag # Stale (updated in place) or the version no longer exists: drop and reload below. with self._lock: @@ -168,9 +171,9 @@ def _get_dag(self, version_id: UUID | str, session: Session) -> SerializedDAG | if self._use_cache: with self._lock: if (cached := self._dags.get(version_id)) is not None: - stats.incr("api_server.dag_bag.cache_hit") + stats.incr(f"{self._stats_prefix}.cache_hit") return cached.dag - stats.incr("api_server.dag_bag.cache_miss") + stats.incr(f"{self._stats_prefix}.cache_miss") return self._read_dag(serdag) def get_dag(self, version_id: UUID | str, session: Session) -> SerializedDAG | None: @@ -202,8 +205,8 @@ def clear_cache(self) -> int: self._dags.clear() if self._use_cache: - stats.incr("api_server.dag_bag.cache_clear") - stats.gauge("api_server.dag_bag.cache_size", 0) + stats.incr(f"{self._stats_prefix}.cache_clear") + stats.gauge(f"{self._stats_prefix}.cache_size", 0) return count @staticmethod diff --git a/airflow-core/tests/unit/jobs/test_scheduler_job.py b/airflow-core/tests/unit/jobs/test_scheduler_job.py index 019649fba066c..bcacafcd663a1 100644 --- a/airflow-core/tests/unit/jobs/test_scheduler_job.py +++ b/airflow-core/tests/unit/jobs/test_scheduler_job.py @@ -423,6 +423,7 @@ def test_scheduler_dag_bag_uses_scheduler_cache_config(self, mock_db_dag_bag): load_op_links=False, cache_size=123, cache_ttl=456, + stats_prefix="scheduler.dag_bag", ) def test_no_orphan_process_will_be_left(self): diff --git a/shared/observability/src/airflow_shared/observability/metrics/metrics_template.yaml b/shared/observability/src/airflow_shared/observability/metrics/metrics_template.yaml index 6c51e32ff7798..9a71a81dc0adf 100644 --- a/shared/observability/src/airflow_shared/observability/metrics/metrics_template.yaml +++ b/shared/observability/src/airflow_shared/observability/metrics/metrics_template.yaml @@ -345,6 +345,24 @@ metrics: legacy_name: "-" name_variables: [] + - name: "scheduler.dag_bag.cache_hit" + description: "Number of cache hits when retrieving SerializedDAG from DBDagBag in the scheduler" + type: "counter" + legacy_name: "-" + name_variables: [] + + - name: "scheduler.dag_bag.cache_miss" + description: "Number of cache misses when retrieving SerializedDAG from DBDagBag in the scheduler" + type: "counter" + legacy_name: "-" + name_variables: [] + + - name: "scheduler.dag_bag.cache_clear" + description: "Number of times the DBDagBag cache was cleared in the scheduler" + type: "counter" + legacy_name: "-" + name_variables: [] + - name: "connection_test.success" description: "Number of worker-dispatched connection tests that completed successfully." type: "counter" @@ -379,6 +397,12 @@ metrics: legacy_name: "-" name_variables: [] + - name: "scheduler.dag_bag.cache_size" + description: "Number of SerializedDAG objects currently cached in DBDagBag in the scheduler" + type: "gauge" + legacy_name: "-" + name_variables: [] + - name: "connection_test.active" description: "Number of connection tests currently in flight (``queued`` + ``running``), sampled by the scheduler each tick." From 5b2d41bfbf3d80e41c9f99d502409edf9991f24e Mon Sep 17 00:00:00 2001 From: Maadhavan Gupta Date: Fri, 26 Jun 2026 10:07:49 +0000 Subject: [PATCH 3/3] Set API server DAG bag stats prefix explicitly --- airflow-core/src/airflow/api_fastapi/common/dagbag.py | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/airflow-core/src/airflow/api_fastapi/common/dagbag.py b/airflow-core/src/airflow/api_fastapi/common/dagbag.py index d87aca49a524a..f3a3d1830e612 100644 --- a/airflow-core/src/airflow/api_fastapi/common/dagbag.py +++ b/airflow-core/src/airflow/api_fastapi/common/dagbag.py @@ -47,12 +47,16 @@ def create_dag_bag() -> DBDagBag: # Use unbounded dict (no eviction) if cache_size is 0 if cache_size <= 0: - return DBDagBag(cache_size=0) + return DBDagBag(cache_size=0, stats_prefix="api_server.dag_bag") # Disable TTL if cache_ttl is 0 cache_ttl: int | None = cache_ttl_config if cache_ttl_config > 0 else None - return DBDagBag(cache_size=cache_size, cache_ttl=cache_ttl) + return DBDagBag( + cache_size=cache_size, + cache_ttl=cache_ttl, + stats_prefix="api_server.dag_bag", + ) def dag_bag_from_app(request: Request) -> DBDagBag: