# Reconstructed from the patch "FEAT-#3451: Support `__partitioned__` protocol"
# against modin/pandas/base.py. The definitions below take ``self`` and are
# presumably methods of a class whose header lies outside the patch context,
# which is why they are written here without class-level indentation.


def __rxor__(self, other):
    return self._binary_op("__rxor__", other, axis=0)


@property
def __partitioned__(self):
    """
    Describe the object's partitions via the ``__partitioned__`` protocol.

    Implementation of github.com/IntelPython/DPPY-Spec/issues/3.

    Returns
    -------
    dict
        A dictionary with three keys:

        * ``"partition_tiling"`` : ``(row_parts, col_parts)`` for a DataFrame,
          ``(row_parts,)`` otherwise.
        * ``"partitions"`` : dict keyed by the ``(i, j)`` grid position; each
          value holds ``"start"`` (offset accumulated from the lengths/widths
          of the preceding partitions), ``"shape"``, ``"data"`` (the backend
          handle read from the partition) and ``"location"`` (a one-element
          list with the partition's IP).
        * ``"get"`` : callable that materializes a handle through the
          backend's task wrapper.

    Raises
    ------
    NotImplementedError
        If the current backend is neither ``PandasOnRay`` nor ``PandasOnDask``.
    """
    # Local imports, like the engine-specific ones below, keep this property
    # self-contained.
    from modin.utils import get_current_backend

    from .dataframe import DataFrame

    is_dataframe = isinstance(self, DataFrame)
    backend = get_current_backend()

    # Each task wrapper is imported inside its own branch, so only the module
    # of the selected backend is ever loaded.
    if backend == "PandasOnRay":
        from modin.engines.ray.task_wrapper import RayTask

        get = RayTask.materialize
        part_attr = "oid"
    elif backend == "PandasOnDask":
        from modin.engines.dask.task_wrapper import DaskTask

        get = DaskTask.materialize
        part_attr = "future"
    else:
        raise NotImplementedError(
            f"'__partitioned__' is not supported by '{backend}' backend."
        )

    parts = self._query_compiler._modin_frame._partitions
    n_rparts = len(parts)
    # Guard the empty grid: there is no first row to measure.
    n_cparts = len(parts[0]) if n_rparts else 0

    # Now compute partition info, including global start and shape of each.
    # NOTE(review): for non-DataFrame objects the keys and "start" stay 2-D
    # while "shape" and "partition_tiling" are 1-D - confirm against the
    # protocol whether they should be 1-D as well.
    partitions = {}
    row_start = 0
    for i, row in enumerate(parts):
        col_start = 0
        # Stays 0 for a row without column partitions, so the offset update
        # below never reads an unbound value.
        row_height = 0
        for j, part in enumerate(row):
            row_height = part.length()
            shape = (row_height, part.width()) if is_dataframe else (row_height,)
            # Use the cached IP only when it is already a plain string;
            # otherwise (``None`` or any other object) ask the partition.
            cached_ip = part._ip_cache
            location = cached_ip if isinstance(cached_ip, str) else part.ip()
            partitions[(i, j)] = {
                "start": (row_start, col_start),
                "shape": shape,
                "data": getattr(part, part_attr),
                "location": [location],
            }
            if is_dataframe:
                col_start += shape[1]
        row_start += row_height

    return {
        "partition_tiling": (n_rparts, n_cparts) if is_dataframe else (n_rparts,),
        "partitions": partitions,
        # we don't set 'locals' because this is controller-worker, not SPMD
        # NOTE(review): the materialized result is indexed with [0];
        # presumably `x` is expected to be a sequence of handles - confirm.
        "get": lambda x: get(x)[0],
    }


@property
def size(self):
    return len(self._query_compiler.index) * len(self._query_compiler.columns)