Add ExecuteCallback support to Edge Executor#63498
Add ExecuteCallback support to Edge Executor#63498wjddn279 wants to merge 13 commits intoapache:mainfrom
Conversation
|
@jscheffl @dheerajturaga @ferruzzi Hello! Following the multi-team work, I've been assigned to add functionality to the edge executor. As before, I've opened a draft PR first to discuss the direction before starting development. This task involves changing the Deadline Alerts callback work so that it can be executed on the worker. The edge_executor is currently structured to directly update the task list in the
All three approaches could work without any functional issues. I'd love to hear your thoughts. |
Thanks for raising a short check. Actually have not thought about this and have no strong opinion developed. An callback also is belonging to a Dag, Task, correct? So these details still could be used to populate details? Note that the table structure and PK is originating from the time of Airflow 2 before a Task had a UUID. Therefore Dag_id, Task_id and Run_id were primary keys as it was the job PK. I matter of change I'd be OK to change the DB layout and switch to Task instance UUID or some other artificial UUID would also be OK and keeping the existing fields informational. In this case if these are not PK anymore you could fill will empty or a placeholder if not relevant/existing. But this would also impact REST API services, maybe some adjustment there are also needed (e.g. /jobs/state/...) as they carry the existing UUID. Note that Core and worker might a bit diverging so if REST API is changed at least the existing endpoints need to be kept for compatibility. If technically not tooo much overhead I'd slightly prefer a single table for all, then only 1 query is needed to poll for jobs. Other opinions? |
|
Yes, I have no objection to keeping it as a single table. I generally agree with the opinion you've provided, and I'll implement the details and discuss further from there. Thank you! |
Cool! @dheerajturaga + @ferruzzi still other opinions welcome :-D |
|
Edge is more or less your baby, so I'm going to defer to whatever you think here.
As of right now, all deadline callbacks are tied to a dagrun. They are created and cleaned up as part of the dagrun lifecycle, so that means they obviously also have access to dag, dagbundle, serialized_dag, etc but not to Task. That may (and likely will?) change in the future, but that's what's active right now. One thing to note is that there is a lot of work going on to unify TaskInstance and ExedcutorCallback into a shared interface we're calling Workload. So pretty much anywhere that had TaskInstanceKey or something like that would ideally be migrated to use Edit: Jens - I know you are aware of the other one and have reviewed it, that was more just a general comment for anyone who comes across this and wants context. |
2419908 to
ff72b46
Compare
1ac699c to
7e1bc89
Compare
7e1bc89 to
9b3c5a4
Compare
9b3c5a4 to
9d1b11d
Compare
089414b to
ccda321
Compare
ccda321 to
2fec49c
Compare
|
|
||
| command: Annotated[ | ||
| ExecuteTask, | ||
| ExecuteTypeBody, |
There was a problem hiding this comment.
Sorry, maybe coming a bit late during second pass: So far the API was always backwards compatible, means if you have a Edge worker with a previous version running it would happily continue to run and softly "drain" if there is a provider or Airflow core version mis-match.
Now here the return type for job fetching changes, means if the worker would attempt to fetch a job with an old version, the job would be assigned but the worker (probably?) fails in de-serializing the content and fails. Probably before draining.
Have you tested this and how does it behave?
(Compared to Task SDK there is currently no Cadwyn layer for versioning, this is in the backlog...)
If this is a problem can we make it somehow compatible or make a parallel endpoint such that it is not failing on old clients until they drain and jobs are not pulled and never executed?
jscheffl
left a comment
There was a problem hiding this comment.
Sorry in the second pass review came along one problem.
Additionally can I kindly request that you add one example /test callback into the integration_test.py Dag in providers/edge3/src/airflow/providers/edge3/example_dags/integration_test.py which serves as a (currently manual) system test?
|
Yes — I found a few issues while testing locally. I'll apply the fixes and request another review. |
|
pending until #66141 |
related: #62887
NOTE
Make draft pr to discuss direction of implementation with code owners
Was generative AI tooling used to co-author this PR?
{pr_number}.significant.rst, in airflow-core/newsfragments. You can add this file in a follow-up commit after the PR is created so you know the PR number.