diff --git a/dask_expr/io/parquet.py b/dask_expr/io/parquet.py
index e89a605da..efe24b501 100644
--- a/dask_expr/io/parquet.py
+++ b/dask_expr/io/parquet.py
@@ -1758,6 +1758,8 @@ def _extract_stats(original):
         for name in col_meta:
             col_out[name] = col[name]
         col_out["statistics"] = {}
+        if col["statistics"] is None:
+            continue
         for name in col_stats:
             col_out["statistics"][name] = col["statistics"][name]
 
@@ -1796,12 +1798,17 @@ def _aggregate_columns(cols, agg_cols):
 
     return [_agg_dicts(c, agg_cols) for c in combine]
 
 
+def _get_min_max_value(x, func):
+    x = [y for y in x if y is not None]
+    return func(x) if len(x) > 0 else None
+
+
 def _aggregate_statistics_to_file(stats):
     """Aggregate RG information to file level."""
     agg_stats = {
-        "min": min,
-        "max": max,
+        "min": lambda x: _get_min_max_value(x, min),
+        "max": lambda x: _get_min_max_value(x, max),
     }
     agg_cols = {
         "total_compressed_size": sum,
diff --git a/dask_expr/io/tests/test_parquet.py b/dask_expr/io/tests/test_parquet.py
index ca12f4642..9ce80e273 100644
--- a/dask_expr/io/tests/test_parquet.py
+++ b/dask_expr/io/tests/test_parquet.py
@@ -2,6 +2,7 @@
 import pickle
 
 import dask
+import numpy as np
 import pandas as pd
 import pytest
 from dask.dataframe.utils import assert_eq
@@ -20,11 +21,11 @@
 )
 
 
-def _make_file(dir, df=None):
-    fn = os.path.join(str(dir), "myfile.parquet")
+def _make_file(dir, df=None, filename="myfile.parquet", **kwargs):
+    fn = os.path.join(str(dir), filename)
     if df is None:
         df = pd.DataFrame({c: range(10) for c in "abcde"})
-    df.to_parquet(fn)
+    df.to_parquet(fn, **kwargs)
     return fn
 
 
@@ -33,7 +34,7 @@ def parquet_file(tmpdir):
     return _make_file(tmpdir)
 
 
-@pytest.fixture(params=["arrow", "fsspec"])
+@pytest.fixture(params=["arrow"])
 def filesystem(request):
     return request.param
 
@@ -51,6 +52,28 @@ def test_parquet_len(tmpdir, filesystem):
     assert isinstance(Lengths(s.expr).optimize(), Literal)
 
 
+def test_parquet_missing_stats(tmpdir, filesystem):
+    _make_file(tmpdir)
+    _make_file(tmpdir, write_statistics=["a", "b"], filename="bla.parquet")
+
+    result = read_parquet(tmpdir, filesystem=filesystem)
+    expected = pd.concat(
+        [
+            pd.DataFrame({c: range(10) for c in "abcde"}),
+            pd.DataFrame({c: range(10) for c in "abcde"}),
+        ]
+    )
+    assert_eq(result, expected, check_index=False)
+
+
+@pytest.mark.parametrize("val", [np.nan, 1])
+def test_parquet_all_na_column(tmpdir, filesystem, val):
+    pdf = pd.DataFrame({"a": [np.nan] * 299 + [val], "b": [1, 2, 3] * 100})
+    _make_file(tmpdir, df=pdf, filename="bla.parquet", row_group_size=100)
+    result = read_parquet(tmpdir, filesystem=filesystem)
+    assert_eq(result, pdf)
+
+
 def test_parquet_len_filter(tmpdir, filesystem):
     df = read_parquet(_make_file(tmpdir), filesystem=filesystem)
     expr = Len(df[df.c > 0].expr)