Dynamic Task Mapping with range(prev_task_output)
#42884
infused-kim asked this question in Q&A · Unanswered · Replies: 1 comment
Reply: If you are worried about the DB, you can use a custom XCom backend. The DB then holds only a reference, and the actual lists are stored in S3, GCS, or whatever storage you choose. The lists themselves still should not be too long, though, because they have to fit into memory. I think some optimisations were implemented to allow streaming that data, but I'm not sure whether this is implemented or works; @uranusjr likely knows.
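To illustrate the "only a reference in the DB" idea, here is a plain-Python sketch that runs without Airflow installed. It is an assumption-laden model, not Airflow's API: `ObjectStoreXComSketch` is a hypothetical class, and a local temp directory stands in for S3/GCS. In Airflow 2 itself, a custom backend is a subclass of `BaseXCom` (from `airflow.models.xcom`) that overrides `serialize_value` and `deserialize_value`, and it is registered through the `xcom_backend` option in the `[core]` config section.

```python
import json
import os
import tempfile
import uuid


class ObjectStoreXComSketch:
    """Hypothetical model of a custom XCom backend (not Airflow's API).

    Full values go to an external store (a local directory standing in for
    S3/GCS); only a short reference string is returned, which is what the
    metadata DB row would hold.
    """

    PREFIX = "xcom-ref://"

    def __init__(self, store_dir: str) -> None:
        self.store_dir = store_dir

    def serialize(self, value) -> str:
        # Write the whole payload to the external store under a unique key...
        key = f"{uuid.uuid4().hex}.json"
        with open(os.path.join(self.store_dir, key), "w", encoding="utf-8") as fh:
            json.dump(value, fh)
        # ...and hand back only a small reference for the DB.
        return self.PREFIX + key

    def deserialize(self, ref: str):
        # Resolve the reference and load the full value back into memory.
        if not ref.startswith(self.PREFIX):
            raise ValueError(f"not an XCom reference: {ref!r}")
        key = ref[len(self.PREFIX):]
        with open(os.path.join(self.store_dir, key), encoding="utf-8") as fh:
            return json.load(fh)


backend = ObjectStoreXComSketch(tempfile.mkdtemp())
ref = backend.serialize(list(range(5_000)))
restored = backend.deserialize(ref)
print(len(ref), len(restored))
```

The reference stays a fixed 48 characters however long the list is, but `deserialize` still loads the entire list into memory, which is the caveat the reply raises about list length.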
Hello everyone.
I have a DAG that will in the future need to launch over 1,000 dynamic tasks.
I have one task that queries my database, works out the batches and how to split the work, and then returns a list of task numbers like `[0, 1, 2, 3]`. That list is passed to the dynamic task, which then generates `len(prev_task_output)` dynamic tasks.
Everything works very well right now, as we have a low volume of data and tasks.
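A minimal sketch of the planning step described above, assuming the work is split into fixed-size batches of database rows. `plan_batches` and `batch_bounds` are hypothetical helpers, not part of Airflow: the first produces the list of task numbers the upstream task returns, and the second shows how each mapped task could turn its number back into an offset and limit.

```python
import math


def plan_batches(total_rows: int, batch_size: int) -> list[int]:
    """Return batch numbers [0, 1, ..., k-1] that together cover total_rows."""
    if batch_size <= 0:
        raise ValueError("batch_size must be positive")
    return list(range(math.ceil(total_rows / batch_size)))


def batch_bounds(batch_no: int, batch_size: int, total_rows: int) -> tuple[int, int]:
    """Return (offset, limit) for one batch; the last batch may be shorter."""
    offset = batch_no * batch_size
    return offset, min(batch_size, total_rows - offset)


batches = plan_batches(total_rows=3_500, batch_size=1_000)
print(batches)
print(batch_bounds(3, 1_000, 3_500))
```

With 3,500 rows and a batch size of 1,000, this yields four batch numbers, and the last batch covers rows 3,000 to 3,499.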
But we expect the data to grow quickly, and eventually we will need to run thousands of tasks on a frequent schedule.
My concern is that these lists will be stored in the DB through XCom and will eventually slow down both the DB and Airflow.
Before Airflow 2.0, I trained myself to use XCom only for tiny amounts of data.
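To put the concern in numbers, here is a back-of-the-envelope estimate, assuming the task-number list is stored JSON-serialized. It measures only the serialized value, not any per-row overhead in the metadata DB. `xcom_payload_bytes` is a hypothetical helper.

```python
import json


def xcom_payload_bytes(n_tasks: int) -> int:
    """Size in bytes of the JSON encoding of [0, 1, ..., n_tasks - 1]."""
    return len(json.dumps(list(range(n_tasks))).encode("utf-8"))


for n in (4, 1_000, 10_000):
    print(f"{n:>6} tasks -> {xcom_payload_bytes(n):>6} bytes")
```

This gives 12 bytes for `[0, 1, 2, 3]`, 4,890 bytes for 1,000 task numbers, and 58,890 bytes for 10,000. So each run's list stays in the tens of kilobytes even at ten thousand tasks.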
The ideal solution would be for my first task to return just the number of tasks that need to run, and then to pass something like `range(prev_task_output)` to my dynamic task. Unfortunately, that doesn't work. I also tried returning the `range()`-ed result in `prev_task_output.map()`, but that didn't work either, because the mapping runs as part of the dynamic task's pre-task step and doesn't expand the single XCom result into multiple values.
Is my concern valid, or is it fine to return thousands of XCom values?
Are there any existing solutions that I have missed that could achieve what I want?
And finally, would a feature that lets you modify the result before it's passed to the dynamic task be of interest to the community, and would a PR adding it be welcome?
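To make the `.map()` behaviour described in the question concrete, here is a plain-Python model (not Airflow code) of the rule it runs into: the number of mapped instances is fixed by the length of the upstream value, and a `.map()`-style callable transforms each item individually, so it cannot turn one item into many. `mapped_task_inputs` is a hypothetical name, and the model is simplified to list-like upstream values.

```python
from typing import Any, Callable, Optional, Sequence


def mapped_task_inputs(
    upstream: Sequence[Any], fn: Optional[Callable[[Any], Any]] = None
) -> list:
    """Model of expansion: one mapped instance per upstream item.

    `fn` plays the role of a .map() callable: it transforms each item on
    its own, so it never changes how many instances are created.
    """
    if not isinstance(upstream, (list, tuple)):
        raise TypeError("can only expand over a list-like upstream value")
    transform = fn if fn is not None else (lambda item: item)
    return [transform(item) for item in upstream]


# Upstream returns a bare count: there is nothing to expand over.
try:
    mapped_task_inputs(4)  # type: ignore[arg-type]
except TypeError as exc:
    print("count only:", exc)

# Upstream returns [4] and range is applied per item: still ONE instance.
print(len(mapped_task_inputs([4], fn=range)))

# An intermediate step that returns the full list gives four instances.
print(len(mapped_task_inputs(list(range(4)))))
```

Under this model, the only way to get N mapped instances is for the upstream value to contain N items. That is why a small intermediate step returning `list(range(n))` works within the current rules, even though the list still passes through XCom.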