diff --git a/dask_expr/_repartition.py b/dask_expr/_repartition.py index eb3588586..fa47e7b02 100644 --- a/dask_expr/_repartition.py +++ b/dask_expr/_repartition.py @@ -7,7 +7,7 @@ import pandas as pd from dask.base import tokenize from dask.dataframe import methods -from dask.dataframe.core import _map_freq_to_period_start, split_evenly +from dask.dataframe.core import _concat, _map_freq_to_period_start, split_evenly from dask.dataframe.utils import is_series_like from dask.utils import iter_chunks, parse_bytes from pandas.api.types import is_datetime64_any_dtype, is_numeric_dtype @@ -161,7 +161,7 @@ def _layer(self): new_partitions_boundaries = self._partitions_boundaries return { (self._name, i): ( - methods.concat, + _concat, [(self.frame._name, j) for j in range(start, end)], ) for i, (start, end) in enumerate( diff --git a/dask_expr/tests/test_collection.py b/dask_expr/tests/test_collection.py index 9bce6b200..f1b57abdc 100644 --- a/dask_expr/tests/test_collection.py +++ b/dask_expr/tests/test_collection.py @@ -8,6 +8,7 @@ import dask import numpy as np +import pandas as pd import pytest from dask.dataframe._compat import PANDAS_GE_210 from dask.dataframe.utils import UNKNOWN_CATEGORIES @@ -1918,3 +1919,10 @@ def test_axes(df, pdf): [assert_eq(d, p) for d, p in zip(df.axes, pdf.axes)] assert len(df.x.axes) == len(pdf.x.axes) assert_eq(df.x.axes[0], pdf.x.axes[0]) + + +def test_map_partitions_series(df, pdf): + result = df.x.map_partitions(M.min).compute() + assert isinstance(result, pd.Series) + assert len(result) == df.npartitions + assert min(pdf.x) == min(result)