diff --git a/bolt/__init__.py b/bolt/__init__.py index ef75cc6..9892b82 100644 --- a/bolt/__init__.py +++ b/bolt/__init__.py @@ -1,3 +1,3 @@ -from bolt.factory import array, ones, zeros, concatenate +from bolt.array.construct import array, ones, zeros, concatenate __version__ = '0.7.1' diff --git a/bolt/local/__init__.py b/bolt/array/__init__.py similarity index 100% rename from bolt/local/__init__.py rename to bolt/array/__init__.py diff --git a/bolt/spark/array.py b/bolt/array/array.py similarity index 96% rename from bolt/spark/array.py rename to bolt/array/array.py index 0bc3cbb..adeef53 100644 --- a/bolt/spark/array.py +++ b/bolt/array/array.py @@ -3,14 +3,13 @@ r_, sort, argsort, array, random, arange, ones, expand_dims, sum from itertools import groupby -from bolt.base import BoltArray -from bolt.spark.stack import StackedArray -from bolt.spark.utils import zip_with_index -from bolt.spark.statcounter import StatCounter +from bolt.array.stack import StackedArray +from bolt.array.utils import zip_with_index +from bolt.array.statcounter import StatCounter from bolt.utils import slicify, listify, tupleize, argpack, inshape, istransposeable, isreshapeable -class BoltArraySpark(BoltArray): +class BoltArray(object): _metadata = { '_shape': None, @@ -24,12 +23,25 @@ def __init__(self, rdd, shape=None, split=None, dtype=None, ordered=True): self._shape = shape self._split = split self._dtype = dtype - self._mode = 'spark' self._ordered = ordered + def __finalize__(self, other): + if isinstance(other, BoltArray): + for name in self._metadata: + other_attr = getattr(other, name, None) + if (other_attr is not self._metadata[name]) \ + and (getattr(self, name, None) is self._metadata[name]): + object.__setattr__(self, name, other_attr) + return self + + def __repr__(self): + s = "BoltArray\n" + s += "shape: %s\n" % str(self.shape) + return s + @property def _constructor(self): - return BoltArraySpark + return BoltArray def __array__(self): return self.toarray() @@ -98,7 +110,7 @@ def _align(self, axis): Returns ------- - BoltArraySpark + BoltArray """ # ensure that the specified axes are valid inshape(self.shape, axis) @@ -149,7 +161,7 @@ def map(self, func, axis=(0,), value_shape=None, dtype=None, with_keys=False): Returns ------- - BoltArraySpark + BoltArray """ axis = tupleize(axis) swapped = self._align(axis) @@ -212,7 +224,7 @@ def filter(self, func, axis=(0,), sort=False): Returns ------- - BoltArraySpark + BoltArray """ axis = tupleize(axis) @@ -259,9 +271,8 @@ def reduce(self, func, axis=(0,), keepdims=False): Returns ------- - BoltArraySpark + BoltArray """ - from bolt.local.array import BoltArrayLocal from numpy import ndarray axis = tupleize(axis) @@ -279,7 +290,7 @@ def reduce(self, func, axis=(0,), keepdims=False): # ndarrays with single values in them should be converted into scalars return arr[0] - return BoltArrayLocal(arr) + return arr def _stat(self, axis=None, func=None, name=None, keepdims=False): """ @@ -295,7 +306,7 @@ def _stat(self, axis=None, func=None, name=None, keepdims=False): will compute over all axes func : function, optional, default=None - Function for reduce, see BoltArraySpark.reduce + Function for reduce, see BoltArray.reduce name : str A named statistic, see StatCounter @@ -311,8 +322,6 @@ def _stat(self, axis=None, func=None, name=None, keepdims=False): return self.reduce(func, axis, keepdims) if name and not func: - from bolt.local.array import BoltArrayLocal - swapped = self._align(axis) def reducer(left, right): @@ -328,7 +337,7 @@ def reducer(left, 
right): for i in axis: arr = expand_dims(arr, axis=i) - return BoltArrayLocal(arr).toscalar() + return arr else: raise ValueError('Must specify either a function or a statistic name.') @@ -432,7 +441,7 @@ def concatenate(self, arry, axis=0): Paramters --------- - arry : ndarray, BoltArrayLocal, or BoltArraySpark + arry : ndarray, or BoltArray Another array to concatenate with axis : int, optional, default=0 @@ -440,13 +449,13 @@ def concatenate(self, arry, axis=0): Returns ------- - BoltArraySpark + BoltArray """ if isinstance(arry, ndarray): - from bolt.spark.construct import ConstructSpark - arry = ConstructSpark.array(arry, self._rdd.context, axis=range(0, self.split)) + from bolt.array.construct import array + arry = array(arry, self._rdd.context, axis=range(0, self.split)) else: - if not isinstance(arry, BoltArraySpark): + if not isinstance(arry, BoltArray): raise ValueError("other must be local array or spark array, got %s" % type(arry)) if not all([x == y if not i == axis else True @@ -708,7 +717,7 @@ def chunk(self, size="150", axis=None, padding=None): axis = tupleize((axis)) padding = tupleize((padding)) - from bolt.spark.chunk import ChunkedArray + from bolt.array.chunk import ChunkedArray chnk = ChunkedArray(rdd=self._rdd, shape=self._shape, split=self._split, dtype=self._dtype) return chnk._chunk(size, axis, padding) @@ -739,7 +748,7 @@ def swap(self, kaxes, vaxes, size="150"): Returns ------- - BoltArraySpark + BoltArray """ kaxes = asarray(tupleize(kaxes), 'int') vaxes = asarray(tupleize(vaxes), 'int') @@ -753,7 +762,7 @@ def swap(self, kaxes, vaxes, size="150"): if len(kaxes) == 0 and len(vaxes) == 0: return self - from bolt.spark.chunk import ChunkedArray + from bolt.array.chunk import ChunkedArray chunks = self.chunk(size) @@ -853,7 +862,7 @@ def reshape(self, *shape): i = self._reshapebasic(new) if i == -1: raise NotImplementedError("Currently no support for reshaping between " - "keys and values for BoltArraySpark") + "keys and values for BoltArray") else: new_key_shape, new_value_shape = new[:i], new[i:] return self.keys.reshape(new_key_shape).values.reshape(new_value_shape) @@ -988,21 +997,14 @@ def keys(self): """ Returns a restricted keys. """ - from bolt.spark.shapes import Keys + from bolt.array.shapes import Keys return Keys(self) @property def values(self): - from bolt.spark.shapes import Values + from bolt.array.shapes import Values return Values(self) - def tolocal(self): - """ - Returns a local bolt array by first collecting as an array. - """ - from bolt.local.array import BoltArrayLocal - return BoltArrayLocal(self.toarray()) - def toarray(self): """ Returns the contents as a local array. diff --git a/bolt/spark/chunk.py b/bolt/array/chunk.py similarity index 98% rename from bolt/spark/chunk.py rename to bolt/array/chunk.py index 25ec393..e8cfae9 100644 --- a/bolt/spark/chunk.py +++ b/bolt/array/chunk.py @@ -5,12 +5,12 @@ from itertools import product from bolt.utils import tuplesort, tupleize, allstack, iterexpand -from bolt.spark.array import BoltArraySpark +from bolt.array.array import BoltArray class ChunkedArray(object): """ - Wraps a BoltArraySpark and provides an interface for chunking + Wraps a BoltArray and provides an interface for chunking into subarrays and performing operations on chunks. Many methods will be restricted until the chunked array is unchunked. 
@@ -196,7 +196,7 @@ def _unchunk(it):
         else:
             newshape = self.shape
 
-        return BoltArraySpark(rdd, shape=newshape, split=self._split,
+        return BoltArray(rdd, shape=newshape, split=self._split,
                               dtype=self.dtype, ordered=ordered)
 
     def keys_to_values(self, axes, size=None):
@@ -416,7 +416,7 @@ def map_generic(self, func):
         """
         Apply a generic array -> object to each subarray
 
-        The resulting object is a BoltArraySpark of dtype object where the
+        The resulting object is a BoltArray of dtype object where the
         blocked dimensions are replaced with indices indication block ID.
         """
         def process_record(val):
@@ -429,7 +429,7 @@ def process_record(val):
         nchunks = self.getnumber(self.plan, self.vshape)
         newshape = tuple([int(s) for s in r_[self.kshape, nchunks]])
         newsplit = len(self.shape)
-        return BoltArraySpark(rdd, shape=newshape, split=newsplit, ordered=self._ordered, dtype="object")
+        return BoltArray(rdd, shape=newshape, split=newsplit, ordered=self._ordered, dtype="object")
 
     def getplan(self, size="150", axes=None, padding=None):
         """
diff --git a/bolt/array/construct.py b/bolt/array/construct.py
new file mode 100644
index 0000000..74cb6bd
--- /dev/null
+++ b/bolt/array/construct.py
@@ -0,0 +1,212 @@
+from numpy import unravel_index, prod, arange, asarray, float64
+
+from itertools import product
+
+from bolt.array.array import BoltArray
+from bolt.array.utils import get_kv_shape, get_kv_axes
+
+
+def array(a, context=None, axis=(0,), dtype=None, npartitions=None):
+    """
+    Create a spark bolt array from a local array.
+
+    Parameters
+    ----------
+    a : array-like
+        An array, any object exposing the array interface, an
+        object whose __array__ method returns an array, or any
+        (nested) sequence.
+
+    context : SparkContext
+        A context running Spark. (see pyspark)
+
+    axis : tuple, optional, default=(0,)
+        Which axes to distribute the array along. The resulting
+        distributed object will use keys to represent these axes,
+        with the remaining axes represented by values.
+
+    dtype : data-type, optional, default=None
+        The desired data-type for the array. If None, will
+        be determined from the data. (see numpy)
+
+    npartitions : int
+        Number of partitions for parallelization.
+
+    Returns
+    -------
+    BoltArray
+    """
+    if dtype is None:
+        arry = asarray(a)
+        dtype = arry.dtype
+    else:
+        arry = asarray(a, dtype)
+    shape = arry.shape
+    ndim = len(shape)
+
+    # handle the axes specification and transpose if necessary
+    axes = _format_axes(axis, arry.shape)
+    key_axes, value_axes = get_kv_axes(arry.shape, axes)
+    permutation = key_axes + value_axes
+    arry = arry.transpose(*permutation)
+    split = len(axes)
+
+    if split < 1:
+        raise ValueError("split axis must be greater than 0, got %g" % split)
+    if split > len(shape):
+        raise ValueError("split axis must not exceed number of axes %g, got %g" % (ndim, split))
+
+    key_shape = shape[:split]
+    val_shape = shape[split:]
+
+    keys = zip(*unravel_index(arange(0, int(prod(key_shape))), key_shape))
+    vals = arry.reshape((prod(key_shape),) + val_shape)
+
+    rdd = context.parallelize(zip(keys, vals), npartitions)
+    return BoltArray(rdd, shape=shape, split=split, dtype=dtype)
+
+def ones(shape, context=None, axis=(0,), dtype=float64, npartitions=None):
+    """
+    Create a spark bolt array of ones.
+
+    Parameters
+    ----------
+    shape : tuple
+        The desired shape of the array.
+
+    context : SparkContext
+        A context running Spark. (see pyspark)
+
+    axis : tuple, optional, default=(0,)
+        Which axes to distribute the array along. The resulting
+        distributed object will use keys to represent these axes,
+        with the remaining axes represented by values.
+
+    dtype : data-type, optional, default=float64
+        The desired data-type for the array. If None, the
+        numpy default of float64 is used. (see numpy)
+
+    npartitions : int
+        Number of partitions for parallelization.
+
+    Returns
+    -------
+    BoltArray
+    """
+    from numpy import ones
+    return _wrap(ones, shape, context, axis, dtype, npartitions)
+
+def zeros(shape, context=None, axis=(0,), dtype=float64, npartitions=None):
+    """
+    Create a spark bolt array of zeros.
+
+    Parameters
+    ----------
+    shape : tuple
+        The desired shape of the array.
+
+    context : SparkContext
+        A context running Spark. (see pyspark)
+
+    axis : tuple, optional, default=(0,)
+        Which axes to distribute the array along. The resulting
+        distributed object will use keys to represent these axes,
+        with the remaining axes represented by values.
+
+    dtype : data-type, optional, default=float64
+        The desired data-type for the array. If None, the
+        numpy default of float64 is used. (see numpy)
+
+    npartitions : int
+        Number of partitions for parallelization.
+
+    Returns
+    -------
+    BoltArray
+    """
+    from numpy import zeros
+    return _wrap(zeros, shape, context, axis, dtype, npartitions)
+
+def concatenate(arrays, axis=0):
+    """
+    Join two arrays together, at least one of which must be a spark bolt array.
+
+    Parameters
+    ----------
+    arrays : tuple
+        A pair of arrays. At least one must be a spark array;
+        the other can be a local numpy array or any
+        array-like.
+
+    axis : int, optional, default=0
+        The axis along which the arrays will be joined.
+
+    Returns
+    -------
+    BoltArray
+    """
+    if not isinstance(arrays, tuple):
+        raise ValueError("data type not understood")
+    if not len(arrays) == 2:
+        raise NotImplementedError("spark concatenation only supports two arrays")
+
+    first, second = arrays
+    if isinstance(first, BoltArray):
+        return first.concatenate(second, axis)
+    elif isinstance(second, BoltArray):
+        first = array(first, second._rdd.context)
+        return first.concatenate(second, axis)
+    else:
+        raise ValueError("at least one array must be a spark bolt array")
+
+def _argcheck(*args, **kwargs):
+    """
+    Check that arguments are consistent with spark array construction.
+ + Conditions are: + (1) a positional argument is a SparkContext + (2) keyword arg 'context' is a SparkContext + (3) an argument is a BoltArray, or + (4) an argument is a nested list containing a BoltArray + """ + try: + from pyspark import SparkContext + except ImportError: + return False + + cond1 = any([isinstance(arg, SparkContext) for arg in args]) + cond2 = isinstance(kwargs.get('context', None), SparkContext) + cond3 = any([isinstance(arg, BoltArray) for arg in args]) + cond4 = any([any([isinstance(sub, BoltArray) for sub in arg]) + if isinstance(arg, (tuple, list)) else False for arg in args]) + return cond1 or cond2 or cond3 or cond4 + +def _format_axes(axes, shape): + """ + Format target axes given an array shape + """ + if isinstance(axes, int): + axes = (axes,) + elif isinstance(axes, list) or hasattr(axes, '__iter__'): + axes = tuple(axes) + if not isinstance(axes, tuple): + raise ValueError("axes argument %s in the constructor not specified correctly" % str(axes)) + if min(axes) < 0 or max(axes) > len(shape) - 1: + raise ValueError("invalid key axes %s given shape %s" % (str(axes), str(shape))) + return axes + +def _wrap(func, shape, context=None, axis=(0,), dtype=None, npartitions=None): + """ + Wrap an existing numpy constructor in a parallelized construction + """ + if isinstance(shape, int): + shape = (shape,) + key_shape, value_shape = get_kv_shape(shape, _format_axes(axis, shape)) + split = len(key_shape) + + # make the keys + rdd = context.parallelize(list(product(*[arange(x) for x in key_shape])), npartitions) + + # use a map to make the arrays in parallel + rdd = rdd.map(lambda x: (x, func(value_shape, dtype, order='C'))) + return BoltArray(rdd, shape=shape, split=split, dtype=dtype) diff --git a/bolt/spark/shapes.py b/bolt/array/shapes.py similarity index 78% rename from bolt/spark/shapes.py rename to bolt/array/shapes.py index 0624a8d..78794d7 100644 --- a/bolt/spark/shapes.py +++ b/bolt/array/shapes.py @@ -1,12 +1,12 @@ from numpy import unravel_index, ravel_multi_index from bolt.utils import argpack, istransposeable, isreshapeable -from bolt.spark.array import BoltArraySpark +from bolt.array.array import BoltArray class Shapes(object): """ - Base Shape class. These classes wrap a BoltArraySpark in their + Base Shape class. These classes wrap a BoltArray in their entirity, but implement the following attributes and methods as if they were only working on the keys or the values, depending which subclass is used. @@ -28,7 +28,7 @@ def transpose(self): class Keys(Shapes): """ This class implements all the base shape attributes and methods - for the keys of a BoltArraySpark. + for the keys of a BoltArray. """ def __init__(self, barray): self._barray = barray @@ -39,8 +39,8 @@ def shape(self): def reshape(self, *shape): """ - Reshape just the keys of a BoltArraySpark, returning a - new BoltArraySpark. + Reshape just the keys of a BoltArray, returning a + new BoltArray. Parameters ---------- @@ -61,12 +61,12 @@ def f(k): newsplit = len(new) newshape = new + self._barray.values.shape - return BoltArraySpark(newrdd, shape=newshape, split=newsplit).__finalize__(self._barray) + return BoltArray(newrdd, shape=newshape, split=newsplit).__finalize__(self._barray) def transpose(self, *axes): """ - Transpose just the keys of a BoltArraySpark, returning a - new BoltArraySpark. + Transpose just the keys of a BoltArray, returning a + new BoltArray. 
Parameters ---------- @@ -86,7 +86,7 @@ def f(k): newrdd = self._barray._rdd.map(lambda kv: (f(kv[0]), kv[1])) newshape = tuple(self.shape[i] for i in new) + self._barray.values.shape - return BoltArraySpark(newrdd, shape=newshape, ordered=False).__finalize__(self._barray) + return BoltArray(newrdd, shape=newshape, ordered=False).__finalize__(self._barray) def __str__(self): s = "BoltArray Keys\n" @@ -99,7 +99,7 @@ def __repr__(self): class Values(Shapes): """ This class implements all the base shape attributes and methods - for the values of a BoltArraySpark. + for the values of a BoltArray. """ def __init__(self, barray): self._barray = barray @@ -110,8 +110,8 @@ def shape(self): def reshape(self, *shape): """ - Reshape just the values of a BoltArraySpark, returning a - new BoltArraySpark. + Reshape just the values of a BoltArray, returning a + new BoltArray. Parameters ---------- @@ -131,12 +131,12 @@ def f(v): newrdd = self._barray._rdd.mapValues(f) newshape = self._barray.keys.shape + new - return BoltArraySpark(newrdd, shape=newshape).__finalize__(self._barray) + return BoltArray(newrdd, shape=newshape).__finalize__(self._barray) def transpose(self, *axes): """ - Transpose just the values of a BoltArraySpark, returning a - new BoltArraySpark. + Transpose just the values of a BoltArray, returning a + new BoltArray. Parameters ---------- @@ -156,7 +156,7 @@ def f(v): newrdd = self._barray._rdd.mapValues(f) newshape = self._barray.keys.shape + tuple(self.shape[i] for i in new) - return BoltArraySpark(newrdd, shape=newshape).__finalize__(self._barray) + return BoltArray(newrdd, shape=newshape).__finalize__(self._barray) def __str__(self): s = "BoltArray Values\n" diff --git a/bolt/spark/stack.py b/bolt/array/stack.py similarity index 93% rename from bolt/spark/stack.py rename to bolt/array/stack.py index cf5ddb8..9858390 100644 --- a/bolt/spark/stack.py +++ b/bolt/array/stack.py @@ -1,9 +1,9 @@ from numpy import asarray, ndarray, concatenate -from bolt.spark.utils import zip_with_index +from bolt.array.utils import zip_with_index class StackedArray(object): """ - Wraps a BoltArraySpark and provides an interface for performing + Wraps a BoltArray and provides an interface for performing stacked operations (operations on aggregated subarrays). Many methods will be restricted or forbidden until the Stacked object is unstacked. Currently, only map() is implemented. The rationale @@ -69,16 +69,16 @@ def tostacks(partition): def unstack(self): """ - Unstack array and return a new BoltArraySpark via flatMap(). + Unstack array and return a new BoltArray via flatMap(). """ - from bolt.spark.array import BoltArraySpark + from bolt.array.array import BoltArray if self._rekeyed: rdd = self._rdd else: rdd = self._rdd.flatMap(lambda kv: zip(kv[0], list(kv[1]))) - return BoltArraySpark(rdd, shape=self.shape, split=self.split) + return BoltArray(rdd, shape=self.shape, split=self.split) def map(self, func): """ @@ -86,7 +86,7 @@ def map(self, func): Parameters ---------- - func : function + func : function This is applied to each value in the intermediate RDD. 
Returns diff --git a/bolt/spark/statcounter.py b/bolt/array/statcounter.py similarity index 100% rename from bolt/spark/statcounter.py rename to bolt/array/statcounter.py diff --git a/bolt/spark/utils.py b/bolt/array/utils.py similarity index 100% rename from bolt/spark/utils.py rename to bolt/array/utils.py diff --git a/bolt/base.py b/bolt/base.py deleted file mode 100644 index 240d926..0000000 --- a/bolt/base.py +++ /dev/null @@ -1,158 +0,0 @@ -class BoltArray(object): - - _mode = None - _metadata = {} - - def __finalize__(self, other): - if isinstance(other, BoltArray): - for name in self._metadata: - other_attr = getattr(other, name, None) - if (other_attr is not self._metadata[name]) \ - and (getattr(self, name, None) is self._metadata[name]): - object.__setattr__(self, name, other_attr) - return self - - @property - def mode(self): - return self._mode - - @property - def shape(self): - """ - Size of each dimension. - """ - raise NotImplementedError - - @property - def size(self): - """ - Total number of elements. - """ - raise NotImplementedError - - @property - def ndim(self): - """ - Number of dimensions. - """ - raise NotImplementedError - - @property - def dtype(self): - """ - Data-type of array. - """ - raise NotImplementedError - - @property - def _constructor(self): - return None - - def sum(self, axis): - """ - Return the sum of the array elements over the given axis. - """ - raise NotImplementedError - - def mean(self, axis): - """ - Return the mean of the array elements over the given axis. - """ - raise NotImplementedError - - def var(self, axis): - """ - Return the variance of the array elements over the given axis. - """ - raise NotImplementedError - - def std(self, axis): - """ - Return the standard deviation of the array elements over the given axis. - """ - raise NotImplementedError - - def min(self, axis): - """ - Return the minimum of the array elements over the given axis or axes. - """ - raise NotImplementedError - - def max(self, axis): - """ - Return the maximum of the array elements over the given axis or axes. - """ - raise NotImplementedError - - def concatenate(self, arry, axis): - raise NotImplementedError - - def transpose(self, axis): - """ - Return an array with the axes transposed. - """ - raise NotImplementedError - - @property - def T(self): - """ - Transpose by reversing the order of the axes. - """ - raise NotImplementedError - - def reshape(self, axis): - """ - Return an array with the same data but a new shape. - """ - raise NotImplementedError - - def squeeze(self, axis): - """ - Remove one or more single-dimensional axes from the array. - """ - raise NotImplementedError - - def swapaxes(self, axis1, axis2): - """ - Return an array with two axes interchanged. - """ - raise NotImplementedError - - def astype(self, dtype, casting): - """ - Cast the array to a specified type. - """ - raise NotImplementedError - - def __getitem__(self, index): - raise NotImplementedError - - def map(self, func, axis): - """ - Apply a function across one or more axes. - """ - raise NotImplementedError - - def reduce(self, func, axis, keepdims): - """ - Reduce an array across one or more axes. - """ - raise NotImplementedError - - def filter(self, func, axis): - """ - Filter an array across one or more axes. 
- """ - raise NotImplementedError - - def first(self): - """ - Return the first element of the array - """ - raise NotImplementedError - - def __repr__(self): - s = "BoltArray\n" - s += "mode: %s\n" % self._mode - s += "shape: %s\n" % str(self.shape) - return s diff --git a/bolt/construct.py b/bolt/construct.py deleted file mode 100644 index af865e3..0000000 --- a/bolt/construct.py +++ /dev/null @@ -1,12 +0,0 @@ -class ConstructBase(object): - - @classmethod - def dispatch(cls, method, *args, **kwargs): - if method in cls.__dict__: - return cls.__dict__[method].__func__(*args, **kwargs) - else: - raise NotImplementedError("Method %s not implemented on %s" % (method, cls.__name__)) - - @staticmethod - def _argcheck(*args, **kwargs): - return False diff --git a/bolt/factory.py b/bolt/factory.py deleted file mode 100644 index e8e781b..0000000 --- a/bolt/factory.py +++ /dev/null @@ -1,83 +0,0 @@ -from bolt.local.construct import ConstructLocal -from bolt.spark.construct import ConstructSpark - -constructors = [ - ('local', ConstructLocal), - ('spark', ConstructSpark) -] - -def wrapped(f): - """ - Decorator to append routed docstrings - """ - import inspect - - def extract(func): - append = "" - args = inspect.getargspec(func) - for i, a in enumerate(args.args): - if i < (len(args) - len(args.defaults)): - append += str(a) + ", " - else: - default = args.defaults[i-len(args.defaults)] - if hasattr(default, "__name__"): - default = default.__name__ - else: - default = str(default) - append += str(a) + "=" + default + ", " - append = append[:-2] + ")" - return append - - doc = f.__doc__ + "\n" - doc += " local -> array(" + extract(getattr(ConstructLocal, f.__name__)) + "\n" - doc += " spark -> array(" + extract(getattr(ConstructSpark, f.__name__)) + "\n" - f.__doc__ = doc - return f - -def lookup(*args, **kwargs): - """ - Use arguments to route constructor. - - Applies a series of checks on arguments to identify constructor, - starting with known keyword arguments, and then applying - constructor-specific checks - """ - if 'mode' in kwargs: - mode = kwargs['mode'] - if mode not in constructors: - raise ValueError('Mode %s not supported' % mode) - del kwargs['mode'] - return constructors[mode] - else: - for mode, constructor in constructors: - if constructor._argcheck(*args, **kwargs): - return constructor - return ConstructLocal - -@wrapped -def array(*args, **kwargs): - """ - Create a bolt array. - """ - return lookup(*args, **kwargs).dispatch('array', *args, **kwargs) - -@wrapped -def ones(*args, **kwargs): - """ - Create a bolt array of ones. - """ - return lookup(*args, **kwargs).dispatch('ones', *args, **kwargs) - -@wrapped -def zeros(*args, **kwargs): - """ - Create a bolt array of zeros. - """ - return lookup(*args, **kwargs).dispatch('zeros', *args, **kwargs) - -@wrapped -def concatenate(*args, **kwargs): - """ - Create a bolt array of ones. 
- """ - return lookup(*args, **kwargs).dispatch('concatenate', *args, **kwargs) \ No newline at end of file diff --git a/bolt/local/array.py b/bolt/local/array.py deleted file mode 100644 index daf0eaf..0000000 --- a/bolt/local/array.py +++ /dev/null @@ -1,255 +0,0 @@ -from __future__ import print_function -from numpy import ndarray, asarray, ufunc, prod -from bolt.base import BoltArray -from bolt.utils import inshape, tupleize -from functools import reduce - - -class BoltArrayLocal(ndarray, BoltArray): - - def __new__(cls, array): - obj = asarray(array).view(cls) - obj._mode = 'local' - return obj - - def __array_finalize__(self, obj): - if obj is None: - return - self._mode = getattr(obj, 'mode', None) - - def __array_wrap__(self, obj): - if obj.shape == (): - return obj[()] - else: - return ndarray.__array_wrap__(self, obj) - - @property - def _constructor(self): - return BoltArrayLocal - - def _align(self, axes, key_shape=None): - """ - Align local bolt array so that axes for iteration are in the keys. - - This operation is applied before most functional operators. - It ensures that the specified axes are valid, and might transpose/reshape - the underlying array so that the functional operators can be applied - over the correct records. - - Parameters - ---------- - axes: tuple[int] - One or more axes that will be iterated over by a functional operator - - Returns - ------- - BoltArrayLocal - """ - - # ensure that the key axes are valid for an ndarray of this shape - inshape(self.shape, axes) - - # compute the set of dimensions/axes that will be used to reshape - remaining = [dim for dim in range(len(self.shape)) if dim not in axes] - key_shape = key_shape if key_shape else [self.shape[axis] for axis in axes] - remaining_shape = [self.shape[axis] for axis in remaining] - linearized_shape = [prod(key_shape)] + remaining_shape - - # compute the transpose permutation - transpose_order = axes + remaining - - # transpose the array so that the keys being mapped over come first, then linearize keys - reshaped = self.transpose(*transpose_order).reshape(*linearized_shape) - - return reshaped - - def filter(self, func, axis=(0,)): - """ - Filter array along an axis. - - Applies a function which should evaluate to boolean, - along a single axis or multiple axes. Array will be - aligned so that the desired set of axes are in the - keys, which may require a transpose/reshape. - - Parameters - ---------- - func : function - Function to apply, should return boolean - - axis : tuple or int, optional, default=(0,) - Axis or multiple axes to filter along. - - Returns - ------- - BoltArrayLocal - """ - axes = sorted(tupleize(axis)) - reshaped = self._align(axes) - - filtered = asarray(list(filter(func, reshaped))) - - return self._constructor(filtered) - - def map(self, func, axis=(0,)): - """ - Apply a function across an axis. - - Array will be aligned so that the desired set of axes - are in the keys, which may require a transpose/reshape. - - Parameters - ---------- - func : function - Function of a single array to apply - - axis : tuple or int, optional, default=(0,) - Axis or multiple axes to apply function along. 
- - Returns - ------- - BoltArrayLocal - """ - axes = sorted(tupleize(axis)) - key_shape = [self.shape[axis] for axis in axes] - reshaped = self._align(axes, key_shape=key_shape) - - mapped = asarray(list(map(func, reshaped))) - elem_shape = mapped[0].shape - - # invert the previous reshape operation, using the shape of the map result - linearized_shape_inv = key_shape + list(elem_shape) - reordered = mapped.reshape(*linearized_shape_inv) - - return self._constructor(reordered) - - def reduce(self, func, axis=0): - """ - Reduce an array along an axis. - - Applies an associative/commutative function of two arguments - cumulatively to all arrays along an axis. Array will be aligned - so that the desired set of axes are in the keys, which may - require a transpose/reshape. - - Parameters - ---------- - func : function - Function of two arrays that returns a single array - - axis : tuple or int, optional, default=(0,) - Axis or multiple axes to reduce along. - - Returns - ------- - BoltArrayLocal - """ - axes = sorted(tupleize(axis)) - - # if the function is a ufunc, it can automatically handle reducing over multiple axes - if isinstance(func, ufunc): - inshape(self.shape, axes) - reduced = func.reduce(self, axis=tuple(axes)) - else: - reshaped = self._align(axes) - reduced = reduce(func, reshaped) - - new_array = self._constructor(reduced) - - # ensure that the shape of the reduced array is valid - expected_shape = [self.shape[i] for i in range(len(self.shape)) if i not in axes] - if new_array.shape != tuple(expected_shape): - raise ValueError("reduce did not yield a BoltArray with valid dimensions") - - return new_array - - def first(self): - """ - Return first element of the array - """ - return self[0] - - def concatenate(self, arry, axis=0): - """ - Join this array with another array. - - Paramters - --------- - arry : ndarray or BoltArrayLocal - Another array to concatenate with - - axis : int, optional, default=0 - The axis along which arrays will be joined. - - Returns - ------- - BoltArrayLocal - """ - if isinstance(arry, ndarray): - from bolt import concatenate - return concatenate((self, arry), axis) - else: - raise ValueError("other must be local array, got %s" % type(arry)) - - def toscalar(self): - """ - Returns the single scalar element contained in an array of shape (), if - the array has that shape. Returns self otherwise. 
- """ - if self.shape == (): - return self.toarray().reshape(1)[0] - else: - return self - - def tospark(self, sc, axis=0): - """ - Converts a BoltArrayLocal into a BoltArraySpark - - Parameters - ---------- - sc : SparkContext - The SparkContext which will be used to create the BoltArraySpark - - axis : tuple or int, optional, default=0 - The axis (or axes) across which this array will be parallelized - - Returns - ------- - BoltArraySpark - """ - from bolt import array - return array(self.toarray(), sc, axis=axis) - - def tordd(self, sc, axis=0): - """ - Converts a BoltArrayLocal into an RDD - - Parameters - ---------- - sc : SparkContext - The SparkContext which will be used to create the BoltArraySpark - - axis : tuple or int, optional, default=0 - The axis (or axes) across which this array will be parallelized - - Returns - ------- - RDD[(tuple, ndarray)] - """ - from bolt import array - return array(self.toarray(), sc, axis=axis).tordd() - - def toarray(self): - """ - Returns the underlying ndarray wrapped by this BoltArrayLocal - """ - return asarray(self) - - def display(self): - """ - Show a pretty-printed representation of this BoltArrayLocal - """ - print(str(self)) - - def __repr__(self): - return BoltArray.__repr__(self) diff --git a/bolt/local/construct.py b/bolt/local/construct.py deleted file mode 100644 index 41ae035..0000000 --- a/bolt/local/construct.py +++ /dev/null @@ -1,105 +0,0 @@ -from numpy import float64, asarray - -from bolt.construct import ConstructBase -from bolt.local.array import BoltArrayLocal - - -class ConstructLocal(ConstructBase): - - @staticmethod - def array(a, dtype=None, order='C'): - """ - Create a local bolt array. - - Parameters - ---------- - a : array-like - An array, any object exposing the array interface, an - object whose __array__ method returns an array, or any - (nested) sequence. - - dtype : data-type, optional, default=None - The desired data-type for the array. If None, will - be determined from the data. (see numpy) - - order : {'C', 'F', 'A'}, optional, default='C' - The order of the array. (see numpy) - - Returns - ------- - BoltArrayLocal - """ - return BoltArrayLocal(asarray(a, dtype, order)) - - @staticmethod - def ones(shape, dtype=float64, order='C'): - """ - Create a local bolt array of ones. - - Parameters - ---------- - shape : tuple - Dimensions of the desired array - - dtype : data-type, optional, default=float64 - The desired data-type for the array. (see numpy) - - order : {'C', 'F', 'A'}, optional, default='C' - The order of the array. (see numpy) - - Returns - ------- - BoltArrayLocal - """ - from numpy import ones - return ConstructLocal._wrap(ones, shape, dtype, order) - - @staticmethod - def zeros(shape, dtype=float64, order='C'): - """ - Create a local bolt array of zeros. - - Parameters - ---------- - shape : tuple - Dimensions of the desired array. - - dtype : data-type, optional, default=float64 - The desired data-type for the array. (see numpy) - - order : {'C', 'F', 'A'}, optional, default='C' - The order of the array. (see numpy) - - Returns - ------- - BoltArrayLocal - """ - from numpy import zeros - return ConstructLocal._wrap(zeros, shape, dtype, order) - - @staticmethod - def _wrap(func, shape, dtype, order): - return BoltArrayLocal(func(shape, dtype, order)) - - @staticmethod - def concatenate(arrays, axis=0): - """ - Join a sequence of arrays together. - - Parameters - ---------- - arrays : tuple - A sequence of array-like e.g. (a1, a2, ...) 
- - axis : int, optional, default=0 - The axis along which the arrays will be joined. - - Returns - ------- - BoltArrayLocal - """ - if not isinstance(arrays, tuple): - raise ValueError("data type not understood") - arrays = tuple([asarray(a) for a in arrays]) - from numpy import concatenate - return BoltArrayLocal(concatenate(arrays, axis)) \ No newline at end of file diff --git a/bolt/spark/__init__.py b/bolt/spark/__init__.py deleted file mode 100644 index e69de29..0000000 diff --git a/bolt/spark/construct.py b/bolt/spark/construct.py deleted file mode 100644 index 6083e6e..0000000 --- a/bolt/spark/construct.py +++ /dev/null @@ -1,222 +0,0 @@ -from numpy import unravel_index, prod, arange, asarray, float64 - -from itertools import product - -from bolt.construct import ConstructBase -from bolt.spark.array import BoltArraySpark -from bolt.spark.utils import get_kv_shape, get_kv_axes - - -class ConstructSpark(ConstructBase): - - @staticmethod - def array(a, context=None, axis=(0,), dtype=None, npartitions=None): - """ - Create a spark bolt array from a local array. - - Parameters - ---------- - a : array-like - An array, any object exposing the array interface, an - object whose __array__ method returns an array, or any - (nested) sequence. - - context : SparkContext - A context running Spark. (see pyspark) - - axis : tuple, optional, default=(0,) - Which axes to distribute the array along. The resulting - distributed object will use keys to represent these axes, - with the remaining axes represented by values. - - dtype : data-type, optional, default=None - The desired data-type for the array. If None, will - be determined from the data. (see numpy) - - npartitions : int - Number of partitions for parallization. - - Returns - ------- - BoltArraySpark - """ - if dtype is None: - arry = asarray(a) - dtype = arry.dtype - else: - arry = asarray(a, dtype) - shape = arry.shape - ndim = len(shape) - - # handle the axes specification and transpose if necessary - axes = ConstructSpark._format_axes(axis, arry.shape) - key_axes, value_axes = get_kv_axes(arry.shape, axes) - permutation = key_axes + value_axes - arry = arry.transpose(*permutation) - split = len(axes) - - if split < 1: - raise ValueError("split axis must be greater than 0, got %g" % split) - if split > len(shape): - raise ValueError("split axis must not exceed number of axes %g, got %g" % (ndim, split)) - - key_shape = shape[:split] - val_shape = shape[split:] - - keys = zip(*unravel_index(arange(0, int(prod(key_shape))), key_shape)) - vals = arry.reshape((prod(key_shape),) + val_shape) - - rdd = context.parallelize(zip(keys, vals), npartitions) - return BoltArraySpark(rdd, shape=shape, split=split, dtype=dtype) - - @staticmethod - def ones(shape, context=None, axis=(0,), dtype=float64, npartitions=None): - """ - Create a spark bolt array of ones. - - Parameters - ---------- - shape : tuple - The desired shape of the array. - - context : SparkContext - A context running Spark. (see pyspark) - - axis : tuple, optional, default=(0,) - Which axes to distribute the array along. The resulting - distributed object will use keys to represent these axes, - with the remaining axes represented by values. - - dtype : data-type, optional, default=float64 - The desired data-type for the array. If None, will - be determined from the data. (see numpy) - - npartitions : int - Number of partitions for parallization. 
- - Returns - ------- - BoltArraySpark - """ - from numpy import ones - return ConstructSpark._wrap(ones, shape, context, axis, dtype, npartitions) - - @staticmethod - def zeros(shape, context=None, axis=(0,), dtype=float64, npartitions=None): - """ - Create a spark bolt array of zeros. - - Parameters - ---------- - shape : tuple - The desired shape of the array. - - context : SparkContext - A context running Spark. (see pyspark) - - axis : tuple, optional, default=(0,) - Which axes to distribute the array along. The resulting - distributed object will use keys to represent these axes, - with the remaining axes represented by values. - - dtype : data-type, optional, default=float64 - The desired data-type for the array. If None, will - be determined from the data. (see numpy) - - npartitions : int - Number of partitions for parallization. - - Returns - ------- - BoltArraySpark - """ - from numpy import zeros - return ConstructSpark._wrap(zeros, shape, context, axis, dtype, npartitions) - - @staticmethod - def concatenate(arrays, axis=0): - """ - Join two bolt arrays together, at least one of which is in spark. - - Parameters - ---------- - arrays : tuple - A pair of arrays. At least one must be a spark array, - the other can be a local bolt array, a local numpy array, - or an array-like. - - axis : int, optional, default=0 - The axis along which the arrays will be joined. - - Returns - ------- - BoltArraySpark - """ - if not isinstance(arrays, tuple): - raise ValueError("data type not understood") - if not len(arrays) == 2: - raise NotImplementedError("spark concatenation only supports two arrays") - - first, second = arrays - if isinstance(first, BoltArraySpark): - return first.concatenate(second, axis) - elif isinstance(second, BoltArraySpark): - first = ConstructSpark.array(first, second._rdd.context) - return first.concatenate(second, axis) - else: - raise ValueError("at least one array must be a spark bolt array") - - @staticmethod - def _argcheck(*args, **kwargs): - """ - Check that arguments are consistent with spark array construction. 
- - Conditions are: - (1) a positional argument is a SparkContext - (2) keyword arg 'context' is a SparkContext - (3) an argument is a BoltArraySpark, or - (4) an argument is a nested list containing a BoltArraySpark - """ - try: - from pyspark import SparkContext - except ImportError: - return False - - cond1 = any([isinstance(arg, SparkContext) for arg in args]) - cond2 = isinstance(kwargs.get('context', None), SparkContext) - cond3 = any([isinstance(arg, BoltArraySpark) for arg in args]) - cond4 = any([any([isinstance(sub, BoltArraySpark) for sub in arg]) - if isinstance(arg, (tuple, list)) else False for arg in args]) - return cond1 or cond2 or cond3 or cond4 - - @staticmethod - def _format_axes(axes, shape): - """ - Format target axes given an array shape - """ - if isinstance(axes, int): - axes = (axes,) - elif isinstance(axes, list) or hasattr(axes, '__iter__'): - axes = tuple(axes) - if not isinstance(axes, tuple): - raise ValueError("axes argument %s in the constructor not specified correctly" % str(axes)) - if min(axes) < 0 or max(axes) > len(shape) - 1: - raise ValueError("invalid key axes %s given shape %s" % (str(axes), str(shape))) - return axes - - @staticmethod - def _wrap(func, shape, context=None, axis=(0,), dtype=None, npartitions=None): - """ - Wrap an existing numpy constructor in a parallelized construction - """ - if isinstance(shape, int): - shape = (shape,) - key_shape, value_shape = get_kv_shape(shape, ConstructSpark._format_axes(axis, shape)) - split = len(key_shape) - - # make the keys - rdd = context.parallelize(list(product(*[arange(x) for x in key_shape])), npartitions) - - # use a map to make the arrays in parallel - rdd = rdd.map(lambda x: (x, func(value_shape, dtype, order='C'))) - return BoltArraySpark(rdd, shape=shape, split=split, dtype=dtype) diff --git a/setup.py b/setup.py index 1cee7a9..c973710 100755 --- a/setup.py +++ b/setup.py @@ -11,9 +11,10 @@ author='The Freeman Lab', author_email='the.freeman.lab@gmail.com', url='https://github.com/bolt-project/bolt', - packages=['bolt', - 'bolt.local', - 'bolt.spark'], + packages=[ + 'bolt', + 'bolt.array' + ], long_description=open('README.rst').read(), install_requires=open('requirements.txt').read().split() ) diff --git a/test/generic.py b/test/generic.py index 62fe588..4ab46c1 100644 --- a/test/generic.py +++ b/test/generic.py @@ -70,12 +70,11 @@ def nonuniform_map(x): res = mapped.toarray() # check that changes in dtype are correctly handled - if b.mode == 'spark': - func3 = lambda x: x.astype('float32') - mapped = b.map(func3, axis=0) - assert mapped.dtype == dtype('float32') - mapped = b.map(func3, axis=0, dtype=dtype('float32')) - assert mapped.dtype == dtype('float32') + func3 = lambda x: x.astype('float32') + mapped = b.map(func3, axis=0) + assert mapped.dtype == dtype('float32') + mapped = b.map(func3, axis=0, dtype=dtype('float32')) + assert mapped.dtype == dtype('float32') def reduce_suite(arr, b): """ @@ -94,24 +93,24 @@ def reduce_suite(arr, b): # Reduce over the first axis with an add reduced = b.reduce(add, axis=0) - res = reduced.toarray() + res = reduced assert res.shape == (arr.shape[1], arr.shape[2]) assert allclose(res, sum(arr, 0)) # Reduce over multiple axes with an add reduced = b.reduce(add, axis=(0, 1)) - res = reduced.toarray() + res = reduced assert res.shape == (arr.shape[2],) assert allclose(res, sum(sum(arr, 0), 1)) # Reduce over various other axes with an add reduced = b.reduce(add, axis=1) - res = reduced.toarray() + res = reduced assert res.shape == (arr.shape[0], 
arr.shape[2]) assert allclose(res, sum(arr, 1)) reduced = b.reduce(add, axis=(1, 2)) - res = reduced.toarray() + res = reduced assert res.shape == (arr.shape[0],) assert allclose(res, sum(sum(arr, 1), 1)) @@ -151,11 +150,10 @@ def filter_half(x): assert res.shape[0] <= b.shape[0] # rerun with sorting - if not b.mode == "local": - filtered = b.filter(lambda x: filter_half(x) < 0.5, sort=True) - res = filtered.toarray() - assert res.shape[1:] == b.shape[1:] - assert res.shape[0] <= b.shape[0] + filtered = b.filter(lambda x: filter_half(x) < 0.5, sort=True) + res = filtered.toarray() + assert res.shape[1:] == b.shape[1:] + assert res.shape[0] <= b.shape[0] # filter out half of the values over the second axis filtered = b.filter(lambda x: filter_half(x) < 0.5, axis=1) diff --git a/test/local/test_local_basic.py b/test/local/test_local_basic.py deleted file mode 100644 index 306426c..0000000 --- a/test/local/test_local_basic.py +++ /dev/null @@ -1,44 +0,0 @@ -from numpy import arange -from bolt import array -from bolt.spark.array import BoltArraySpark -from bolt.utils import allclose - - -def test_construct(): - x = arange(2*3*4).reshape((2, 3, 4)) - b = array(x) - assert b.shape == (2, 3, 4) - - -def test_toarray(): - - x = arange(2*3*4).reshape((2, 3, 4)) - b = array(x) - assert allclose(b.toarray(), x) - -def test_tospark(sc): - - x = arange(2*3*4).reshape((2, 3, 4)) - b = array(x) - s = b.tospark(sc, axis=0) - assert isinstance(s, BoltArraySpark) - assert s.shape == (2, 3, 4) - assert allclose(s.toarray(), x) - -def test_tordd(sc): - - from pyspark import RDD - x = arange(2*3*4).reshape((2, 3, 4)) - - b = array(x) - r = b.tordd(sc, axis=0) - assert isinstance(r, RDD) - assert r.count() == 2 - - r = b.tordd(sc, axis=(0, 1)) - assert isinstance(r, RDD) - assert r.count() == 2*3 - - r = b.tordd(sc, axis=(0, 1, 2)) - assert isinstance(r, RDD) - assert r.count() == 2*3*4 diff --git a/test/local/test_local_construct.py b/test/local/test_local_construct.py deleted file mode 100644 index 737b163..0000000 --- a/test/local/test_local_construct.py +++ /dev/null @@ -1,38 +0,0 @@ -import pytest -from numpy import arange -from bolt import array, ones, zeros, concatenate -from bolt.utils import allclose - - -def test_array(): - - x = arange(2*3*4).reshape((2, 3, 4)) - b = array(x) - assert allclose(x, b.toarray()) - -def test_ones(): - - from numpy import ones as npones - x = npones((2, 3, 4)) - b = ones((2, 3, 4)) - assert allclose(x, b.toarray()) - -def test_zeros(): - from numpy import zeros as npzeros - x = npzeros((2, 3, 4)) - b = zeros((2, 3, 4)) - assert allclose(x, b.toarray()) - -def test_concatenate(): - - from numpy import concatenate as npconcatenate - x = arange(2*3*4).reshape((2, 3, 4)) - b = concatenate((x, x)) - assert allclose(npconcatenate((x, x)), b.toarray()) - -def test_concatenate_errors(): - - x = arange(2*3*4).reshape((2, 3, 4)) - - with pytest.raises(ValueError): - concatenate(x) diff --git a/test/local/test_local_functional.py b/test/local/test_local_functional.py deleted file mode 100644 index 4c1ff82..0000000 --- a/test/local/test_local_functional.py +++ /dev/null @@ -1,52 +0,0 @@ -from numpy import arange, repeat -from bolt import array -from bolt.utils import allclose -import generic - - -def test_map(): - - import random - random.seed(42) - - x = arange(2*3*4).reshape(2, 3, 4) - b = array(x) - - # Test all generic map functionality - generic.map_suite(x, b) - - -def test_reduce(): - - from numpy import asarray - - dims = (10, 10, 10) - area = dims[0] * dims[1] - arr = 
asarray([repeat(x,area).reshape(dims[0], dims[1]) for x in range(dims[2])]) - b = array(arr) - - # Test all generic reduce functionality - generic.reduce_suite(arr, b) - - -def test_filter(): - - x = arange(2*3*4).reshape(2, 3, 4) - b = array(x) - - # Test all generic filter functionality - generic.filter_suite(x, b) - -def test_ufuncs(): - - x = arange(2*3*4*5).reshape(2, 3, 4, 5) - b = array(x) - - # test a common ufunc (sum) over different dimensions - assert allclose(x.sum(axis=0), b.sum(axis=0).toarray()) - assert allclose(x.sum(axis=(0, 1)), b.sum(axis=(0, 1)).toarray()) - assert allclose(x.sum(axis=(0, 1, 2)), b.sum(axis=(0, 1, 2)).toarray()) - assert x.sum() == b.sum() - - - diff --git a/test/spark/test_spark_basic.py b/test/test_spark_basic.py similarity index 97% rename from test/spark/test_spark_basic.py rename to test/test_spark_basic.py index 6593af9..f84653a 100644 --- a/test/spark/test_spark_basic.py +++ b/test/test_spark_basic.py @@ -68,9 +68,10 @@ def test_repartition(sc): def test_concatenate(sc): from numpy import concatenate + from numpy import array as npArray x = arange(2*3).reshape((2, 3)) b = array(x, sc) - c = array(x) + c = npArray(x) assert allclose(b.concatenate(x).toarray(), concatenate((x, x))) assert allclose(b.concatenate(b).toarray(), concatenate((x, x))) assert allclose(b.concatenate(c).toarray(), concatenate((x, x))) @@ -84,7 +85,7 @@ def test_dtype(sc): dtypes = b._rdd.map(lambda x: x[1].dtype).collect() for dt in dtypes: assert dt == dtype(int64) - + a = arange(2.0**8) b = array(a, sc) assert a.dtype == b.dtype @@ -123,9 +124,9 @@ def test_dtype(sc): assert dt == dtype(bool) def test_astype(sc): - + from numpy import ones as npones - + a = npones(2**8, dtype=int64) b = array(a, sc, dtype=int64) c = b.astype(bool) @@ -133,7 +134,7 @@ def test_astype(sc): dtypes = c._rdd.map(lambda x: x[1].dtype).collect() for dt in dtypes: assert dt == dtype(bool) - + b = ones((100, 100), sc, dtype=int64) c = b.astype(bool) assert c.dtype == dtype(bool) @@ -156,4 +157,4 @@ def test_clip(sc): b = array(a, sc) assert allclose(b.clip(0).toarray(), a.clip(0)) assert allclose(b.clip(2).toarray(), a.clip(2)) - assert allclose(b.clip(1, 2).toarray(), a.clip(1, 2)) \ No newline at end of file + assert allclose(b.clip(1, 2).toarray(), a.clip(1, 2)) diff --git a/test/spark/test_spark_chunking.py b/test/test_spark_chunking.py similarity index 91% rename from test/spark/test_spark_chunking.py rename to test/test_spark_chunking.py index 008106f..277fa2e 100644 --- a/test/spark/test_spark_chunking.py +++ b/test/test_spark_chunking.py @@ -1,5 +1,5 @@ import pytest -from numpy import arange, split, array_equal, empty, newaxis +from numpy import arange, split, array_equal, empty, newaxis, asarray from bolt import array, ones from bolt.utils import allclose @@ -97,14 +97,14 @@ def test_padding(sc): c = b.chunk((2, 2), padding=1) chunks = c.tordd().sortByKey().values().collect() - assert allclose(chunks[0], array([[0, 1, 2], [6, 7, 8], [12, 13, 14]])) - assert allclose(chunks[1], array([[1, 2, 3, 4], [7, 8, 9, 10], [13, 14, 15, 16]])) - assert allclose(chunks[4], array([[7, 8, 9, 10], [13, 14, 15, 16], [19, 20, 21, 22], [25, 26, 27, 28]])) - assert allclose(chunks[6], array([[18, 19, 20], [24, 25, 26]])) + assert allclose(chunks[0], asarray([[0, 1, 2], [6, 7, 8], [12, 13, 14]])) + assert allclose(chunks[1], asarray([[1, 2, 3, 4], [7, 8, 9, 10], [13, 14, 15, 16]])) + assert allclose(chunks[4], asarray([[7, 8, 9, 10], [13, 14, 15, 16], [19, 20, 21, 22], [25, 26, 27, 28]])) + assert 
allclose(chunks[6], asarray([[18, 19, 20], [24, 25, 26]])) c = b.chunk((3, 3), padding=(1, 2)) chunks = c.tordd().sortByKey().values().collect() - assert allclose(chunks[0], array([[0, 1, 2, 3, 4], [6, 7, 8, 9, 10], [12, 13, 14, 15, 16], [18, 19, 20, 21, 22]])) + assert allclose(chunks[0], asarray([[0, 1, 2, 3, 4], [6, 7, 8, 9, 10], [12, 13, 14, 15, 16], [18, 19, 20, 21, 22]])) c = b.chunk((2,2), padding=1) assert allclose(x, c.unchunk().toarray()) diff --git a/test/spark/test_spark_construct.py b/test/test_spark_construct.py similarity index 91% rename from test/spark/test_spark_construct.py rename to test/test_spark_construct.py index e8dc779..e371c36 100644 --- a/test/spark/test_spark_construct.py +++ b/test/test_spark_construct.py @@ -2,26 +2,26 @@ from numpy import arange from bolt import array, ones, zeros, concatenate from bolt.utils import allclose -from bolt.spark.array import BoltArraySpark +from bolt.array.array import BoltArray def test_array(sc): x = arange(2*3*4).reshape((2, 3, 4)) b = array(x, sc) - assert isinstance(b, BoltArraySpark) + assert isinstance(b, BoltArray) assert allclose(x, b.toarray()) b = array(x, sc, axis=0) - assert isinstance(b, BoltArraySpark) + assert isinstance(b, BoltArray) assert allclose(x, b.toarray()) b = array(x, sc, axis=(0, 1)) - assert isinstance(b, BoltArraySpark) + assert isinstance(b, BoltArray) assert allclose(x, b.toarray()) b = array(x, sc, axis=(0, 1), npartitions=5) - assert isinstance(b, BoltArraySpark) + assert isinstance(b, BoltArray) assert allclose(x, b.toarray()) assert b.tordd().getNumPartitions() == 5 diff --git a/test/spark/test_spark_functional.py b/test/test_spark_functional.py similarity index 87% rename from test/spark/test_spark_functional.py rename to test/test_spark_functional.py index ad778dd..5bc22ee 100644 --- a/test/spark/test_spark_functional.py +++ b/test/test_spark_functional.py @@ -14,11 +14,11 @@ def test_map(sc): # Test all map functionality when the base array is split after the first axis generic.map_suite(x, b) - # Split the BoltArraySpark after the second axis and rerun the tests + # Split the BoltArray after the second axis and rerun the tests b = array(x, sc, axis=(0, 1)) generic.map_suite(x, b) - # Split the BoltArraySpark after the third axis (scalar values) and rerun the tests + # Split the BoltArray after the third axis (scalar values) and rerun the tests b = array(x, sc, axis=(0, 1, 2)) generic.map_suite(x, b) @@ -39,11 +39,11 @@ def test_reduce(sc): # Test all reduce functionality when the base array is split after the first axis generic.reduce_suite(arr, b) - # Split the BoltArraySpark after the second axis and rerun the tests + # Split the BoltArray after the second axis and rerun the tests b = array(arr, sc, axis=(0, 1)) generic.reduce_suite(arr, b) - # Split the BoltArraySpark after the third axis (scalar values) and rerun the tests + # Split the BoltArray after the third axis (scalar values) and rerun the tests b = array(arr, sc, axis=(0, 1, 2)) generic.reduce_suite(arr, b) @@ -55,11 +55,11 @@ def test_filter(sc): # Test all filter functionality when the base array is split after the first axis generic.filter_suite(x, b) - # Split the BoltArraySpark after the second axis and rerun the tests + # Split the BoltArray after the second axis and rerun the tests b = array(x, sc, axis=(0, 1)) generic.filter_suite(x, b) - # Split the BoltArraySpark after the third axis (scalar values) and rerun the tests + # Split the BoltArray after the third axis (scalar values) and rerun the tests b = array(x, sc, 
axis=(0, 1, 2)) generic.filter_suite(x, b) diff --git a/test/spark/test_spark_getting.py b/test/test_spark_getting.py similarity index 100% rename from test/spark/test_spark_getting.py rename to test/test_spark_getting.py diff --git a/test/spark/test_spark_shaping.py b/test/test_spark_shaping.py similarity index 100% rename from test/spark/test_spark_shaping.py rename to test/test_spark_shaping.py diff --git a/test/spark/test_spark_stacking.py b/test/test_spark_stacking.py similarity index 95% rename from test/spark/test_spark_stacking.py rename to test/test_spark_stacking.py index c57cfe4..bf70b91 100644 --- a/test/spark/test_spark_stacking.py +++ b/test/test_spark_stacking.py @@ -2,7 +2,7 @@ from numpy import arange, repeat, asarray, vstack, tile from bolt import array, ones from bolt.utils import allclose -from bolt.spark.array import BoltArraySpark +from bolt.array.array import BoltArray def _2D_stackable_preamble(sc, num_partitions=2): @@ -10,7 +10,7 @@ def _2D_stackable_preamble(sc, num_partitions=2): dims = (10, 10) arr = vstack([[x]*dims[1] for x in arange(dims[0])]) barr = array(arr, sc, axis=0) - barr = BoltArraySpark(barr._rdd.partitionBy(num_partitions), + barr = BoltArray(barr._rdd.partitionBy(num_partitions), shape=barr.shape, split=barr.split) return barr @@ -20,7 +20,7 @@ def _3D_stackable_preamble(sc, num_partitions=2): area = dims[0] * dims[1] arr = asarray([repeat(x, area).reshape(dims[0], dims[1]) for x in range(dims[2])]) barr = array(arr, sc, axis=0) - barr = BoltArraySpark(barr._rdd.partitionBy(num_partitions), + barr = BoltArray(barr._rdd.partitionBy(num_partitions), shape=barr.shape, split=barr.split) return barr @@ -130,4 +130,4 @@ def test_stacked_conversion(sc): from pyspark import RDD barr = _2D_stackable_preamble(sc) k1 = barr.tordd().keys() - assert isinstance(k1, RDD) \ No newline at end of file + assert isinstance(k1, RDD)
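
Usage sketch for the refactored API (not part of the patch; `sc` is an assumed live SparkContext, everything else follows the `bolt.array` layout and behavior changes introduced above, matching the updated tests). Note that reduce() now returns a plain numpy ndarray instead of a local bolt array:

    import numpy as np
    from bolt import array, ones, concatenate

    x = np.arange(2 * 3 * 4).reshape((2, 3, 4))

    # distribute over the first axis: keys cover axis 0, values cover axes 1-2
    b = array(x, sc, axis=(0,))
    assert b.shape == (2, 3, 4) and b.split == 1

    # map still returns a BoltArray; reduce now hands back an ndarray
    doubled = b.map(lambda v: v * 2, axis=(0,))
    summed = b.reduce(np.add, axis=(0,))
    assert np.allclose(doubled.toarray(), x * 2)
    assert np.allclose(summed, x.sum(axis=0))  # ndarray of shape (3, 4)

    # constructors and concatenate now route through bolt.array.construct
    o = ones((2, 3, 4), sc)
    assert np.allclose(o.toarray(), np.ones((2, 3, 4)))
    c = concatenate((b, x), axis=0)
    assert c.shape == (4, 3, 4)

    # chunked views (bolt/array/chunk.py) still round-trip
    assert np.allclose(b.chunk((1, 2)).unchunk().toarray(), x)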