From f6bb2312d2e30f1755134112fb4fc8b1591a5aa1 Mon Sep 17 00:00:00 2001 From: Patrick Hoefler <61934744+phofl@users.noreply.github.com> Date: Sat, 27 Apr 2024 14:28:28 +0200 Subject: [PATCH 1/3] Fix None min/max statistics and missing statistics generally --- dask_expr/io/parquet.py | 6 ++++-- dask_expr/io/tests/test_parquet.py | 30 ++++++++++++++++++++++++++---- 2 files changed, 30 insertions(+), 6 deletions(-) diff --git a/dask_expr/io/parquet.py b/dask_expr/io/parquet.py index e89a605da..ba401ee9e 100644 --- a/dask_expr/io/parquet.py +++ b/dask_expr/io/parquet.py @@ -1758,6 +1758,8 @@ def _extract_stats(original): for name in col_meta: col_out[name] = col[name] col_out["statistics"] = {} + if col["statistics"] is None: + continue for name in col_stats: col_out["statistics"][name] = col["statistics"][name] @@ -1800,8 +1802,8 @@ def _aggregate_statistics_to_file(stats): """Aggregate RG information to file level.""" agg_stats = { - "min": min, - "max": max, + "min": lambda x: min(set(x)), + "max": lambda x: max(set(x)), } agg_cols = { "total_compressed_size": sum, diff --git a/dask_expr/io/tests/test_parquet.py b/dask_expr/io/tests/test_parquet.py index ca12f4642..c73474538 100644 --- a/dask_expr/io/tests/test_parquet.py +++ b/dask_expr/io/tests/test_parquet.py @@ -2,6 +2,7 @@ import pickle import dask +import numpy as np import pandas as pd import pytest from dask.dataframe.utils import assert_eq @@ -20,11 +21,11 @@ ) -def _make_file(dir, df=None): - fn = os.path.join(str(dir), "myfile.parquet") +def _make_file(dir, df=None, filename="myfile.parquet", **kwargs): + fn = os.path.join(str(dir), filename) if df is None: df = pd.DataFrame({c: range(10) for c in "abcde"}) - df.to_parquet(fn) + df.to_parquet(fn, **kwargs) return fn @@ -33,7 +34,7 @@ def parquet_file(tmpdir): return _make_file(tmpdir) -@pytest.fixture(params=["arrow", "fsspec"]) +@pytest.fixture(params=["arrow"]) def filesystem(request): return request.param @@ -51,6 +52,27 @@ def test_parquet_len(tmpdir, filesystem): assert isinstance(Lengths(s.expr).optimize(), Literal) +def test_parquet_missing_stats(tmpdir, filesystem): + _make_file(tmpdir) + _make_file(tmpdir, write_statistics=["a", "b"], filename="bla.parquet") + + result = read_parquet(tmpdir, filesystem=filesystem) + expected = pd.concat( + [ + pd.DataFrame({c: range(10) for c in "abcde"}), + pd.DataFrame({c: range(10) for c in "abcde"}), + ] + ) + assert_eq(result, expected, check_index=False) + + +def test_parquet_all_na_column(tmpdir, filesystem): + pdf = pd.DataFrame({"a": np.nan, "b": [1, 2, 3] * 100}) + _make_file(tmpdir, df=pdf, filename="bla.parquet", row_group_size=100) + result = read_parquet(tmpdir, filesystem=filesystem) + assert_eq(result, pdf) + + def test_parquet_len_filter(tmpdir, filesystem): df = read_parquet(_make_file(tmpdir), filesystem=filesystem) expr = Len(df[df.c > 0].expr) From 440c11fbce549e239dacf87620003cfb68698126 Mon Sep 17 00:00:00 2001 From: Patrick Hoefler <61934744+phofl@users.noreply.github.com> Date: Mon, 29 Apr 2024 01:34:12 +0200 Subject: [PATCH 2/3] Fixup --- dask_expr/io/parquet.py | 9 +++++++-- dask_expr/io/tests/test_parquet.py | 5 +++-- 2 files changed, 10 insertions(+), 4 deletions(-) diff --git a/dask_expr/io/parquet.py b/dask_expr/io/parquet.py index ba401ee9e..3cac1111f 100644 --- a/dask_expr/io/parquet.py +++ b/dask_expr/io/parquet.py @@ -1798,12 +1798,17 @@ def _aggregate_columns(cols, agg_cols): return [_agg_dicts(c, agg_cols) for c in combine] +def _get_min_max_value(x, func): + x = list(filter(None, x)) + return func(x) if len(x) > 0 else None + + def _aggregate_statistics_to_file(stats): """Aggregate RG information to file level.""" agg_stats = { - "min": lambda x: min(set(x)), - "max": lambda x: max(set(x)), + "min": lambda x: _get_min_max_value(x, min), + "max": lambda x: _get_min_max_value(x, max), } agg_cols = { "total_compressed_size": sum, diff --git a/dask_expr/io/tests/test_parquet.py b/dask_expr/io/tests/test_parquet.py index c73474538..9ce80e273 100644 --- a/dask_expr/io/tests/test_parquet.py +++ b/dask_expr/io/tests/test_parquet.py @@ -66,8 +66,9 @@ def test_parquet_missing_stats(tmpdir, filesystem): assert_eq(result, expected, check_index=False) -def test_parquet_all_na_column(tmpdir, filesystem): - pdf = pd.DataFrame({"a": np.nan, "b": [1, 2, 3] * 100}) +@pytest.mark.parametrize("val", [np.nan, 1]) +def test_parquet_all_na_column(tmpdir, filesystem, val): + pdf = pd.DataFrame({"a": [np.nan] * 299 + [val], "b": [1, 2, 3] * 100}) _make_file(tmpdir, df=pdf, filename="bla.parquet", row_group_size=100) result = read_parquet(tmpdir, filesystem=filesystem) assert_eq(result, pdf) From a7316f43244686c39b622e207806887c974f3098 Mon Sep 17 00:00:00 2001 From: Patrick Hoefler <61934744+phofl@users.noreply.github.com> Date: Mon, 29 Apr 2024 10:11:29 +0200 Subject: [PATCH 3/3] Fixup --- dask_expr/io/parquet.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/dask_expr/io/parquet.py b/dask_expr/io/parquet.py index 3cac1111f..efe24b501 100644 --- a/dask_expr/io/parquet.py +++ b/dask_expr/io/parquet.py @@ -1799,7 +1799,7 @@ def _aggregate_columns(cols, agg_cols): def _get_min_max_value(x, func): - x = list(filter(None, x)) + x = [y for y in x if y is not None] return func(x) if len(x) > 0 else None