
Reading from parquet files with more than one row group triggers expensive statistics collection #363

Open
phofl opened this issue Oct 26, 2023 · 27 comments


@phofl (Collaborator) commented Oct 26, 2023

cc @rjzamora

We should turn this off by default and use the logic of one file = one partition. Could you take a look? I am not very familiar with the read_parquet stuff.

import dask.dataframe as dd
from datetime import datetime


def read_data(filename):
    # The dataset is a directory of parquet files on S3
    path = "s3://coiled-runtime-ci/tpc-h/scale-100/" + filename + "/"
    return dd.read_parquet(path, engine="pyarrow", filesystem=None)


if __name__ == "__main__":
    from distributed import Client

    client = Client()

    var1 = datetime.strptime("1995-01-01", "%Y-%m-%d")
    var2 = datetime.strptime("1997-01-01", "%Y-%m-%d")

    line_item_ds = read_data("lineitem")

    lineitem_filtered = line_item_ds[
        (line_item_ds["l_shipdate"] >= var1) & (line_item_ds["l_shipdate"] < var2)
    ]
    lineitem_filtered["l_year"] = 1  # lineitem_filtered["l_shipdate"].dt.year
    lineitem_filtered["revenue"] = lineitem_filtered["l_extendedprice"] * (
        1.0 - lineitem_filtered["l_discount"]
    )

    # Calling optimize() alone is enough to trigger the statistics collection
    lineitem_filtered.optimize()

This is how we found that particular issue.

cc @fjetter

@mrocklin (Member)

We should turn this off by default and use the logic of one file = one partition

Sometimes this is important, but only when the file is big I think. (Some people have many-gigabyte files and use row groups heavily)

@phofl (Collaborator, Author) commented Oct 26, 2023

I agree, but they can still enable this if necessary. I don’t want to remove this functionality, just turn it off by default.

@mrocklin (Member)

Ideally I'd like for the right thing to happen automatically for these users without them having to know about the configuration setting (in general I think we're trying to make dask dataframe require less configuration). I suspect that if we were to look at a couple of files and see how large they are, we could make this decision for them with high certainty.

@fjetter (Member) commented Oct 26, 2023

I'm a strong -1 on having the current logic by default the way it is right now. I don't think it's sensible to trigger a full dataset scan the way we're doing it right now, particularly not during graph generation.

Part of my scepticism comes from the fact that this is done during graph generation and users may be surprised that this is happening; particularly if there is no large/remote cluster already connected when the dataset is opened, this can cause a significant delay for the read_parquet call.
Even more than that, I'm not entirely convinced the current logic is indeed what we want as a default. I agree that there are cases where this might make sense, but I'm not convinced that this is true for the general user population.

@mrocklin (Member)

I don't think it's sensible to trigger a full dataset scan the way we're doing it right now, particularly not during graph generation.

I agree that this is surprising. Some graph-generation-time check may be necessary, at least if we want to know things like how many partitions are in a dataset.

It may also make you feel better (or worse) knowing that Spark does a similar thing. There's a bit of a delay after every spark.read.parquet call as they go and sniff metadata.

My expectation is that we still do this in general, but are able to avoid it in the common case of files smaller than, say, 256 MiB.
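
To illustrate the kind of probe being discussed here (this is not dask's current behavior, just a hedged sketch; fsspec, the 256 MiB threshold, and the helper name are assumptions):

import fsspec

def looks_like_small_files(path, threshold=256 * 2**20, nprobe=2):
    # Hypothetical helper: check the on-disk size of the first few files
    # and only fall back to footer scanning if any of them looks large.
    fs, _, paths = fsspec.get_fs_token_paths(path)
    files = [f for f in fs.find(paths[0]) if f.endswith(".parquet")] or paths
    return all(fs.size(f) < threshold for f in files[:nprobe])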

@phofl (Collaborator, Author) commented Oct 26, 2023

This actually happens before graph generation; triggering optimize() is enough. That's not a great UX (@hendrikmakait and I were debugging locally this morning and the wait was quite long).

I think we run into the danger of being too smart, which will make things significantly worse. We should avoid these pre-emptive computations as much as possible, since they are a major pain in the current dask dataframe implementation. Doing this with a single file sounds fine, but a directory could be arbitrarily huge.

@fjetter (Member) commented Oct 26, 2023

Some graph-generation-time check may be necessary, at least if we want to know things like how many partitions are in a dataset.

Reading every parquet footer is different than asking the storage backend how many files there are.

It may also make you feel better (or worse) knowing that Spark does a similar thing

Similar, maybe. However, the current logic is triggering a full dataset scan. If we want to probe a couple of files, that's one thing, but touching every file once is way too expensive.

@mrocklin (Member)

I agree that probing a couple of files first is probably the right way to go, and then if we find that they're large, we'll probably need to read every footer.

Reading every parquet footer is different than asking the storage backend how many files there are.

Yeah, to be clear, I'm talking about the situation where people have large parquet files. This does occur (for example, we have customers for whom it occurs). However, it's also rare, and where I think we have strong agreement is that we should try to avoid this cost when possible, which we think is the common case.

@mrocklin (Member)

I think that we're all saying the same thing here, which is that we should probe a couple of files for their size and, if they're modest, just use one partition per file.

I also think (but I'm not sure that we're agreed here) that if the files are large, then we should scan all footers whenever we need to determine divisions (which is hopefully very late, but maybe isn't). We should do this in a compute call in case we have a cluster lying around to do this work for us.

@rjzamora (Member)

The current behavior is absolutely better than a file-to-partition default. We just look at the first file and check if it is larger than blocksize. If it isn’t, we DO use file-to-partition mapping. So, just change the blocksize or suggest a better way to control the blocksize default.

The adaptive behavior we have now has been a much better UX than we ever had before.

@fjetter (Member) commented Oct 26, 2023

The current behavior is absolutely better than a file-to-partition default.
The adaptive behavior we have now has been a much better UX than we ever had before.

Can you elaborate? Given the investigation we've been doing over the past few days, I'm very hesitant to make any general claims like this.
Generally, I'd like to learn more about what factors led to this decision.

@mrocklin (Member)

What I'm hearing from @rjzamora is that he's already doing something similar to what is being suggested: looking at a couple of files in order to make a quick determination.

If so, then probably the limit here is set far too low. We're triggering this situation with files that are 128 MB, which I think is a very normal size.

@fjetter (Member) commented Oct 26, 2023

already doing something similar to what is being suggested

I'm not suggesting much here other than turning this off by default and revisiting the logic. I'm lacking the context of why this was necessary or how this improved UX, but from what I can see right now this is not an improvement and it is potentially even harmful.

I'm actually not even convinced that we want to read all the footers even if the file(s) end(s) up being large.

@rjzamora (Member)

Yes, we look at the metadata of the first file, and check if the uncompressed byte size is below the limit. In your case, it must be larger than 128MiB.

Since adding this behavior, I’ve gone from fielding tens of questions/complaints about bad read_parquet partitioning to practically 0.

In dask-expr, you have the extra consideration that you may need to parse the metadata multiple times. Therefore, we could consider different defaults or checks (e.g. we could check the literal file size to avoid any metadata parsing). However, I will be -1 on a file-to-partition default (just from experience).
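
For context, the first-file check described above boils down to something like the following (a rough sketch with pyarrow, not the actual dask code path; the filename and the 128 MiB constant are placeholders):

import pyarrow.parquet as pq

BLOCKSIZE = 128 * 2**20  # assumed default of 128 MiB

# Read only the footer of the first file and sum the uncompressed
# row-group sizes recorded in its metadata.
md = pq.ParquetFile("part.0.parquet").metadata
uncompressed = sum(md.row_group(i).total_byte_size for i in range(md.num_row_groups))

# Small first file: map one file to one partition.
# Large first file: split partitions along row-group boundaries.
one_file_per_partition = uncompressed <= BLOCKSIZE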

@mrocklin (Member)

@fjetter the use case that matters here is when the user has a single very large parquet file, like a 100 GB file. Sometimes people do this rather than create many small files.

@mrocklin (Member)

Since adding this behavior, I’ve gone from fielding tens of questions/complaints about bad read_parquet partitioning to practically 0.

@rjzamora are these somewhere we can see?

@mrocklin (Member)

Yes, we look at the metadata of the first file, and check if the uncompressed byte size is below the limit. In your case, it must be larger than 128MiB.

Where is this set? I looked around for this for a while in the last couple of days. I thought for a bit that it was the blocksize_default in arrow.py, but that didn't seem to have an effect.

@fjetter (Member) commented Oct 26, 2023

@fjetter the use case that matters here is when the user has a single very large parquet file, like a 100 GB file. Sometimes people do this rather than create many small files.

Yes, I do understand this.

@mrocklin (Member)

Yes, I do understand this

Cool. Just checking. If a user presents with a 100 GB file I think that we should spend the effort to read the footer, rather than assume a single partition. Do you agree?

My sense is that the main thing to tweak here is the size at which we start defaulting to many-partitions-per-file. 128 MiB files should, I think, be single partitions, and 100 GB files should be treated with many partitions. I think that it'll get interesting trying to figure out the boundary. Do you agree with this general framing of the problem?

@rjzamora (Member)

I’ve been using blocksize=“256MiB” for RAPIDS benchmarking (with single parquet files at sf100). I think there are many users out there with large parquet files. It may be somewhat of an anti-pattern, but it is not at all uncommon.
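
As a usage note, the knobs discussed here can be passed straight to read_parquet (standard dask.dataframe keywords; the path is a placeholder, and exact semantics may vary between versions):

import dask.dataframe as dd

# Raise the threshold at which a file gets split into multiple partitions ...
df = dd.read_parquet("s3://bucket/dataset/", blocksize="256MiB")

# ... or opt out of row-group splitting entirely: one partition per file.
df = dd.read_parquet("s3://bucket/dataset/", split_row_groups=False)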

@mrocklin (Member)

I’ve been using blocksize=“256MiB” for RAPIDS benchmarking

Where is this default set?

@fjetter (Member) commented Oct 26, 2023

Cool. Just checking. If a user presents with a 100 GB file I think that we should spend the effort to read the footer, rather than assume a single partition. Do you agree?

I don't object to reading the footer and I don't object to splitting the file up into row groups.

I am merely suggesting that the current logic is harmful for most users and only beneficial for few. I'm not trying to make a case for a better logic here. This is an entirely separate topic.

I think that it'll get interesting trying to figure out the boundary. Do you agree with this general framing of the problem?

As I said, I don't even think we should scan the entire dataset, so I do not agree with this framing of the problem.

I'm trying really hard here not to start a conversation and design process about how such a mechanism should work. I have thoughts about this, but it is not necessarily a priority right now.
I'm merely pointing out that the current mechanism is not a good default and that we should turn it off and then have a conversation about what a good default should be.

@rjzamora (Member)

@mrocklin (Member)

Hrm, I believe I've tried changing that value and didn't see the behavior change. I've tried tracking that value back to where it's used, but I wasn't able to do it easily.

I'll try changing the value again and see if things work as I expect. If not, I might ask for your help in understanding this code.

@mrocklin (Member)

OK, I had changed this value from 128 to 256 but it still triggered on a file of size 131572280. When I moved it up to 512 MB it stopped triggering.

Maybe it is looking at the size of a full partition in memory or something?

Also, just a quick note, but this is the path I took in order to find out how/where this was used:

  • default_blocksize defined on the Engine class in dask/dataframe/io/parquet/utils.py
  • collected into blocksize in core.py (maybe we should just make this a default keyword argument?)
  • passed into engine.read_metadata
  • passed into engine.cls._collect_dataset_info
  • There's this crazy table there with some decision tree
  • passed into _infer_split_row_groups
  • found this line: aggregate_files or np.sum(row_group_sizes) > blocksize

FWIW, I found this developer experience pretty frustrating. Maybe parquet is inherently really complex, but I suspect that it's not this complex. This feels like substantial technical debt.

@rjzamora (Member)

Maybe it is looking at the size of a full partition in memory or something?

Yes, sort of. It is using the uncompressed storage size metadata in the footer of the first file.

Also, just a quick note, but this is the path I took in order to find out how/where this was used:
...
FWIW, I found this developer experience pretty frustrating. Maybe parquet is inherently really complex, but I suspect that it's not this complex. This feels like substantial technical debt.

You are correct. It really doesn't need to be this complex. Most of the ugliness is just meant (1) to avoid breaking changes for external Engine users and for people still using split_row_groups etc., and (2) to make the blocksize default engine-dependent. If I had the time to rewrite things from scratch with a fresh API, things would certainly look different.
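
For anyone puzzled above by a ~131 MB file tripping a 256 MiB limit: the footer records uncompressed sizes, which can be several times larger than the file on disk. A quick way to compare the two (pyarrow; the path is a placeholder):

import os
import pyarrow.parquet as pq

path = "part.0.parquet"  # placeholder
md = pq.ParquetFile(path).metadata
uncompressed = sum(md.row_group(i).total_byte_size for i in range(md.num_row_groups))

# The compressed on-disk size and the uncompressed size recorded in the
# footer routinely differ by a large factor.
print(f"on disk:      {os.path.getsize(path):>15,} bytes")
print(f"uncompressed: {uncompressed:>15,} bytes")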

@mrocklin (Member) commented Oct 26, 2023 via email
