Parquet statistics are collected twice #365

Open
fjetter opened this issue Oct 26, 2023 · 10 comments

fjetter (Member) commented Oct 26, 2023

Currently, the parquet reader uses split_row_groups='infer' to decide whether it should split a file into multiple dask tasks, which is done by collecting parquet statistics from every file.

Running the code below triggers this statistics collection twice: once during the definition of the computation (while I'm mutating the dataframe) and once as soon as I call compute.

from datetime import datetime

import dask_expr as dd

VAR1 = datetime(1998, 9, 2)

lineitem_ds = dd.read_parquet("s3://coiled-runtime-ci/tpc-h/scale-1000/lineitem")

lineitem_filtered = lineitem_ds[lineitem_ds.l_shipdate <= VAR1]
lineitem_filtered["sum_qty"] = lineitem_filtered.l_quantity
lineitem_filtered["sum_base_price"] = lineitem_filtered.l_extendedprice
lineitem_filtered["avg_qty"] = lineitem_filtered.l_quantity
lineitem_filtered["avg_price"] = lineitem_filtered.l_extendedprice

# This line now triggers a statistics collection iff `split_row_groups` is not False
lineitem_filtered["sum_disc_price"] = lineitem_filtered.l_extendedprice * (
    1 - lineitem_filtered.l_discount
)


lineitem_filtered["sum_charge"] = (
    lineitem_filtered.l_extendedprice
    * (1 - lineitem_filtered.l_discount)
    * (1 + lineitem_filtered.l_tax)
)

lineitem_filtered["avg_disc"] = lineitem_filtered.l_discount
lineitem_filtered["count_order"] = lineitem_filtered.l_discount

gb = lineitem_filtered.groupby(["l_returnflag", "l_linestatus"])

total = gb.agg(
    {
        "sum_qty": "sum",
        "sum_base_price": "sum",
        "sum_disc_price": "sum",
        "sum_charge": "sum",
        "avg_qty": "mean",
        "avg_price": "mean",
        "avg_disc": "mean",
        "count_order": "count",
    }
)

# Once I compute, another stats collection is triggered
total.compute()
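
For completeness, a possible workaround rather than a fix (a sketch; it follows from the comment above that the collection only happens when split_row_groups is not False):

import dask_expr as dd

# Possible workaround (sketch, not a fix for the underlying double collection):
# with an explicit split_row_groups=False there is nothing to infer, so the
# statistics-based partitioning plan should not be needed at definition time.
lineitem_ds = dd.read_parquet(
    "s3://coiled-runtime-ci/tpc-h/scale-1000/lineitem",
    split_row_groups=False,  # one output partition per file
)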


related to #363

fjetter (Member, Author) commented Oct 26, 2023

My naive expectation would be that the second collection is not necessary since we're doing some caching. Is this a spurious cache miss, or do we actually have to collect the statistics twice for some reason?

rjzamora (Member) commented

> which is done by collecting parquet statistics from every file.

We only want to check the first file. If we are collecting all statistics, then we are using pyarrow.dataset in the wrong way (or need to use something else to check the size of the first file).
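
For illustration, a minimal sketch of what "only check the first file" could look like with pyarrow.dataset (an assumption-laden sketch, not dask-expr's implementation; the 128 MiB threshold is made up):

import pyarrow.dataset as ds

# Sketch: parse the footer of the first file only and use its uncompressed
# row-group sizes to decide whether files should be split into row groups.
dataset = ds.dataset("s3://coiled-runtime-ci/tpc-h/scale-1000/lineitem", format="parquet")
first_fragment = next(iter(dataset.get_fragments()))
file_meta = first_fragment.metadata  # footer of this one file, not the whole dataset
uncompressed_size = sum(
    file_meta.row_group(i).total_byte_size for i in range(file_meta.num_row_groups)
)
split_row_groups = uncompressed_size > 128 * 2**20  # illustrative threshold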

fjetter (Member, Author) commented Oct 26, 2023

> We only want to check the first file.

I'm surprised that this line requires us to check anything.

rjzamora (Member) commented

> I'm surprised that this line requires us to check anything.

Can you clarify what you mean by "this line"?

fjetter (Member, Author) commented Oct 26, 2023

lineitem_filtered["sum_disc_price"] = lineitem_filtered.l_extendedprice * (
    1 - lineitem_filtered.l_discount
)

This is what triggers the collection. I assume it steps into some "I have two frames that need to be aligned, so let's check whether we know the divisions" logic that is not necessary here.

fjetter (Member, Author) commented Oct 26, 2023

Patrick's example in #363 (comment) is essentially the same. He defines the dataframe code up to this point and then calls optimize, which triggers the collection.
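
For reference, a one-line sketch of that trigger (assuming the dask_expr collection-level optimize method):

# Sketch: optimizing without computing is already enough to trigger the
# second statistics collection in the example above.
total.optimize()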

rjzamora (Member) commented

Oh okay - Yeah, then I agree. We shouldn't need to do anything "new" at that step. In general, the dask-expr API will trigger whatever logic is necessary to calculate the current _meta, but shouldn't need to do anything with divisions/partitions if that information isn't needed to calculate that metadata.
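
For illustration, a minimal sketch of that idea on plain pandas objects (the dtypes are assumed; this is not dask-expr internals):

import pandas as pd

# Sketch: the meta of an elementwise expression follows from the operands'
# empty "meta" objects alone; divisions and partition counts never enter.
extendedprice_meta = pd.Series(dtype="float64", name="l_extendedprice")
discount_meta = pd.Series(dtype="float64", name="l_discount")
sum_disc_price_meta = extendedprice_meta * (1 - discount_meta)
assert len(sum_disc_price_meta) == 0 and sum_disc_price_meta.dtype == "float64"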

rjzamora (Member) commented Oct 26, 2023

Are you sure the first metadata collection isn't already happening in dd.read_parquet("s3://coiled-runtime-ci/tpc-h/scale-1000/lineitem")? [EDIT: Never mind. That component shouldn't trigger any kind of graph computation.]

rjzamora (Member) commented

Okay, I spent some time looking into this and the problem is that an element-wise operation between collections will currently trigger an eager expr.are_co_aligned(self.expr, other) check.

This means that we end up asking the root IO expression for its divisions before any kind of optimization pass (column projection, combine similar, etc.). Later on, when we call compute (or just optimize), the optimization pass modifies the column projection, and so we end up needing to generate a fresh partitioning plan.

I'm pretty sure I can revise the ReadParquet caching logic to avoid the need to generate the partitioning plan twice.
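
One way to picture the caching idea (a hypothetical sketch, not the actual ReadParquet code): memoize the partitioning plan on a key that survives optimization, e.g. the dataset path plus the split_row_groups setting, so that a pushed-down column projection does not invalidate it.

from functools import lru_cache

# Hypothetical sketch, not dask-expr's ReadParquet logic: keying the memoized
# plan on stable inputs lets the eager are_co_aligned check (pre-optimization)
# and the graph construction (post-optimization) share one statistics pass.
@lru_cache(maxsize=None)
def partitioning_plan(path: str, split_row_groups) -> tuple:
    print(f"collecting statistics for {path}")  # would run only once per key
    return ()  # placeholder for the planned (file, row-group) splits

partitioning_plan("s3://coiled-runtime-ci/tpc-h/scale-1000/lineitem", "infer")
partitioning_plan("s3://coiled-runtime-ci/tpc-h/scale-1000/lineitem", "infer")  # cache hit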

phofl (Collaborator) commented Oct 26, 2023

#366

This is an improvement for this particular problem, though the general issue remains.
