Use bounded DBDagBag cache in scheduler #69007
pierrejeambrun
left a comment
LGTM, a few nits / suggestions, but nothing blocking
    version_added: 3.3.0
    type: integer
    example: ~
    default: "1024"
This default might be too low. The limit counts DAG versions across all DAGs, and deployments with more than 1024 DAGs are common, so we would be trading the memory issue for repeated DB reads.
It's configurable anyway, but a bigger default may be better suited.
    dag_cache_size = conf.getint("scheduler", "dag_cache_size", fallback=1024)
    dag_cache_ttl_config = conf.getint("scheduler", "dag_cache_ttl", fallback=10800)
There is already a default in the config, so I wouldn't add a fallback as well.
    - dag_cache_size = conf.getint("scheduler", "dag_cache_size", fallback=1024)
    - dag_cache_ttl_config = conf.getint("scheduler", "dag_cache_ttl", fallback=10800)
    + dag_cache_size = conf.getint("scheduler", "dag_cache_size")
    + dag_cache_ttl_config = conf.getint("scheduler", "dag_cache_ttl")
    dag_cache_ttl_config = conf.getint("scheduler", "dag_cache_ttl", fallback=10800)

    self.scheduler_dag_bag = DBDagBag(load_op_links=False)
    if dag_cache_size < 0:
Minor inconsistency: API server uses cache_size <= 0 → unbounded; scheduler uses < 0 → warn+0, == 0 → unbounded. Both end at "0 = unbounded," so behavior matches, just expressed differently.
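To make the equivalence concrete, here is a minimal sketch of the two conventions side by side. The function names are hypothetical and are not taken from the Airflow code base; the logic only mirrors what the comment above describes.

```python
import logging

log = logging.getLogger("cache_size_sketch")


def api_server_is_unbounded(cache_size: int) -> bool:
    """API-server style (as described above): any size <= 0 means unbounded."""
    return cache_size <= 0


def scheduler_is_unbounded(cache_size: int) -> bool:
    """Scheduler style (as described above): negative warns and becomes 0; 0 means unbounded."""
    if cache_size < 0:
        log.warning("Negative cache size %d; falling back to 0 (unbounded)", cache_size)
        cache_size = 0
    return cache_size == 0


# Both conventions agree for every input; only the scheduler path logs a warning.
agree = all(api_server_is_unbounded(n) == scheduler_is_unbounded(n) for n in range(-3, 4))
print(agree)
```

If the two call sites should read the same way, one option would be a single shared helper, though whether that is worth it for a one-line check is a judgment call.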
    assert scheduler_job.heartrate == heartrate

    @patch("airflow.jobs.scheduler_job_runner.DBDagBag")
    def test_scheduler_dag_bag_uses_scheduler_cache_config(self, mock_db_dag_bag):
Missing a test for cache size < 0, which should raise a warning and fall back to unbounded.
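A rough sketch of the shape such a test could take, using only the standard library. `resolve_cache_size` is a hypothetical stand-in for the scheduler's handling of the option, not the real `SchedulerJobRunner` code; a real test would patch `conf` and `DBDagBag` like the existing test in this file.

```python
import logging
import unittest

log = logging.getLogger("scheduler_cache_sketch")


def resolve_cache_size(configured: int) -> int:
    """Hypothetical stand-in: negative sizes warn and fall back to 0 (unbounded)."""
    if configured < 0:
        log.warning("dag_cache_size is negative (%d); using 0 (unbounded)", configured)
        return 0
    return configured


class TestResolveCacheSize(unittest.TestCase):
    def test_negative_size_warns_and_falls_back_to_unbounded(self):
        # assertLogs fails the test if no WARNING is emitted inside the block.
        with self.assertLogs("scheduler_cache_sketch", level="WARNING") as captured:
            self.assertEqual(resolve_cache_size(-1), 0)
        self.assertIn("negative", captured.output[0])

    def test_positive_size_is_passed_through(self):
        self.assertEqual(resolve_cache_size(1024), 1024)
```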
    self.scheduler_dag_bag = DBDagBag(
        load_op_links=False,
        cache_size=dag_cache_size,
        cache_ttl=dag_cache_ttl,
        stats_prefix="scheduler.dag_bag",
This enables the bounded LRU/TTL cache for the scheduler, but the scheduler's access pattern is the one case where a count-based cap backfires, so I think the approach needs a rethink before merge.
#60804 deliberately left the scheduler on the unbounded dict (its description: "The scheduler continues using a plain unbounded dict with zero lock overhead") and enabled the bounded cache for the API server only, because the access profiles differ:
get_dag_for_run() is called for essentially every running DagRun the scheduler processes. DagRun.get_running_dag_runs_to_examine() orders by last_scheduling_decision (least-recently-scheduled first), so across consecutive loops the scheduler round-robins through all running runs. The per-loop lru_cache() wrappers only dedupe within a single loop; the persistent cross-loop cache is this DBDagBag.
A cyclic sweep over N distinct dag_version_ids against an LRU/TTL cache of maxsize M < N is the sequential-flooding case: each key is evicted just before its next access, so the hit rate collapses toward zero once N > M. Every miss then pays session.get(DagVersion, ..., joinedload(serialized_dag)) plus a full SerializedDAG deserialization on the scheduler hot path. The deployments that hit this OOM are the large ones where active versions exceed 1024, so as written the default puts them straight into that regime. (Same concern Pierre raised on the default, but it's really about the eviction mechanism, not just the number.)
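The sequential-flooding effect is easy to reproduce in isolation. The sketch below is a toy LRU built on `OrderedDict`, not the `DBDagBag` cache itself; it only models a cyclic sweep over N keys against a cap of M and ignores TTL.

```python
from collections import OrderedDict


def lru_hit_rate(num_keys: int, max_size: int, sweeps: int = 5) -> float:
    """Hit rate of a plain LRU cache under repeated in-order sweeps over num_keys keys."""
    cache = OrderedDict()
    hits = lookups = 0
    for _ in range(sweeps):
        for key in range(num_keys):
            lookups += 1
            if key in cache:
                hits += 1
                cache.move_to_end(key)  # mark as most recently used
            else:
                cache[key] = f"dag-{key}"  # stands in for the DB read + deserialization
                if len(cache) > max_size:
                    cache.popitem(last=False)  # evict the least recently used entry
    return hits / lookups


print(lru_hit_rate(num_keys=1024, max_size=1024))  # 0.8: only the first sweep misses
print(lru_hit_rate(num_keys=1025, max_size=1024))  # 0.0: one key over the cap, every lookup misses
```

Going one key over the cap is enough to drop from a warm cache to none at all, which is why the concern is the eviction mechanism rather than the specific default.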
The leak here is superseded dag_version_ids accumulating: once a version stops being referenced by running runs, it's never looked up again. TTL eviction targets exactly those, while the refresh-on-revalidation write-back in _get_dag keeps the hot active set resident regardless of its size. So TTL-driven eviction (default dag_cache_size=0, or a safety-valve cap well above realistic active-version counts, with the TTL doing the real bounding) fixes the reported growth without the thrash. Note too that a count cap bounds cardinality, not bytes -- 1024 large serialized DAGs can still be hundreds of MB, whereas memray measured bytes retained.
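As an illustration of TTL-only bounding, here is a toy cache with no count cap whose entries expire a fixed time after their last access. It is a simplification under stated assumptions: the class, the injected clock, and the refresh-on-every-access rule are all hypothetical, and the real `_get_dag` revalidation path is more involved.

```python
class TTLOnlyCache:
    """No count cap; an entry expires ttl seconds after its last access."""

    def __init__(self, ttl, clock):
        self._ttl = ttl
        self._clock = clock  # injected so the demo can advance time by hand
        self._entries = {}  # key -> (value, last_access_time)

    def get(self, key, loader):
        now = self._clock()
        entry = self._entries.get(key)
        if entry is not None and now - entry[1] < self._ttl:
            value = entry[0]
        else:
            value = loader(key)  # miss or expired: pay the load cost
        self._entries[key] = (value, now)  # refresh the timestamp on every access
        return value

    def evict_expired(self):
        now = self._clock()
        expired = [k for k, (_, ts) in self._entries.items() if now - ts >= self._ttl]
        for k in expired:
            del self._entries[k]
        return len(expired)

    def __len__(self):
        return len(self._entries)


now = [0.0]
loads = []


def load(key):
    loads.append(key)
    return f"dag-{key}"


cache = TTLOnlyCache(ttl=100.0, clock=lambda: now[0])
active = range(2000)  # versions still referenced by running runs
superseded = range(2000, 2500)  # versions that are never looked up again
for key in list(active) + list(superseded):
    cache.get(key, load)

for _ in range(20):  # 20 scheduler loops, 10 seconds apart
    now[0] += 10.0
    for key in active:
        cache.get(key, load)
    cache.evict_expired()

print(len(cache), len(loads))  # 2000 2500: active set stays resident, nothing is reloaded
```

The active set here is larger than 1024, yet it never thrashes, while the 500 superseded entries are dropped once they have gone a full TTL without access.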
Minor, worth noting in the description: enabling the cache also flips the scheduler from nullcontext to a real RLock per _get_dag, which #60804 explicitly chose to avoid. Cheap when uncontended, so not blocking, but it reverses a documented decision.
There was a problem hiding this comment.
Hi @kaxil. I agree with your analysis for the scheduler use case. Before opening #69001, I went back and read through the design decisions in #60804 because I expected we should first discuss the right approach before implementing (or not) anything.
One thing I wasn't completely sure about from that discussion is whether the intended solution for the scheduler was simply to rely on num_runs and periodically restart the scheduler. If that's the recommended mitigation, I think it would be worth documenting somewhere, since it's not immediately obvious.
If we do want to introduce cache eviction for the scheduler, defaulting dag_cache_size to 0 (unbounded) and relying only on TTL eviction seems like a conservative choice. That addresses the leak of superseded dag_version_id while avoiding the LRU thrashing concerns you described for large deployments.
Regarding the RLock overhead mentioned in #60804, I'm not sure whether the performance impact is significant enough in practice to justify keeping an unbounded dict in the scheduler forever.
There was a problem hiding this comment.
That makes a lot of sense and I understand why a low count-based LRU default is risky for the scheduler: if the active dag_version_id working set is larger than the cache size, the scheduler can end up thrashing and repeatedly paying the DB read/deserialization cost.
My current thinking is to revise this so scheduler eviction is TTL-driven by default instead of size-cap driven. Concretely, that would mean defaulting scheduler dag_cache_size to 0, keeping dag_cache_ttl set, and updating DBDagBag so a non-zero TTL can evict entries even when there is no count-based maxsize. That should target the superseded dag_version_id growth without evicting the active cyclic working set.
I’d also remove the redundant conf fallbacks and add tests for the non-positive cache size / TTL path.
Does that direction sound reasonable before I rework the PR?
      cache_size = len(self._dags)
      if self._use_cache:
    -     stats.gauge("api_server.dag_bag.cache_size", cache_size, rate=0.1)
    +     stats.gauge(f"{self._stats_prefix}.cache_size", cache_size, rate=0.1)
Changing the calls to f"{self._stats_prefix}.cache_*" makes check_metrics_synced_with_the_registry see {_stats_prefix}.cache_hit, {_stats_prefix}.cache_miss, etc. as missing from the registry. The runtime metric split makes sense, but the metric names need to stay representable to the registry checker, or the checker/registry needs to support this pattern.
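One possible direction, sketched under assumptions: keep the set of prefixes and suffixes as literal constants so the full metric names can be enumerated statically and compared against the registry. The constant names, the helper, and the `registered` set below are hypothetical; this is not how `check_metrics_synced_with_the_registry` works today.

```python
# Suffixes and prefixes named in this thread; a real list may be longer.
CACHE_METRIC_SUFFIXES = ("cache_hit", "cache_miss", "cache_size")
STATS_PREFIXES = ("api_server.dag_bag", "scheduler.dag_bag")


def expand_cache_metric_names(prefixes=STATS_PREFIXES, suffixes=CACHE_METRIC_SUFFIXES):
    """Return every concrete metric name the templated calls can emit."""
    return {f"{prefix}.{suffix}" for prefix in prefixes for suffix in suffixes}


# Hypothetical registry contents: only the API server metrics are registered so far.
registered = {
    "api_server.dag_bag.cache_hit",
    "api_server.dag_bag.cache_miss",
    "api_server.dag_bag.cache_size",
}

missing = sorted(expand_cache_metric_names() - registered)
print(missing)
```

With the names enumerable like this, the scheduler metrics show up as concrete missing registry entries instead of an unresolvable `{_stats_prefix}` placeholder.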
Use bounded DBDagBag caching in the scheduler.
The scheduler was creating DBDagBag(load_op_links=False) without cache settings, which meant it used the default unbounded dictionary for cached deserialized DAGs. This PR makes the scheduler use the existing DBDagBag LRU/TTL cache support instead.
Changes:
[scheduler] dag_cache_size and [scheduler] dag_cache_ttl config options.
SchedulerJobRunner to pass those config values into DBDagBag.
stats_prefix parameter to DBDagBag so scheduler cache metrics are emitted under scheduler.dag_bag instead of the API server prefix.
DBDagBag class docstring to reflect that callers can enable bounded caching.
DBDagBag.
closes: #69001
Testing: