Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add dynamic task mapping into TaskSDK runtime #46032

Merged
merged 3 commits into from
Feb 6, 2025

Conversation

ashb
Copy link
Member

@ashb ashb commented Jan 25, 2025

The big change here (other than just moving code around) is to introduce a
conceptual separation between Definition/Execution time and Scheduler time.

This means that the expansion of tasks (creating the TaskInstance rows with
different map_index values) is now done on the scheduler, and we now
deserialize to different classes. For example, when we deserialize the
DictOfListsExpandInput it gets turned into an instance of
SchedulerDictOfListsExpandInput. This is primarily designed so that DB access
is kept 100% out of the TaskSDK.

Some of the changes here are on the "wat" side of the scale, and this is
mostly designed to not break 100% of our tests, and we have #45549 to look at
that more holistically.

To support the "reduce" style task which takes as input a sequence of all the
pushed (mapped) XCom values, and to keep the previous behaviour of not loading
all values in to memory at once, we have added a new HEAD route to the Task
Execution interface that returns the number of mapped XCom values so that it
is possible to implement __len__ on the new LazyXComSequence class.

I have deleted a tranche of tests from tests/models that were to do with
runtime behavoir and and now tested in the TaskSDK instead.

Testing:

I have been testing this with Celery Executor and the example_dynamic_task_mapping dag, which is this:

with DAG(dag_id="example_dynamic_task_mapping", schedule=None, start_date=datetime(2022, 3, 4)) as dag:
    @task
    def add_one(x: int):
        return x + 1

    @task
    def sum_it(values):
        total = sum(values)
        print(f"Total was {total}")

    added_values = add_one.expand(x=[1, 2, 3])
    sum_it(added_values)

It now works, and the sum_it tasks logs show:

{"timestamp":"2025-02-04T14:24:05.264679Z","level":"info","event":"Total was 9","chan":"stdout","logger":"task"}

Closes #44360


^ Add meaningful description above
Read the Pull Request Guidelines for more information.
In case of fundamental code changes, an Airflow Improvement Proposal (AIP) is needed.
In case of a new dependency, check compliance with the ASF 3rd Party License Policy.
In case of backwards incompatible changes please leave a note in a newsfragment file, named {pr_number}.significant.rst or {issue_number}.significant.rst, in newsfragments.

@ashb ashb force-pushed the mapped-tasks-tasksdk-execution-time branch 4 times, most recently from 69bca00 to ee19629 Compare January 31, 2025 23:58
@ashb ashb force-pushed the mapped-tasks-tasksdk-execution-time branch from ee19629 to 99f8c68 Compare February 4, 2025 17:42
@ashb ashb changed the title [WIP] Add dynamic task mapping into TaskSDK runtime Add dynamic task mapping into TaskSDK runtime Feb 4, 2025
@ashb ashb added the area:task-execution-interface-aip72 AIP-72: Task Execution Interface (TEI) aka Task SDK label Feb 4, 2025
@uranusjr
Copy link
Member

uranusjr commented Feb 5, 2025

This looks good to me. I’d be even more confident if the map_index changes are taken out, but I’ll take Ash’s words they don’t affect anything else if they’re left in. Aside from that it’s just the seemingly redundant type() calls left. We should remove them if possible, or find out why they are needed.

@ashb
Copy link
Member Author

ashb commented Feb 5, 2025

This looks good to me. I’d be even more confident if the map_index changes are taken out, but I’ll take Ash’s words they don’t affect anything else if they’re left in. Aside from that it’s just the seemingly redundant type() calls left. We should remove them if possible, or find out why they are needed.

Yeah, I'll back out the map index change and try/remove the type as it shouldn't be needed. Just sorting out all the typing and static checks right now first.

@ashb ashb force-pushed the mapped-tasks-tasksdk-execution-time branch from 99f8c68 to 35fca81 Compare February 5, 2025 14:57
@ashb ashb requested a review from potiuk as a code owner February 5, 2025 14:57
@ashb ashb force-pushed the mapped-tasks-tasksdk-execution-time branch 3 times, most recently from 30c389e to a876cb0 Compare February 5, 2025 17:10
@ashb ashb requested a review from hussein-awala as a code owner February 5, 2025 17:10
@ashb ashb force-pushed the mapped-tasks-tasksdk-execution-time branch from a876cb0 to 313a98a Compare February 5, 2025 23:27
Copy link
Member

@jedcunningham jedcunningham left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Overall LGTM.

