Data Skew #417

Open
mrocklin opened this issue Nov 22, 2023 · 1 comment

Comments

@mrocklin
Member

I'm curious about mechanisms to address data skew in sorting/shuffling/merge operations, where some values are far more common than others. It comes up a lot when talking to Spark people, who treat it as a common thing to watch out for. It's also a non-trivial pain point for some dask.dataframe users.

I think a useful benchmark for exploring these workloads and becoming sensitive to them is the RenRe workflow that some folks here have.

Quantiles help, but maybe not enough?

Maybe what we do today with pre-computing quantiles helps with some of this. It means that common values are more likely to end up alone in their own partition, rather than mixed in with lots of other data (which is what would happen if we hashed keys instead). Maybe that's why this has historically been less of a concern for us than for Spark users?
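
To make that concrete, here's a toy sketch (plain numpy/pandas, not dask internals; the sizes and key values are made up) of how quantile-based divisions react to a heavy key: the hot value swallows several consecutive quantiles, so it ends up with boundaries of its own rather than sharing a hash bucket with a slice of every other key.

```python
import numpy as np
import pandas as pd

rng = np.random.default_rng(42)
n = 1_000_000
# One hot key (1500) accounts for half the rows; the rest are uniform.
keys = pd.Series(np.where(rng.random(n) < 0.5, 1500, rng.integers(1000, 2000, n)))

npartitions = 10
divisions = keys.quantile(np.linspace(0, 1, npartitions + 1)).to_numpy()
print(divisions)
# -> roughly [1000, 1200, 1400, 1500, 1500, 1500, 1500, 1500, 1600, 1800, 1999]
# The run of repeated 1500s means the hot key gets partition boundaries of
# its own; a hash partitioner would instead put it in a bucket shared with
# roughly 1/npartitions of every other key.
```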

Beyond that, though, let's consider the case where a single value is very common and we're doing some join-like operation that expands it even further. How do we avoid blowing up memory?
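
As a back-of-the-envelope illustration (sizes made up): the rows a join produces for a key are the product of that key's counts on the two sides, so one key that is common on both sides expands multiplicatively no matter how we partition.

```python
import pandas as pd

left = pd.DataFrame({"key": ["hot"] * 100_000 + ["a", "b", "c"] * 1_000})
right = pd.DataFrame({"key": ["hot"] * 100_000 + ["a", "b", "c"] * 1_000})

# Rows the merge would produce per key: count_left(key) * count_right(key)
out_rows = left["key"].value_counts() * right["key"].value_counts()
print(out_rows)
# "hot" alone contributes 100_000 * 100_000 = 10_000_000_000 output rows,
# all of which land in whichever partition holds that key -- no
# partitioning scheme by itself avoids the blow-up.
```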

Solutions

Spark uses iterative algorithms, kind of like how we operate in dask.bag today. They consume chunks of dataframes and emit chunks of dataframes. We could maybe do something similar in some cases.
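
A rough sketch of that style with a hypothetical helper (this is not dask.bag's or dask-expr's API): consume the left side in pieces, merge each piece, and yield the results so downstream steps never hold the whole expanded partition at once.

```python
from typing import Iterator

import pandas as pd


def chunked_merge(left: pd.DataFrame, right: pd.DataFrame,
                  on: str, chunk_rows: int = 10_000) -> Iterator[pd.DataFrame]:
    """Yield the merge of `left` and `right` piece by piece."""
    for start in range(0, len(left), chunk_rows):
        piece = left.iloc[start:start + chunk_rows]
        # Only this piece's expansion is ever materialized at once.
        yield piece.merge(right, on=on)
```

A downstream consumer (a reduction or a writer) can process each emitted piece and drop it before the next one is produced.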

After blockwise fusion, we could look at the fused expressions and find any that have joins in them. We could probably switch these to run the same set of blockwise operations, but in a for loop over smaller bits of the incoming data. This is only beneficial if one of two other kinds of nodes follows the join / expanding operation in the fused expression (see the sketch after this list):

  1. The beginning of an ApplyConcatApply sequence. This likely reduces the data, and then we can throw in an extra concat and combine at the end of the first apply stage.
  2. Writing to parquet/csv (or some other data sink), where we would write each chunk out as its own row group or similar.
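
Here is a hand-written sketch of what case 1 could look like once fused (illustrative only, not dask-expr's actual machinery): run the expanding merge in pieces, apply the first stage of the reduction to each piece immediately, and finish with the extra concat-and-combine over the partial results.

```python
import pandas as pd


def fused_merge_then_reduce(left: pd.DataFrame, right: pd.DataFrame,
                            on: str, value: str,
                            chunk_rows: int = 10_000) -> pd.DataFrame:
    partials = []
    for start in range(0, len(left), chunk_rows):
        # The expanding join, restricted to a small slice of the input.
        piece = left.iloc[start:start + chunk_rows].merge(right, on=on)
        # First ("chunk") stage of the ApplyConcatApply-style reduction,
        # run before the next slice is materialized.
        partials.append(piece.groupby(on)[value].sum())
    # Extra concat + combine over the partial results.
    return pd.concat(partials).groupby(level=0).sum().to_frame(value)
```

Case 2 is analogous: instead of concatenating first, hand each piece to the writer as its own row group (for example with pyarrow.parquet.ParquetWriter, which writes tables incrementally).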

Timing

This feels important, but not urgent. I don't think we should think too much about it until dask-expr has replaced dask.dataframe and we have good performance on benchmarks that don't include much skew.

Probably the one thing to do early-ish is to see how dask-expr and the new P2P shuffle system perform on the RenRe benchmark. cc @fjetter

@fjetter
Member

fjetter commented Nov 22, 2023

I remember the RenRe problem suffering mostly from data bloat. There was an outer join + groupby that generated so many groups that we couldn't handle it without split_out, and with split_out the shuffle performance was just abysmal. I haven't looked at this in a long time, though.
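
For readers unfamiliar with the knob: split_out spreads a dask.dataframe aggregation's output over several partitions instead of one, which is what makes a groupby with a huge number of groups feasible at all, at the cost of an extra shuffle of the partial results (the part that was so slow here). A minimal illustration with made-up data:

```python
import numpy as np
import pandas as pd

import dask.dataframe as dd

# Lots of distinct groups: the aggregated result itself is large.
pdf = pd.DataFrame({"key": np.arange(1_000_000), "x": 1.0})
df = dd.from_pandas(pdf, npartitions=16)

# Historically the default collapses the result into a single output
# partition, which can't hold an output with this many groups at scale.
single = df.groupby("key").x.sum()

# split_out spreads the aggregated output over 8 partitions instead,
# at the cost of shuffling the partial results by key.
spread = df.groupby("key").x.sum(split_out=8)

print(single.npartitions, spread.npartitions)  # e.g. 1 8
```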

Regardless of whether this is a good example, data skew is of course still an issue we should think about.
