
We can't properly deal with delayed inputs #777

Open
phofl opened this issue Jan 19, 2024 · 6 comments

@phofl
Collaborator

phofl commented Jan 19, 2024

Every argument could theoretically be a delayed object, similar to how every argument could be a dask-expr collection. We can't deal with this yet, since we never check for them.

I created a very naive implementation of a delayed expression, _Delayed, to capture some of these cases, but we should think critically about the design before moving ahead with this issue.

I suggest that we unpack Delayed objects in the constructor of Expr, similar to what we do with collections.
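
A minimal sketch of what that unpacking could look like, reusing the _Delayed name from above; the actual Expr constructor and operand handling in dask-expr differ:

import dask
from dask.delayed import Delayed

class _Delayed:
    """Hypothetical expression node wrapping a raw Delayed object."""

    def __init__(self, obj):
        self.obj = obj

class Expr:
    def __init__(self, *operands):
        # Wrap raw Delayed inputs on construction, mirroring how
        # dask-expr collections are unpacked, so that later graph
        # traversals see a uniform operand type.
        self.operands = [
            _Delayed(op) if isinstance(op, Delayed) else op
            for op in operands
        ]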

@phofl phofl added this to the TODO Features milestone Jan 19, 2024
@delucchi-cmu

This introduced some big performance regressions in our framework, which makes heavy use of @dask.delayed. We do some silly things, like creating 300k delayed objects, so that we have extreme control over joining dataframes from disparate sources.

We're getting around it for now by using the legacy dataframe directly (one way to do that is sketched below).
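
A minimal sketch of that fallback, assuming dask >= 2024.3 where query planning is enabled by default: turn query planning off before dask.dataframe is first imported.

import dask

# Use the legacy dataframe implementation instead of dask-expr.
# This must run before the first import of dask.dataframe.
dask.config.set({"dataframe.query-planning": False})

import dask.dataframe as dd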

@phofl
Collaborator Author

phofl commented Apr 27, 2024

Do you have a reproducer? I suspect that this is actually something different that we can address more easily.

@delucchi-cmu

A simple reproducer, snippet.py:

import numpy as np
import dask.dataframe as dd
import pandas as pd

if __name__ == "__main__":
    a = dd.from_dict({"a": np.arange(300_000)}, npartitions=30_000)
    parts = a.to_delayed()
    dd.from_delayed(
        parts[0], meta=pd.DataFrame.from_dict({"a": pd.Series(dtype=np.int64)})
    ).compute()

Starting from a fresh virtual environment:

>> pip install numpy pandas 'dask<=2024.2.0'
>> time python snippet.py

real	0m1.094s
user	0m1.435s
sys	0m1.383s

>> pip install 'dask>=2024.3.1'
>> time python snippet.py

real	0m1.127s
user	0m1.507s
sys	0m1.360s

>> pip install dask-expr
>> time python snippet.py

real	0m48.497s
user	0m48.845s
sys	0m1.381s

@phofl
Collaborator Author

phofl commented Apr 30, 2024

Thanks, that's helpful.

Put up a fix here: #1048

@delucchi-cmu

I suspect more than one operation is being slowed down by the addition of dask-expr. Using a similar reproducer as before, but with one partition per row and graph optimization disabled:

    a = dd.from_dict({"a": np.arange(300_000)}, npartitions=300_000)
    parts = a.to_delayed(optimize_graph=False)

This gives a roughly 3x slowdown, unchanged by #1048:
>> pip install numpy pandas 'dask<=2024.2.0'
>> time python snippet.py

real	0m5.882s
user	0m7.840s
sys	0m1.521s

>> pip install 'dask>=2024.3.1'
>> time python snippet.py

real	0m5.936s
user	0m7.882s
sys	0m1.554s

>> pip install dask-expr
Successfully installed dask-expr-1.0.14
>> time python snippet.py

real	0m20.619s
user	0m22.504s
sys	0m1.365s

>> pip install <dask-expr from source to include PR #1048>
>> time python snippet.py

real	0m20.285s
user	0m22.208s
sys	0m1.568s

@phofl
Collaborator Author

phofl commented May 2, 2024

Yeah, this is a known limitation, unfortunately. Going to delayed objects round-trips through the legacy implementation, which materialises the graph; that is what causes the slowdown here. Improvements are certainly very welcome.
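
A rough illustration of that cost (not dask-expr's actual code path): timing the to_delayed() call from the reproducer shows the round-trip itself dominating, and the time grows with the number of partitions:

import time

import numpy as np
import dask.dataframe as dd

df = dd.from_dict({"a": np.arange(300_000)}, npartitions=30_000)

start = time.perf_counter()
parts = df.to_delayed()  # materialises the full task graph
print(f"to_delayed with {df.npartitions:,} partitions took "
      f"{time.perf_counter() - start:.2f}s")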
