This repository was archived by the owner on Aug 19, 2024. It is now read-only.

**Draft**

# Motivation

When operating in distributed-memory systems, a data container (such as a tensor or data-frame) may be partitioned into several smaller chunks which might be allocated in distinct (distributed) address spaces. An implementation of operations defined specifically for such a partitioned data container will likely need to use specifics of the structure to provide good performance. Most prominently, making use of data locality can be vital. For a consumer of such a partitioned container it will be inconvenient (if not impossible) to write specific code for every possible partitioned and distributed data container. On the other hand, as with the array/dataframe API, it would be much better to have a standard way of extracting the metadata about the partitioned and distributed nature of a given container.

The goal of the `__partitioned__` protocol is to allow partitioned and distributed data containers to expose information to consumers so that unnecessary copying can be avoided as much as possible.

# Scope

Currently the focus is on dense data structures with rectangular shapes, such as dense nd-arrays and DataFrames. These data structures decompose their data-(index-)space into several rectangular partitions which form a multi-dimensional grid.

While the protocol is designed to be generic enough to cover any distribution backend, the currently considered backends are Ray, Dask and MPI-like (SPMD assigning ids to each process/rank/PE).

# Partitioned Protocol

A conforming implementation of the partitioned protocol standard must provide and support a data structure object having a `__partitioned__` property which returns a Python dictionary with the following fields:

* `shape`: a tuple defining the number of elements for each dimension of the global data-(index-)space.
* `partition_tiling`: a tuple defining the number of partitions for each dimension of the container's global data-(index-)space.
* `partitions`: a dictionary mapping a position in the partition grid (as defined by `partition_tiling`) to a dictionary providing the partition object, the partition shape and locality information.

The `__partitioned__` dictionary must be pickle'able.

## `shape`

The `shape` field provides the dimensionality and sizes per dimension of the underlying data-structure.

## `partition_tiling`

The shape of the partition grid must be of the same dimensionality as the underlying data-structure. `partition_tiling` provides the number of partitions along each dimension. Specifying `1` in a given dimension means the dimension is not cut.

## `get`

A callable which must return the raw data object when called with a handle (or a sequence of handles) provided in the `data` field of an entry in `partitions`. Raw data objects are standard data structures: a DataFrame or an nd-array.
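
A minimal consumer-side sketch of `get`, using a stand-in backend (handles are plain dict keys resolved from a local store; a real backend would resolve e.g. `ray.ObjectRef` or `dask.Future` handles instead):

```python
# Stand-in for a distributed object store: handle -> raw data object.
store = {'h0': [0, 1, 2, 3], 'h1': [4, 5, 6, 7]}

# Two entries as they might appear in the 'partitions' dictionary.
parts = {
    (0,): {'start': (0,), 'shape': (4,), 'data': 'h0', 'location': ['node1']},
    (1,): {'start': (4,), 'shape': (4,), 'data': 'h1', 'location': ['node2']},
}

def get(handle):
    # A real implementation would call e.g. ray.get or Future.result here.
    if isinstance(handle, (list, tuple)):
        return [store[h] for h in handle]
    return store[handle]

# The consumer resolves every partition's raw data through `get`,
# without knowing the backend's handle type.
raw = {pos: get(meta['data']) for pos, meta in parts.items()}
print(raw[(1,)])  # [4, 5, 6, 7]
```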

## `locals`

A short-cut for SPMD environments that allows processes/ranks to quickly extract their local partition. It saves processes from parsing the `partitions` dictionary for the local rank/address, which is helpful when the number of ranks/processes/PEs is large.
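
A sketch of how an SPMD consumer might use `locals`, assuming it holds the grid positions (keys into `partitions`) owned by the calling rank; the rank value and the toy `__partitioned__` dict below are illustrative stand-ins for a real MPI-like run:

```python
rank = 1  # in a real SPMD run: e.g. comm.Get_rank()

# Toy stand-in for a container's __partitioned__ dict on rank 1.
parted = {
    'shape': (8,),
    'partition_tiling': (2,),
    'partitions': {
        (0,): {'start': (0,), 'shape': (4,), 'data': None,        'location': [0]},
        (1,): {'start': (4,), 'shape': (4,), 'data': [4, 5, 6, 7], 'location': [1]},
    },
    'locals': [(1,)],       # assumption: grid positions local to this rank
    'get': lambda x: x,     # identity, since data is held directly here
}

# Without `locals`, the consumer would scan all of `partitions` for entries
# whose 'location' contains this rank; with it, access is direct.
for pos in parted['locals']:
    part = parted['partitions'][pos]
    print(part['start'], parted['get'](part['data']))
```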

## `partitions`

A dictionary mapping a position in the grid to detailed information about the partition at the given position.

* All positions in the partition-grid as defined by `partition_tiling` must be present in the dictionary as a key.

A consumer must verify that it supports the provided object types and locality information; it must throw an exception if not.

### `start`

* The offset of the starting element in the partition from the first element in the global index space of the underlying data-structure. It has the same dimensionality as the underlying data-structure and is given as a tuple. The stop indices can be computed as the sum of `start` and `shape`.

### `shape`

* Number of elements in each dimension. The shape of the partition has the same dimensionality as the underlying data-structure and is given as a tuple.
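
The stop indices mentioned under `start` follow directly from the element-wise sum of `start` and `shape`; a small sketch (the helper name is illustrative):

```python
def stop_indices(start, shape):
    # Exclusive stop index per dimension: start + extent, element-wise.
    return tuple(b + s for b, s in zip(start, shape))

print(stop_indices((4, 0), (4, 4)))  # (8, 4)
```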

### `data`

* The actual data of the partition, potentially provided as a handle.
* All data/partitions must be of the same type.
* The actual partition type is opaque to the `__partitioned__` protocol.
* The consumer needs to deal with it like it would in a non-partitioned case. For example, if the consumer can deal with arrays it could check for the existence of the array/dataframe APIs (https://github.com/data-apis/array-api, https://github.com/data-apis/dataframe-api). Other conforming consumers could hard-code a very specific type check.
* Whenever possible the data should be provided as references. References are mandatory for non-SPMD backends. This avoids unnecessary data movement.
  * Ray: `ray.ObjectRef`
  * Dask: `dask.Future`
* It is recommended to access the actual data through the callable in the `get` field of `__partitioned__`. This allows consumers to avoid checking handle types and container types.
* For SPMD-MPI-like backends: partitions which are not locally available may be `None`. This is the recommended behavior unless the underlying backend supports references (such as promises/futures) to avoid unnecessary data movement.

### `location`

* A sequence of locations where the data can be accessed locally, i.e. without extra communication.
* The location information must include all necessary data to uniquely identify the location of the partition/data. The exact information depends on the underlying distribution system:
  * Ray: ip-address
  * Dask: worker-id (name, ip, or ip:port)
  * SPMD/MPI-like frameworks such as MPI, SHMEM, etc.: rank
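
Putting the per-partition fields together, here is a consumer-side sketch that materializes a 1d container from its `__partitioned__` dict. The dict below is a toy stand-in (data is held directly, so `get` is the identity); a real consumer would first inspect `location` to favor local partitions:

```python
parted = {
    'shape': (8,),
    'partition_tiling': (2,),
    'partitions': {
        (0,): {'start': (0,), 'shape': (4,), 'data': [0, 1, 2, 3], 'location': ['node1']},
        (1,): {'start': (4,), 'shape': (4,), 'data': [4, 5, 6, 7], 'location': ['node2']},
    },
    'get': lambda x: x,
}

# Assemble the global container by placing each partition at its 'start'
# offset; 'shape' gives the extent, 'get' resolves the data handle.
out = [None] * parted['shape'][0]
for pos, meta in parted['partitions'].items():
    (start,), (length,) = meta['start'], meta['shape']
    out[start:start + length] = parted['get'](meta['data'])
print(out)  # [0, 1, 2, 3, 4, 5, 6, 7]
```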

## Examples

### 1d-data-structure (64 elements), 1d-partition-grid, 4 partitions on 4 nodes, blocked distribution, partitions are of type `Ray.ObjRef`, Ray
```python
__partitioned__ = {
  'shape': (64,),
  'partition_tiling': (4,),
  'partitions': {
    # ...
  }
}
```

### 2d-structure (64 elements), 2d-partition-grid, 4 partitions on 2 nodes, block-cyclic distribution, partitions are of type `dask.Future`, Dask
```python
__partitioned__ = {
  'shape': (8, 8),
  'partition_tiling': (2, 2),
  'partitions': {
    # ...
  }
}
```

### 2d-structure (64 elements), 1d-partition-grid, 4 partitions on 2 ranks, row-block-cyclic distribution, partitions are of type `pandas.DataFrame`, MPI