Start porting mapped task to SDK #45627

Open
wants to merge 1 commit into
base: main

ashb
Member

@ashb ashb commented Jan 13, 2025

This PR restructures the Mapped Operator and Mapped Task Group code to live in
the Task SDK at definition time.

The big thing this change does not do is make it possible to execute mapped
tasks via the Task Execution API server etc -- that is up next (#44360).

There were some unavoidable changes to the scheduler/expansion part of mapped
tasks here. Of note:

BaseOperator.get_mapped_ti_count has moved from an instance method on
BaseOperator to a class method. The reason is that, as more and more of the
"definition time" code moves into the TaskSDK BaseOperator and
AbstractOperator, it is no longer possible to add DB-accessing code to a
base class and have it apply to the subclasses (i.e.
airflow.models.abstractoperator.AbstractOperator is now not always in the
MRO for tasks; eventually that class will be deleted, but not yet).
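
To illustrate why a class method helps here, below is a minimal sketch. The class names `SdkOperator` and `DbBaseOperator` and the dict used as a "session" are hypothetical stand-ins, not the real Airflow classes or the real signature. The point is only that the DB-side class can be handed any definition-time object, even one that does not inherit from it.

```python
class SdkOperator:
    """Hypothetical definition-time operator: knows nothing about the database."""

    def __init__(self, task_id: str) -> None:
        self.task_id = task_id


class DbBaseOperator(SdkOperator):
    """Hypothetical scheduler-side class that holds the DB-accessing logic."""

    @classmethod
    def get_mapped_ti_count(cls, task: SdkOperator, run_id: str, *, session: dict) -> int:
        # `session` is a plain dict standing in for a real DB session.
        return session[(task.task_id, run_id)]


# A task built purely from the SDK class: DbBaseOperator is not in its MRO,
# so an instance method defined on DbBaseOperator would never be found on it.
task = SdkOperator("extract")
session = {("extract", "run_1"): 3}

# The class method takes the task as an explicit argument instead.
count = DbBaseOperator.get_mapped_ti_count(task, "run_1", session=session)
```

Calling through the class rather than through the task instance is what removes the dependency on the task's inheritance chain.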

In a similar vein, XComArg's get_task_map_length has also moved to a
single-dispatch class method on the TaskMap model, since the definition-time
objects now live in the TaskSDK and there is no realistic way to get a per-type
subclass with DB logic (i.e. it's very complex to end up with a
PlainDBXComArg, a MapDBXComArg, etc. that we can attach the method to).

For those who aren't aware, singledispatch (and singledispatchmethod) are
part of the standard library (in functools), where the type of the first
argument (for singledispatchmethod, the first argument after self or cls) is
used to determine which implementation to call. If you are familiar with C++
or Java, this is very similar to method overloading; the one caveat is that it
only examines the type of the first argument, not the full signature.
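
As a small, self-contained illustration of the behaviour described above (the function and class names are made up for this example and are unrelated to the PR's code):

```python
from functools import singledispatch, singledispatchmethod


@singledispatch
def describe(value) -> str:
    # Fallback used when no more specific type is registered.
    return f"object: {value!r}"


@describe.register(int)
def _(value) -> str:
    return f"int: {value}"


@describe.register(list)
def _(value) -> str:
    return f"list of {len(value)} items"


class Formatter:
    @singledispatchmethod
    def fmt(self, value, suffix=""):
        # Fallback implementation.
        return f"other{suffix}"

    @fmt.register(str)
    def _(self, value, suffix=""):
        # Chosen whenever `value` is a str, whatever type `suffix` has.
        return f"str{suffix}"
```

Only the type of `value` selects the implementation: `Formatter().fmt("x", 3)` and `Formatter().fmt("x", "y")` both go to the `str` version, because the type of `suffix` is never consulted.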

The long-term goal here is to have a clean separation between "runtime/definition time" behaviour (i.e. creating mapped tasks, or running a mapped task) and expanding a mapped task (which is a scheduling-time operation only).



@ashb ashb added the area:task-execution-interface-aip72 AIP-72: Task Execution Interface (TEI) aka Task SDK label Jan 13, 2025
@ashb ashb requested a review from uranusjr as a code owner January 13, 2025 21:26
@ashb
Member Author

ashb commented Jan 13, 2025

Mypy is seriously unhappy. Oh well

@ashb ashb force-pushed the mapped-tasks-to-task-sdk branch from 6afcde8 to 7178c24 Compare January 13, 2025 22:15
@ashb ashb force-pushed the mapped-tasks-to-task-sdk branch from 7178c24 to 29e8600 Compare January 13, 2025 22:48
@ashb
Member Author

ashb commented Jan 13, 2025

Oh also, singledispatch and singledispatchmethod don't play great with type hints in 3.9. Worked around that easily enough now though.
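
The comment above does not say which workaround was used. One common option, shown here purely as an assumption, is to pass the type explicitly to `register()` so that registration does not depend on resolving annotations. The classes `PlainArg`, `MapArg` and `LengthLookup` are hypothetical stand-ins for the XComArg types and the TaskMap model, not the PR's actual code.

```python
from functools import singledispatchmethod


class PlainArg:
    """Hypothetical definition-time argument with a known length."""

    def __init__(self, length: int) -> None:
        self.length = length


class MapArg:
    """Hypothetical argument that wraps another one."""

    def __init__(self, inner: PlainArg) -> None:
        self.inner = inner


class LengthLookup:
    """Hypothetical stand-in for a DB model holding the dispatch logic."""

    @singledispatchmethod
    @classmethod
    def length_of(cls, arg, run_id: str) -> int:
        raise NotImplementedError(f"no length rule for {type(arg).__name__}")

    # The type is passed to register() explicitly instead of being read
    # from the annotation on `arg`.
    @length_of.register(PlainArg)
    @classmethod
    def _(cls, arg, run_id: str) -> int:
        return arg.length

    @length_of.register(MapArg)
    @classmethod
    def _(cls, arg, run_id: str) -> int:
        # Delegate to whichever implementation matches the wrapped argument.
        return cls.length_of(arg.inner, run_id)
```

With this shape, `LengthLookup.length_of(MapArg(PlainArg(4)), "run_1")` dispatches on `MapArg` first and then on `PlainArg`, while an unregistered type falls through to the base implementation and raises `NotImplementedError`.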

@ashb ashb requested review from kaxil and removed request for bolkedebruin and XD-DENG January 13, 2025 22:50
from airflow.sdk.definitions.dag import DAG
from airflow.sdk.definitions.mappedoperator import ValidationSource
Member

@kaxil kaxil Jan 14, 2025

unrelated: ValidationSource is a weird name but it existed before the PR anyway

Member

@kaxil kaxil left a comment

First pass, will do a more detailed look in an hour

airflow/models/taskmap.py (outdated, resolved)
airflow/models/xcom_arg.py (outdated, resolved)
task_sdk/src/airflow/sdk/definitions/baseoperator.py (outdated, resolved)
task_sdk/src/airflow/sdk/definitions/mappedoperator.py (outdated, resolved)
task_sdk/src/airflow/sdk/definitions/mappedoperator.py (outdated, resolved)
task_sdk/src/airflow/sdk/definitions/mappedoperator.py (outdated, resolved)
task_sdk/src/airflow/sdk/definitions/mappedoperator.py (outdated, resolved)
task_sdk/src/airflow/sdk/definitions/mappedoperator.py (outdated, resolved)
airflow/models/abstractoperator.py (outdated, resolved)
airflow/models/baseoperator.py (outdated, resolved)
airflow/models/xcom_arg.py (outdated, resolved)
airflow/models/xcom_arg.py (resolved)
airflow/models/xcom_arg.py (resolved)
Member

@kaxil kaxil left a comment

A few comments, but the code looks good; minor adjustments needed to get tests passing.

@ashb ashb force-pushed the mapped-tasks-to-task-sdk branch 6 times, most recently from 5b2702f to 8967c4b Compare January 17, 2025 16:41
@ashb ashb force-pushed the mapped-tasks-to-task-sdk branch from 8967c4b to 6f57645 Compare January 17, 2025 17:17
@ashb ashb requested a review from mobuchowski as a code owner January 18, 2025 10:09
@ashb ashb force-pushed the mapped-tasks-to-task-sdk branch from a01009c to 6689925 Compare January 18, 2025 10:10
@ashb ashb force-pushed the mapped-tasks-to-task-sdk branch from 6689925 to 5992524 Compare January 18, 2025 10:25
Labels
area:serialization area:task-execution-interface-aip72 AIP-72: Task Execution Interface (TEI) aka Task SDK area:task-sdk