Add ExecuteCallback support to AWS ECS Executor#63657
Add ExecuteCallback support to AWS ECS Executor#63657shivaam wants to merge 2 commits intoapache:mainfrom
Conversation
f3156ea to
4728b66
Compare
|
@ferruzzi Draft PR |
38ddfea to
6ac2634
Compare
0372a19 to
a93048e
Compare
| failure_count, | ||
| ) | ||
| self.fail(task_key) | ||
| self.fail(cast("TaskInstanceKey", task_key)) |
There was a problem hiding this comment.
Once #65392 merges I think you can drop this and the other places you used cast(). Same for the places where you used isinstance(task_key, tuple)
6a1c191 to
a543ec3
Compare
|
@ferruzzi Thanks for the review! I have updated the Pr and addressed all your comments.
|
ea660cf to
28b791f
Compare
| if AIRFLOW_V_3_3_PLUS: | ||
| self.queued_callbacks: dict[str, workloads.ExecuteCallback] = {} |
There was a problem hiding this comment.
This is redundant since BaseExecutor should be initializing that now
| raise RuntimeError(f"{type(self)} cannot handle workloads of type {type(w)}") | ||
| elif AIRFLOW_V_3_3_PLUS and isinstance(workload, workloads.ExecuteCallback): | ||
| command = [workload] # type: ignore[list-item] | ||
| key = workload.callback.id # type: ignore[assignment] |
There was a problem hiding this comment.
Here and elsewhere: it's semantically the same thing, but use workload.callback.key as a key please, it will makes things more uniform
| from sqlalchemy.orm import Session | ||
|
|
||
| from airflow.executors import workloads | ||
| from airflow.executors.workloads.types import WorkloadKey |
There was a problem hiding this comment.
This will need to be wrapped in a version check. The user could be running latest provider package with core 3.0 and this will blow up. It'll be something like
if TYPE_CHECKING:
if AIRFLOW_V_3_3_PLUS:
from airflow.executors.workloads.types import WorkloadKey
else:
WorkloadKey: TypeAlias = TaskInstanceKey # type: ignore[no-redef, misc]Enables the ECS executor to dispatch ExecuteCallback workloads (deadline alerts) alongside regular ExecuteTask workloads. Builds on apache#65392 which widened BaseExecutor signatures to accept WorkloadKey. - supports_callbacks = True (gated on AIRFLOW_V_3_3_PLUS) - Widen key types to WorkloadKey throughout EcsQueuedTask / EcsTaskCollection - Branch _process_workloads on ExecuteTask vs ExecuteCallback - Add AIRFLOW_V_3_3_PLUS to version_compat.py - Unit tests for queueing, processing, serialization, sync, mixed keys
08bb01a to
726fbd1
Compare
Renames (mirrors the merged Lambda callback PR — straight rename, no shim, executor-internal surface): sync_running_tasks -> sync_running_workloads attempt_task_runs -> attempt_workload_runs pending_tasks (attr) -> pending_workloads __update_running_task -> __update_running_workload __handle_failed_task -> __handle_failed_workload Fix CI on older Airflow compat tests: - Restore queue_workload() override. Airflow 3.3+ BaseExecutor routes ExecuteCallback natively, but pre-3.3 raises ValueError for anything not ExecuteTask. Override works across versions. - Import AIRFLOW_V_3_3_PLUS from tests_common (main bumped to 3.3). check-airflow-v-imports-in-tests hook disallows provider-internal version_compat imports from test files.
726fbd1 to
233ec6b
Compare
|
|
||
| supports_multi_team: bool = True | ||
|
|
||
| if AIRFLOW_V_3_3_PLUS: |
There was a problem hiding this comment.
You should not need this? You should be able to declare this variable regardless o the Airflow version?
Implements executor callback support for the AWS ECS Executor.
Sibling to merged Lambda PR #63035; builds on #62645 (merged) and #65392 (merged).
related: #62887
Changes
supports_callbacks = Truegated onAIRFLOW_V_3_3_PLUSso base executor routesExecuteCallbackto ECS.TaskInstanceKeytoWorkloadKeyinecs_executor.py,EcsQueuedTask,EcsTaskCollection.queue_workload()override branches onExecuteTaskvsExecuteCallback(kept until min Airflow version reaches 3.3, after whichBaseExecutor.queue_workloadroutes both natively; mirrors merged Lambda). No tracking issue yet — happy to open one for the override removal as a follow-up._process_workloads()dispatches both workload types viaexecute_async().AirflowProviderDeprecationWarningshims for the public ones (matches merged Lambda):sync_running_tasks→sync_running_workloads,attempt_task_runs→attempt_workload_runs,pending_tasks→pending_workloads__update_running_task→__update_running_workload,__handle_failed_task→__handle_failed_workloadBaseExecutorsignatures — nocast()needed,log_task_event()skips callback keys internally.Tests
CI green: ruff, mypy, full test suite, plus compat suites on Airflow 2.11.1 / 3.0.6 / 3.1.8 / 3.2.0.
End-to-end re-verified 2026-05-02 against rebased main on real ECS Fargate. Each DAG runs a regular
ExecuteTask(slow_task120s) followed by a deadlineExecuteCallback:ecs_callback_simpleecs_callback_contextecs_callback_failureRuntimeErrorecs_callback_notifier_class__call__pattern)Verified:
PENDING → QUEUED → RUNNING → SUCCESS/FAILEDfor both workload types;__handle_failed_workload+pending_workloads.append(...)reschedule callbacks correctly;log_task_eventandself.fail()with string callback keys handled correctly by #65392's base class. Callback UUIDs and ECS task ARNs from the run are available on request.Backwards compatibility
Existing task path unchanged. Callback paths gated behind
AIRFLOW_V_3_3_PLUS. Public renamed methods/attrs preserved as deprecation shims so subclassers continue to work.Was generative AI tooling used to co-author this PR?
Generated-by: Claude Code (Opus 4.7) following the guidelines