Unify executor workload queues #63491
Conversation
Force-pushed from 85bfce8 to 1ce0748
ferruzzi
left a comment
Made a real quick pass and left some comments and questions, I'll try to get a more thorough one tomorrow.
Force-pushed from aee94fb to 8997ee4
Force-pushed from 11ee7ef to 249b014
Force-pushed from 0f6f172 to 35a927f
Force-pushed from 1a0f949 to b6b161f
ferruzzi
left a comment
Looks like my concerns were all addressed. LGTM
Force-pushed from b6b161f to 228e71b
I'm taking a look at this now.
ashb
left a comment
Thanks for tackling this!
Please add a newsfragment for the deprecated public API.
I'm also worried that an un-updated executor (either one of "our" ones where the user hasn't updated yet, or a custom one) would spam the living daylights out of the logs by accessing the deprecated queued_tasks property on every heartbeat. That needs addressing so we only log once per class or once per instance, I think.
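One way to address the warn-once concern could look like the sketch below. It is illustrative only: `ExecutorSketch` is a stand-in for the real base executor, and `RemovedInAirflow4Warning` is redefined locally so the snippet is self-contained. A class-level set records which subclasses have already warned, so a misbehaving executor polling the property on every heartbeat triggers the warning only on the first access.

```python
import warnings


class RemovedInAirflow4Warning(DeprecationWarning):
    """Local stand-in for Airflow's deprecation warning class."""


class ExecutorSketch:
    # Tracks which (sub)classes have already emitted the deprecation warning.
    _warned_classes: set[type] = set()

    @property
    def queued_tasks(self) -> dict:
        cls = type(self)
        if cls not in ExecutorSketch._warned_classes:
            ExecutorSketch._warned_classes.add(cls)
            warnings.warn(
                f"{cls.__name__}.queued_tasks is deprecated; use executor_queues instead.",
                RemovedInAirflow4Warning,
                stacklevel=2,
            )
        # Real code would return the task sub-dict of executor_queues here.
        return {}
```

Keying the set on `type(self)` gives once-per-class behavior; an instance attribute flag would give once-per-instance instead.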
    def queued_tasks(self) -> dict[TaskInstanceKey, Any]:
        """Return queued tasks from celery and kubernetes executor."""
-       return self.celery_executor.queued_tasks | self.kubernetes_executor.queued_tasks  # type: ignore[return-value]
+       queued_tasks = self.celery_executor.queued_tasks.copy()
CeleryKubernetesExecutor.queued_tasks calls the deprecated BaseExecutor.queued_tasks property on both child executors, emitting RemovedInAirflow4Warning on every access. Since this file is already being updated in this PR, please migrate these call sites to use the new API (guarded with AIRFLOW_V_3_3_PLUS for back-compat with Airflow <3.3).
This executor raises RuntimeError on Airflow 3.0+ (line 80), so this code path is unreachable on any version where executor_queues exists. We shouldn't be changing these files any more than is strictly needed to keep CI happy.
        self.team_name: str | None = team_name
        self.queued_tasks: dict[TaskInstanceKey, workloads.ExecuteTask] = {}
        self.queued_callbacks: dict[str, workloads.ExecuteCallback] = {}
        self.executor_queues: dict[str, dict[WorkloadKey, QueueableWorkload]] = defaultdict(dict)
Could this be a flat dict[WorkloadKey, QueueableWorkload] instead of a dict-of-dicts?
_get_workloads_to_schedule immediately flattens all sub-dicts into a single list and then sorts by (WORKLOAD_TYPE_PRIORITY, sort_key) — so priority ordering (callbacks before tasks, higher-weight tasks first) is entirely in the sort step, not in the dict structure. A flat dict would produce identical scheduling behaviour.
The only load-bearing use of the sub-dict grouping is the deprecated queued_tasks/queued_callbacks compat properties — which are on their way out. Every type-keyed deletion in providers (del self.executor_queues[WorkloadType.EXECUTE_TASK][key]) could be a plain del flat_dict[key] since WorkloadKey is unique across types.
A flat dict would simplify CeleryKubernetesExecutor.queued_tasks (no sub-dict merging), make provider-side deletion uniform, and remove the defaultdict(dict) nesting.
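A toy model of the ordering argument above, assuming nothing about the real implementation: workloads are plain dicts, and `WORKLOAD_TYPE_PRIORITY` and `sort_key` are illustrative stand-ins for the actual fields. The point is that the scheduling order comes entirely from the sort key, not from the nesting, so nested and flat storage sort identically.

```python
# Illustrative priorities: callbacks are scheduled before tasks.
WORKLOAD_TYPE_PRIORITY = {"execute_callback": 0, "execute_task": 1}


def order_nested(executor_queues: dict) -> list:
    # Current shape: dict[workload_type, dict[key, workload]], flattened
    # before sorting, as _get_workloads_to_schedule is described to do.
    flat = [w for sub in executor_queues.values() for w in sub.values()]
    return sorted(flat, key=lambda w: (WORKLOAD_TYPE_PRIORITY[w["type"]], w["sort_key"]))


def order_flat(queue: dict) -> list:
    # Proposed shape: a single dict[key, workload]; same sort, same result.
    return sorted(queue.values(), key=lambda w: (WORKLOAD_TYPE_PRIORITY[w["type"]], w["sort_key"]))
```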
FWIW, the gate on line 80 of CeleryKubernetesExecutor throws a RuntimeError if Airflow version is >= 3.0, so I don't think "simplifying CeleryKubernetesExecutor" needs to factor into this decision.
That said, a flat dict isn't a bad idea in principle and likely would have been a cleaner choice in the first place. My main concern is mostly practical: Lambda (#63035), Celery (#63888), and Batch (#62984) have already merged using the executor_queues[WorkloadType.X][key] pattern, and ECS (#63657), K8s (#63454), and Edge (#63498) are all in progress implementing the same pattern. Flattening now would mean reworking all six executor implementations in addition to the changes that it would require in this PR. I'm not sure we really gain anything for that work?
How would you feel about adding a TODO to flatten it when the compat properties are removed (in 4.0??)? At that point the nested structure loses its main justification anyway. Does that seem reasonable, or is that just punting the same work to Future Us?
Force-pushed from 228e71b to 36bb7b9
Thank you so much @ashb and @ferruzzi for the review! Latest push:
Two threads I'd love your steer on:
Would like to request your re-review. Will follow whichever way you'd both recommend. Thanks!
Was generative AI tooling used to co-author this PR?
Summary
Refactors executor workload queue management for extensibility. No behavioral change: scheduling order, slot accounting, and all provider executors work identically to before.
Follows the direction proposed by @ferruzzi #62343 (comment).
Problem
Adding a new workload type (like ExecuteCallback or TestConnection) required touching ~6 places in BaseExecutor: a new queue dict, a new supports_* flag, slots calculation, an isinstance branch in queue_workload, a dedicated scheduling method, and isinstance branches in dequeue/trigger logic. Each provider executor that overrode queue_workload also needed updating. This made extending the executor interface unnecessarily painful.
What this does
Replaces the per-type queue dicts and boolean capability flags with three simple primitives:
The base class queue_workload is now generic: validate the type, store by key. Four provider executors (K8s, ECS, Batch, Lambda) no longer need their own queue_workload overrides. trigger_tasks becomes trigger_workloads since it handles all workload types now.
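A minimal sketch of the "validate the type, store by key" pattern described above. The class name, the set of supported types, and the dict-based workload model are all illustrative, not the actual Airflow API.

```python
from collections import defaultdict


class GenericQueueExecutor:
    # Hypothetical capability declaration replacing per-type boolean flags.
    supported_workload_types = {"execute_task", "execute_callback"}

    def __init__(self) -> None:
        # One nested mapping instead of one queue dict per workload type.
        self.executor_queues: dict[str, dict] = defaultdict(dict)

    def queue_workload(self, workload: dict) -> None:
        wtype = workload["type"]
        if wtype not in self.supported_workload_types:
            raise ValueError(f"{type(self).__name__} cannot run {wtype!r} workloads")
        # Generic path: no isinstance branches, no per-type queue attributes.
        self.executor_queues[wtype][workload["key"]] = workload
```

Under this shape, supporting a new workload type means adding it to `supported_workload_types` (and handling it at dequeue time), rather than threading a new queue dict through the base class.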
Adding a new workload type after this refactor
No changes needed in BaseExecutor itself.
Add a newsfragment named {pr_number}.significant.rst in airflow-core/newsfragments. You can add this file in a follow-up commit after the PR is created, once you know the PR number.