Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

FEAT-#3451: Support __partitioned__ protocol #3452

Closed
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
62 changes: 61 additions & 1 deletion modin/pandas/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@
import warnings
import pickle as pkl

from modin.utils import try_cast_to_pandas, _inherit_docstrings
from modin.utils import try_cast_to_pandas, _inherit_docstrings, get_current_backend
from modin.error_message import ErrorMessage
from modin.pandas.utils import is_scalar
from modin.config import IsExperimental
Expand Down Expand Up @@ -3010,6 +3010,66 @@ def __xor__(self, other):
def __rxor__(self, other):
    """
    Handle the reflected XOR operator (``other ^ self``).

    Parameters
    ----------
    other : object
        The left-hand operand; forwarded unchanged to ``_binary_op``.

    Returns
    -------
    object
        Whatever ``self._binary_op("__rxor__", other, axis=0)`` returns.
    """
    outcome = self._binary_op("__rxor__", other, axis=0)
    return outcome

@property
def __partitioned__(self):
    """
    Describe this object's partitions for the ``__partitioned__`` protocol.

    Implementation of github.com/IntelPython/DPPY-Spec/issues/3.

    Returns
    -------
    dict
        A dictionary with the following keys:

        - ``"partition_tiling"`` : ``(row_parts, col_parts)`` when ``self`` is
          a ``DataFrame``, ``(row_parts,)`` otherwise.
        - ``"partitions"`` : dict keyed by the grid position ``(i, j)``; every
          value holds ``"start"`` (global offset of the partition),
          ``"shape"``, ``"data"`` (the backend handle read from the partition)
          and ``"location"`` (a one-element list with the partition's IP).
        - ``"get"`` : callable that passes its argument to the backend's
          ``materialize`` function and returns element ``0`` of the result.

    Raises
    ------
    NotImplementedError
        If the current backend is neither "PandasOnRay" nor "PandasOnDask".
    """
    from .dataframe import DataFrame

    is_dataframe = isinstance(self, DataFrame)
    backend = get_current_backend()

    # Pick the materialization function and the name of the partition
    # attribute that stores the backend handle.
    if backend == "PandasOnRay":
        from modin.engines.ray.task_wrapper import RayTask

        get = RayTask.materialize
        part_attr = "oid"
    elif backend == "PandasOnDask":
        from modin.engines.dask.task_wrapper import DaskTask

        get = DaskTask.materialize
        part_attr = "future"
    else:
        raise NotImplementedError(
            f"'__partitioned__' is not supported by '{backend}' backend."
        )

    parts = self._query_compiler._modin_frame._partitions
    n_rparts = len(parts)
    # The column-partition count is taken from the first row; an empty grid
    # has no first row, so report zero instead of raising IndexError.
    n_cparts = len(parts[0]) if n_rparts else 0

    # Compute the info for every partition, including the global start
    # offset and the shape of each one.
    # NOTE(review): for non-DataFrame objects the keys and "start" stay
    # 2-tuples while "shape" and "partition_tiling" are 1-tuples - confirm
    # against the protocol spec whether they should be 1-tuples as well.
    partitions = {}
    curr_x = 0
    for i in range(n_rparts):
        curr_y = 0
        # Stays 0 for a row without column partitions, so the row offset
        # update below never touches an unbound shape.
        row_length = 0
        for j in range(n_cparts):
            part = parts[i][j]
            handle = getattr(part, part_attr)
            cached_ip = part._ip_cache
            if is_dataframe:
                curr_shape = (part.length(), part.width())
            else:
                curr_shape = (part.length(),)
            # Use the cached IP only when it is already a plain string;
            # otherwise (not cached yet, or cached in another form) ask the
            # partition for it.
            location = cached_ip if isinstance(cached_ip, str) else part.ip()
            partitions[(i, j)] = {
                "start": (curr_x, curr_y),
                "shape": curr_shape,
                "data": handle,
                "location": [location],
            }
            # Advance the column offset within the current row.
            curr_y += curr_shape[1] if is_dataframe else 0
            row_length = curr_shape[0]
        # Advance the row offset by the length of the last partition in the row.
        curr_x += row_length

    return {
        "partition_tiling": (n_rparts, n_cparts) if is_dataframe else (n_rparts,),
        "partitions": partitions,
        "get": lambda x: get(x)[0]
        # we don't set 'locals' because this is controller-worker, not SPMD
    }

@property
def size(self):
    """
    Return the number of elements in the object.

    Returns
    -------
    int
        Length of the query compiler's ``index`` multiplied by the length of
        its ``columns``.
    """
    compiler = self._query_compiler
    row_count = len(compiler.index)
    column_count = len(compiler.columns)
    return row_count * column_count
Expand Down