**Draft**

# Motivation

When operating in distributed memory systems a data container (such as tensors or data-frames) may be partitioned into several smaller chunks which might be allocated in distinct (distributed) address spaces. An implementation of operations defined specifically for such a partitioned data container will likely need to use specifics of the structure to provide good performance. Most prominently, making use of data locality can be vital. For a consumer of such a partitioned container it will be inconvenient (if not impossible) to write specific code for every possible partitioned and distributed data container. On the other hand, like with the array interface, it would be much better to have a standard way of extracting the metadata about the partitioned and distributed nature of a given container.

The goal of the `__partitioned__` protocol is to allow partitioned and distributed data containers to expose information to consumers so that unnecessary copying can be avoided as much as possible.

# Scope

Currently the focus is on dense data structures with rectangular shapes, such as dense nd-arrays and DataFrames. These data structures decompose their data-(index-)space into several rectangular partitions which form a multi-dimensional grid.

While the interface is designed to be generic enough to cover any distribution backend, the currently considered backends are Ray, Dask and MPI-like (SPMD assigning ids to each process/rank/PE).

# Partitioned Interface

A conforming implementation of the partitioned interface standard must provide a data structure object with a `__partitioned__` method which returns a Python dictionary with the following fields:
* `shape`: a tuple giving the number of elements in each dimension of the global data-(index-)space.
* `partition_tiling`: a tuple giving the number of partitions in each dimension of the container's global data-(index-)space.
* `partitions`: a dictionary mapping each position in the partition grid (as defined by `partition_tiling`) to a dictionary providing the partition object, the partition shape and locality information.
* `locals`: only for SPMD/MPI-like implementations: a list of the positions of the locally owned partitions. The positions serve as look-up keys in the `partitions` dictionary. Must not be present for non-SPMD/MPI-like backends (such as Ray- or Dask-based ones).
* `get`: a callable converting a sequence of handles into a sequence of data objects.

The `__partitioned__` dictionary must be pickle'able.

In addition to the above specification any implementation is encouraged to provide extra information which could potentially be beneficial for consumers of the distributed data structure.
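
As a loose illustration (not part of the specification), the sketch below shows how a consumer might check the required fields before using a returned dictionary; the helper name `validate_partitioned` and the container variable `obj` are hypothetical.

```python
def validate_partitioned(parts: dict) -> None:
    """Raise if a __partitioned__ dictionary lacks required fields (illustrative only)."""
    required = {'shape', 'partition_tiling', 'partitions', 'get'}
    missing = required - parts.keys()
    if missing:
        raise TypeError(f"__partitioned__ dict is missing required keys: {missing}")
    if len(parts['shape']) != len(parts['partition_tiling']):
        raise ValueError("'shape' and 'partition_tiling' must have the same dimensionality")

# Hypothetical usage: 'obj' is assumed to be a conforming container.
# validate_partitioned(obj.__partitioned__())
```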

## `shape`

The `shape` field provides the dimensionality and the sizes per dimension of the underlying data-structure.

## `partition_tiling`

The shape of the partition grid must be of the same dimensionality as the underlying data-structure. `partition_tiling` provides the number of partitions along each dimension. Specifying `1` in a given dimension means the dimension is not cut.
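
As a purely illustrative (non-normative) example with assumed values, an 8x8 container cut into a 2x2 grid of partitions would report:

```python
from math import prod

# Illustrative only: an 8x8 container cut into a 2x2 grid of 4x4 partitions.
shape = (8, 8)             # global number of elements per dimension
partition_tiling = (2, 2)  # number of partitions per dimension
num_partitions = prod(partition_tiling)  # -> 4 partitions in the grid
```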

## `partitions`

A dictionary mapping a position in the grid to detailed information about the partition at the given position.
* All positions in the partition-grid as defined by `partition_tiling` must be present in the dictionary as a key.
* The position in the grid (key) is provided as a tuple with the same dimensionality as the partition grid.

Each key/position maps to:
* 'start': the offset of the partition's first element from the first element in the global index space of the underlying data-structure, given as a tuple
* 'shape': the shape of the partition, given as a tuple of the same dimensionality as the partition grid
* 'data': the actual data, provided as an ObjRef, Future, array, DataFrame, ...
* 'location': the location (or home) of the partition, given as a sequence of ip-addresses, ranks, ...

A consumer must verify that it supports the provided object types and locality information; it must throw an exception if it does not.

In addition to the above required keys a container is encouraged to provide more information which could potentially be beneficial for consuming the distributed data structure. For example, a DataFrame structure might add the row and column labels for any given partition.

* `start`
  * The offset of the partition's first element from the first element in the global index space of the underlying data-structure. It has the same dimensionality as the underlying data-structure and is given as a tuple. The stop indices can be computed as the sum of `start` and `shape` (see the sketch after this list).
* `shape`
  * The number of elements in each dimension of the partition. It has the same dimensionality as the underlying data-structure and is given as a tuple.
* `data`
  * The actual data of the partition, potentially provided as a handle.
  * All data/partitions must be of the same type.
  * The actual partition type is opaque to `__partitioned__`.
  * The consumer needs to deal with it like it would in a non-partitioned case. For example, a consumer that can handle arrays could check for the existence of the array interface. Other conforming consumers could hard-code a very specific type check.
  * Whenever possible the data should be provided as references; references are mandatory for non-SPMD backends. This avoids unnecessary data movement.
    * Ray: ray.ObjectRef
    * Dask: dask.Future
  * It is recommended to access the actual data through the callable in the 'get' field of `__partitioned__`. This allows consumers to avoid differentiating between handle types, and type checks can be limited to basic types like pandas.DataFrame and numpy.ndarray.
  * For SPMD/MPI-like backends: partitions which are not locally available may be `None`. This is the recommended behavior unless the underlying backend supports references (such as promises/futures) to avoid unnecessary data movement.
* `location`
  * A sequence of locations where the data can be accessed locally, i.e. without extra communication.
  * The location information must include all necessary data to uniquely identify the location of the partition/data. The exact information depends on the underlying distribution system:
    * Ray: ip-address
    * Dask: worker-Id (name, ip, or ip:port)
    * SPMD/MPI-like frameworks such as MPI, SHMEM etc.: rank
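
The following hedged sketch shows how a consumer might walk the `partitions` dictionary and derive each partition's global index range from 'start' and 'shape'; the container variable `obj` is hypothetical.

```python
parts = obj.__partitioned__()  # 'obj': a hypothetical conforming container
for pos, meta in parts['partitions'].items():
    start = meta['start']
    stop = tuple(s + n for s, n in zip(start, meta['shape']))  # exclusive stop indices
    span = ", ".join(f"{s}:{e}" for s, e in zip(start, stop))
    print(f"partition {pos} covers global indices [{span}] at {meta['location']}")
```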

## `get`

A callable which returns the raw data object(s) when called with a handle (or a sequence of handles) provided in the `data` field of an entry in `partitions`. Raw data objects are standard data structures like pandas.DataFrame and numpy.ndarray.
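
A minimal sketch of materializing a single partition through `get`; the container variable `obj` is hypothetical, and depending on the backend the handle may be a ray.ObjectRef, a dask Future, or already the raw data.

```python
parts = obj.__partitioned__()  # 'obj': a hypothetical conforming container
# Pick the partition at the origin of the partition grid and materialize it.
origin = (0,) * len(parts['partition_tiling'])
handle = parts['partitions'][origin]['data']
raw = parts['get'](handle)  # e.g. ray.get(...) or Future.result() under the hood
```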

## `locals`

This short-cut for SPMD environments allows processes/ranks to quickly extract their local partitions. It saves processes from parsing the `partitions` dictionary for the local rank/address, which is helpful when the number of ranks/processes/PEs is large.
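
A hedged sketch of how an SPMD/MPI-like consumer might use `locals`; again `obj` is a hypothetical conforming container.

```python
parts = obj.__partitioned__()  # 'obj': a hypothetical conforming container
# On SPMD/MPI-like backends each rank can fetch only the partitions it owns.
if 'locals' in parts:  # present only for SPMD/MPI-like implementations
    my_data = [parts['get'](parts['partitions'][pos]['data'])
               for pos in parts['locals']]
```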

## Examples
### 1d-data-structure (64 elements), 1d-partition-grid, 4 partitions on 4 nodes, blocked distribution, partitions are of type `ray.ObjectRef`, Ray
```python
__partitioned__ = {
    'shape': (64,),
    'partition_tiling': (4,),
    'partitions': {
        (0,): {
            'start': (0,),
            'shape': (16,),
            'data': ObjRef0,
            'location': ['1.1.1.1'], },
        (1,): {
            'start': (16,),
            'shape': (16,),
            'data': ObjRef1,
            'location': ['1.1.1.2'], },
        (2,): {
            'start': (32,),
            'shape': (16,),
            'data': ObjRef2,
            'location': ['1.1.1.3'], },
        (3,): {
            'start': (48,),
            'shape': (16,),
            'data': ObjRef3,
            'location': ['1.1.1.4'], },
    },
    'get': lambda x: ray.get(x)
}
```
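
As a hedged illustration of consuming the Ray example above, the sketch below gathers the four partitions and assembles them into a single NumPy array; it assumes the dictionary above is bound to `__partitioned__` and that Ray is initialized.

```python
import numpy as np

parted = __partitioned__  # the dictionary from the Ray example above
out = np.empty(parted['shape'])
for meta in parted['partitions'].values():
    block = np.asarray(parted['get'](meta['data']))  # ray.get under the hood
    idx = tuple(slice(s, s + n) for s, n in zip(meta['start'], meta['shape']))
    out[idx] = block
```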
### 2d-structure (64 elements), 2d-partition-grid, 4 partitions on 2 nodes, block-cyclic distribution, partitions are of type `dask.Future`, Dask
```python
__partitioned__ = {
    'shape': (8, 8),
    'partition_tiling': (2, 2),
    'partitions': {
        (1,1): {
            'start': (4, 4),
            'shape': (4, 4),
            'data': future0,
            'location': ['Alice'], },
        (1,0): {
            'start': (4, 0),
            'shape': (4, 4),
            'data': future1,
            'location': ['1.1.1.2:55667'], },
        (0,1): {
            'start': (0, 4),
            'shape': (4, 4),
            'data': future2,
            'location': ['Alice'], },
        (0,0): {
            'start': (0, 0),
            'shape': (4, 4),
            'data': future3,
            'location': ['1.1.1.2:55667'], },
    },
    'get': lambda x: x.result()
}
```
### 2d-structure (64 elements), 1d-partition-grid, 4 partitions on 2 ranks, row-block-cyclic distribution, partitions are of type `pandas.DataFrame`, MPI
```python
__partitioned__ = {
    'shape': (8, 8),
    'partition_tiling': (4, 1),
    'partitions': {
        (0,0): {
            'start': (0, 0),
            'shape': (2, 8),
            'data': df0,   # this is for rank 0, for rank 1 it'd be None
            'location': [0], },
        (1,0): {
            'start': (2, 0),
            'shape': (2, 8),
            'data': None,  # this is for rank 0, for rank 1 it'd be df1
            'location': [1], },
        (2,0): {
            'start': (4, 0),
            'shape': (2, 8),
            'data': df2,   # this is for rank 0, for rank 1 it'd be None
            'location': [0], },
        (3,0): {
            'start': (6, 0),
            'shape': (2, 8),
            'data': None,  # this is for rank 0, for rank 1 it'd be df3
            'location': [1], },
    },
    'get': lambda x: x,
    'locals': [(0,0), (2,0)]  # this is for rank 0, for rank 1 it'd be [(1,0), (3,0)]
}
```