Add from_partitioned
to create Dataset from any data source that implements the partitioned protocol
#18966
Conversation
MLDataset is deprecated, so I don't think we should be accepting patches to it.
Sounds good. Btw, the integration point for Datasets would be to define a custom datasource (e.g., …)
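For illustration, here is a minimal sketch of that suggested integration point, assuming the `Datasource`/`ReadTask` API of the Ray release contemporary with this PR (~1.7); the exact constructor signatures (e.g., the `BlockMetadata` fields) may differ across versions, and the `partitions` read argument and column-dict partition format are assumptions made for the example:

```python
from typing import Any, List

import pyarrow as pa

from ray.data.block import BlockMetadata
from ray.data.datasource import Datasource, ReadTask


class PartitionedDatasource(Datasource):
    """Sketch: expose pre-existing partitions through the Datasource API."""

    def prepare_read(
        self, parallelism: int, partitions: List[Any] = None
    ) -> List[ReadTask]:
        read_tasks = []
        for part in partitions:
            # No stats are known up front for a foreign partition.
            meta = BlockMetadata(
                num_rows=None, size_bytes=None, schema=None, input_files=None
            )
            # Bind `part` per task; each read task materializes one
            # partition as an Arrow block.
            read_tasks.append(
                ReadTask(lambda p=part: pa.Table.from_pydict(p), meta)
            )
        return read_tasks
```

A dataset would then be created with something like `ray.data.read_datasource(PartitionedDatasource(), partitions=...)`.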
I did not use the Datasource API for a few reasons:
1. Data can be in the Ray object store before …
Why is that not possible with Datasource?
Broader discussion started here: data-apis/consortium-feedback#7
I believe both of these are possible with Datasource. For locality, Ray will internally manage locality-aware execution; the use of node: labels is not recommended, since it interferes with auto-scaling and fault tolerance.
The data could be in Dask, which makes things hard. Anyway, I'll update it once the protocol settles.
If the data is in Dask on Ray, then locality scheduling will apply to those objects. We don't support Dask unless it's run via Dask on Ray. Alternatively, we could modify the data source API to allow custom read tasks to be generated (e.g., a PartitionedDataSource could generate read tasks that run on specific nodes according to locality). Can you let me know if one of the above alternatives works?
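The node-pinning mechanism being debated here can be illustrated with Ray's built-in per-node `node:<ip>` resources. This is a sketch of the general pattern only, not of the Datasource API (which did not expose per-task scheduling at the time); `read_partition`, `locality_pinned_reads`, the `get` materializer, and the `(handle, node_ip)` pairs are hypothetical names for the example:

```python
import ray

ray.init()


@ray.remote
def read_partition(get, handle):
    # Runs on the pinned node; `get` is the materializer the producing
    # framework supplies for its partition handles.
    return get(handle)


def locality_pinned_reads(get, parts_with_hosts):
    # `parts_with_hosts` is a hypothetical list of (handle, node_ip) pairs
    # derived from the protocol's per-partition location metadata.
    refs = []
    for handle, node_ip in parts_with_hosts:
        # Requesting a sliver of Ray's built-in per-node "node:<ip>"
        # resource pins the read to the node that already holds the
        # partition, avoiding cross-node transfer.
        pinned = read_partition.options(resources={f"node:{node_ip}": 0.001})
        refs.append(pinned.remote(get, handle))
    return refs
```

As noted above, this pinning interferes with auto-scaling and fault tolerance, which is why it is discouraged for general use.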
@@ -511,6 +511,29 @@ def from_modin(df: "modin.DataFrame") -> Dataset[ArrowRow]:
    parts = unwrap_partitions(df, axis=0)
    return from_pandas(parts)

def from_partitioned(data) -> Dataset[ArrowRow]:
This should be refactored into a PartitionedDataSource.
The title was changed from "Add from_partitioned to create MLDataset from any data source that implements the partitioned protocol" to "Add from_partitioned to create Dataset from any data source that implements the partitioned protocol".
Yes, this is generally understood, and as long as we consider "native" Ray objects only, this will work just fine without anything special. If we consider data which comes from somewhere else (say, "dask" or "YetAnotherFancyFramework"), the protocol allows us to manually guarantee locality - probably using node: labels would be most appropriate - if only for the tasks which put the data into Ray space. The protocol tries to allow this kind of interoperability without requiring consumers to necessarily support all frameworks. Note: Ray limits the possibilities for proper zero-copy when consuming non-Ray objects. But at least we can avoid data transfer between nodes.
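For reference, a hedged sketch of what consuming the draft protocol might look like. The schema assumed below (a `__partitioned__` attribute carrying a `"partitions"` mapping, per-partition `"data"` handles and `"location"` hints, and a `"get"` materializer) follows the draft discussed in data-apis/consortium-feedback#7 at the time and may have changed as the protocol settled:

```python
import ray


def from_partitioned(data):
    # The draft protocol exposes partition metadata on the producing object.
    parts = data.__partitioned__
    get = parts["get"]  # materializes framework-native handles
    refs = []
    for pos in sorted(parts["partitions"]):
        handle = parts["partitions"][pos]["data"]
        if isinstance(handle, ray.ObjectRef):
            # Already in the Ray object store: reuse it, zero copy.
            refs.append(handle)
        else:
            # Foreign handle (e.g., a Dask future): materialize it, then put
            # it into Ray's store. Pinning this step to the partition's
            # "location" hint would avoid cross-node transfer.
            refs.append(ray.put(get(handle)))
    return refs  # a real implementation would wrap these blocks in a Dataset
```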
This pull request has been automatically marked as stale because it has not had recent activity. It will be closed in 14 days if no further activity occurs. Thank you for your contributions.
Hi again! The issue will be closed because there has been no more activity in the 14 days since the last message. Please feel free to reopen or open a new issue if you'd still like it to be addressed. Again, you can always ask for help on our discussion forum or Ray's public Slack channel. Thanks again for opening the issue!
Why are these changes needed?
We intend to propose a protocol that makes exchanging large, distributed, partitioned data between frameworks (like Ray, Modin, Dask) easier. Several PRs are in progress; please check here.
It's also possible to do this for Ray Dataset, but MLDataset is simpler to start with.
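From the user's side, the proposed entry point would look roughly as follows, assuming `from_partitioned` is exported next to `from_modin` as the diff suggests; Modin is just one example producer implementing the protocol:

```python
import modin.pandas as mpd  # any producer implementing the protocol
import ray

df = mpd.DataFrame({"a": range(1_000_000)})  # a partitioned Modin frame

# One entry point, regardless of which framework produced the partitions.
ds = ray.data.from_partitioned(df)
print(ds.count())
```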
Related issue number
Checks
I've run scripts/format.sh to lint the changes in this PR.