airflow/models/baseoperator.py Outdated Show resolved Hide resolved
airflow/models/baseoperator.py Outdated Show resolved Hide resolved
task_sdk/src/airflow/sdk/api/client.py Outdated Show resolved Hide resolved
task_sdk/src/airflow/sdk/definitions/xcom_arg.py Outdated Show resolved Hide resolved
task_sdk/src/airflow/sdk/execution_time/task_runner.py Outdated Show resolved Hide resolved
task_sdk/tests/conftest.py Outdated Show resolved Hide resolved
@ashb ashb force-pushed the mapped-tasks-tasksdk-execution-time branch 9 times, most recently from b35bb7e to ce56376 Compare February 6, 2025 16:55
ashb and others added 3 commits February 6, 2025 17:46
The big change here (other than just moving code around) is to introduce a
conceptual separation between Definition/Execution time and Scheduler time.

This means that the expansion of tasks (creating the TaskInstance rows with
different map_index values) is now done on the scheduler, and we now
deserialize to different classes. For example, when we deserialize the
`DictOfListsExpandInput` it gets turned into an instance of
SchedulerDictOfListsExpandInput. This is primarily designed so that DB access
is kept 100% out of the TaskSDK.

Some of the changes here are on the "wat" side of the scale, and this is
mostly designed to not break 100% of our tests, and we have #45549 to look at
that more holistically.

To support the "reduce" style task which takes as input a sequence of all the
pushed (mapped) XCom values, and to keep the previous behaviour of not loading
all values in to memory at once, we have added a new HEAD route to the Task
Execution interface that returns the number of mapped XCom values so that it
is possible to implement `__len__` on the new LazyXComSequence class.

I have deleted a tranche of tests from tests/models that were to do with
runtime behavoir and and now tested in the TaskSDK instead.
@ashb
Copy link
Member Author

ashb commented Feb 6, 2025

Use the command line to resolve conflicts before continuing.
providers/src/airflow/providers/cncf/kubernetes/operators/pod.py

This is a "fake" conflict. git on the CLI does it without complaining (it just got renamed). If this current run passes I am merging after rebasing without waiting for next tests to pass.

@ashb ashb force-pushed the mapped-tasks-tasksdk-execution-time branch from ce36e6d to 972df81 Compare February 6, 2025 17:57
@ashb ashb merged commit aaaea35 into main Feb 6, 2025
7 checks passed
@jedcunningham jedcunningham deleted the mapped-tasks-tasksdk-execution-time branch February 6, 2025 17:58
insomnes pushed a commit to insomnes/airflow that referenced this pull request Feb 6, 2025
The big change here (other than just moving code around) is to introduce a
conceptual separation between Definition/Execution time and Scheduler time.

This means that the expansion of tasks (creating the TaskInstance rows with
different map_index values) is now done on the scheduler, and we now
deserialize to different classes. For example, when we deserialize the
`DictOfListsExpandInput` it gets turned into an instance of
SchedulerDictOfListsExpandInput. This is primarily designed so that DB access
is kept 100% out of the TaskSDK.

Some of the changes here are on the "wat" side of the scale, and this is
mostly designed to not break 100% of our tests, and we have apache#45549 to look at
that more holistically.

To support the "reduce" style task which takes as input a sequence of all the
pushed (mapped) XCom values, and to keep the previous behaviour of not loading
all values in to memory at once, we have added a new HEAD route to the Task
Execution interface that returns the number of mapped XCom values so that it
is possible to implement `__len__` on the new LazyXComSequence class.

I have deleted a tranche of tests from tests/models that were to do with
runtime behavoir and and now tested in the TaskSDK instead.
insomnes pushed a commit to insomnes/airflow that referenced this pull request Feb 6, 2025
The big change here (other than just moving code around) is to introduce a
conceptual separation between Definition/Execution time and Scheduler time.

This means that the expansion of tasks (creating the TaskInstance rows with
different map_index values) is now done on the scheduler, and we now
deserialize to different classes. For example, when we deserialize the
`DictOfListsExpandInput` it gets turned into an instance of
SchedulerDictOfListsExpandInput. This is primarily designed so that DB access
is kept 100% out of the TaskSDK.

Some of the changes here are on the "wat" side of the scale, and this is
mostly designed to not break 100% of our tests, and we have apache#45549 to look at
that more holistically.

To support the "reduce" style task which takes as input a sequence of all the
pushed (mapped) XCom values, and to keep the previous behaviour of not loading
all values in to memory at once, we have added a new HEAD route to the Task
Execution interface that returns the number of mapped XCom values so that it
is possible to implement `__len__` on the new LazyXComSequence class.

I have deleted a tranche of tests from tests/models that were to do with
runtime behavoir and and now tested in the TaskSDK instead.
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
area:serialization area:task-execution-interface-aip72 AIP-72: Task Execution Interface (TEI) aka Task SDK area:task-sdk
Projects
None yet
Development

Successfully merging this pull request may close these issues.

Handling "Task Mapping" in the Task SDK
5 participants