Skip to content

`map_partitions` on a Series that returns a scalar per partition cannot be computed #625

Open
@fjetter

Description

@fjetter
from dask_expr.datasets import timeseries
from dask.utils import M

df = timeseries()
df.x.map_partitions(M.min).compute()
---------------------------------------------------------------------------
TypeError                                 Traceback (most recent call last)
Cell In[4], line 1
----> 1 df.x.map_partitions(M.min).compute()

File ~/workspace/dask-expr/dask_expr/_collection.py:265, in FrameBase.compute(self, fuse, **kwargs)
    263     out = out.repartition(npartitions=1)
    264 out = out.optimize(fuse=fuse)
--> 265 return DaskMethodsMixin.compute(out, **kwargs)

File ~/workspace/dask/dask/base.py:342, in DaskMethodsMixin.compute(self, **kwargs)
    318 def compute(self, **kwargs):
    319     """Compute this dask collection
    320
    321     This turns a lazy Dask collection into its in-memory equivalent.
   (...)
    340     dask.compute
    341     """
--> 342     (result,) = compute(self, traverse=False, **kwargs)
    343     return result

File ~/workspace/dask/dask/base.py:628, in compute(traverse, optimize_graph, scheduler, get, *args, **kwargs)
    625     postcomputes.append(x.__dask_postcompute__())
    627 with shorten_traceback():
--> 628     results = schedule(dsk, keys, **kwargs)
    630 return repack([f(r, *a) for r, (f, a) in zip(results, postcomputes)])

File ~/workspace/dask/dask/dataframe/dispatch.py:67, in concat(dfs, axis, join, uniform, filter_warning, ignore_index, **kwargs)
     65     return dfs[0]
     66 else:
---> 67     func = concat_dispatch.dispatch(type(dfs[0]))
     68     return func(
     69         dfs,
     70         axis=axis,
   (...)
     75         **kwargs,
     76     )

File ~/workspace/dask/dask/utils.py:635, in Dispatch.dispatch(self, cls)
    633         self._lazy.pop(toplevel, None)
    634         return self.dispatch(cls)  # recurse
--> 635 raise TypeError(f"No dispatch for {cls}")

TypeError: No dispatch for <class 'numpy.float64'>

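My current understanding is that `min` returns a scalar float (`numpy.float64`, the type that dispatch is attempted on) for each partition, instead of a one-element Series. `FrameBase.compute` first repartitions the result to a single partition, which concatenates the per-partition outputs. That concat step then calls `concat_dispatch.dispatch(type(dfs[0]))`. It fails with `TypeError: No dispatch for <class 'numpy.float64'>` because no concat implementation is registered for scalar types.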

Metadata

Assignees

No one assigned

    Labels

    bug — Something isn't working

    Type

    No type

    Projects

    No projects

    Milestone

    No milestone

    Relationships

    None yet

    Development

    No branches or pull requests

    Issue actions