From 7888abef9c7f115dac952a65ce8ead32e314ed38 Mon Sep 17 00:00:00 2001 From: Kyle Date: Sun, 22 May 2016 01:18:21 -0400 Subject: [PATCH 1/5] Include add(), subtract(), multiply(), divide() Include elements of Spark Arrays that allow for element-wise addition, subtraction, multiplication, and division. --- bolt/spark/array.py | 105 ++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 105 insertions(+) diff --git a/bolt/spark/array.py b/bolt/spark/array.py index 8f34eb5..05d98b8 100644 --- a/bolt/spark/array.py +++ b/bolt/spark/array.py @@ -1012,3 +1012,108 @@ def display(self): """ for x in self._rdd.take(10): print(x) + + def add(self, arry): + """ + Add this array element-wise with another array (arry). + + Paramters + --------- + arry : ndarray, BoltArrayLocal, or BoltArraySpark + Another array to add element-wise + + Returns + ------- + BoltArraySpark + """ + if isinstance(arry, ndarray): + from bolt.spark.construct import ConstructSpark + arry = ConstructSpark.array(arry, self._rdd.context, axis=range(0, self.split)) + else: + if not isinstance(arry, BoltArraySpark): + raise ValueError("other must be local array or spark array, got %s" % type(arry)) + + if not all([x == y]): + raise ValueError("all the input array dimensions must match exactly") + + rdd = self._rdd.join(arry._rdd).mapValues(lambda x: x[0] + x[1]) + return self._constructor(rdd).__finalize__(self) + + def subtract(self, arry): + """ + Subtract another array (arry) element-wise from this array. + + Paramters + --------- + arry : ndarray, BoltArrayLocal, or BoltArraySpark + Another array to subtract element-wise + + Returns + ------- + BoltArraySpark + """ + if isinstance(arry, ndarray): + from bolt.spark.construct import ConstructSpark + arry = ConstructSpark.array(arry, self._rdd.context, axis=range(0, self.split)) + else: + if not isinstance(arry, BoltArraySpark): + raise ValueError("other must be local array or spark array, got %s" % type(arry)) + + if not all([x == y]): + raise ValueError("all the input array dimensions must match exactly") + + rdd = self._rdd.join(arry._rdd).mapValues(lambda x: x[0] - x[1]) + return self._constructor(rdd).__finalize__(self) + + def multiply(self, arry): + """ + Multiply this array element-wise with another array (arry). + + Paramters + --------- + arry : ndarray, BoltArrayLocal, or BoltArraySpark + Another array to multiply element-wise + + Returns + ------- + BoltArraySpark + """ + if isinstance(arry, ndarray): + from bolt.spark.construct import ConstructSpark + arry = ConstructSpark.array(arry, self._rdd.context, axis=range(0, self.split)) + else: + if not isinstance(arry, BoltArraySpark): + raise ValueError("other must be local array or spark array, got %s" % type(arry)) + + if not all([x == y]): + raise ValueError("all the input array dimensions must match exactly") + + rdd = self._rdd.join(arry._rdd).mapValues(lambda x: x[0] * x[1]) + return self._constructor(rdd).__finalize__(self) + + def divide(self, arry): + """ + Divide this array by another array (arry) element-wise. 
+
+        Paramters
+        ---------
+        arry : ndarray, BoltArrayLocal, or BoltArraySpark
+            Another array to divide by element-wise
+
+        Returns
+        -------
+        BoltArraySpark
+        """
+        if isinstance(arry, ndarray):
+            from bolt.spark.construct import ConstructSpark
+            arry = ConstructSpark.array(arry, self._rdd.context, axis=range(0, self.split))
+        else:
+            if not isinstance(arry, BoltArraySpark):
+                raise ValueError("other must be local array or spark array, got %s" % type(arry))
+
+        if not all([x == y]):
+            raise ValueError("all the input array dimensions must match exactly")
+
+        from future import division
+        rdd = self._rdd.join(arry._rdd).mapValues(lambda x: x[0] / x[1])
+        return self._constructor(rdd).__finalize__(self)

From bc450ef2622466ef60ef18b43d20c078474c9644 Mon Sep 17 00:00:00 2001
From: Kyle
Date: Sun, 22 May 2016 01:43:10 -0400
Subject: [PATCH 2/5] Update array.py

---
 bolt/spark/array.py | 10 +++++-----
 1 file changed, 5 insertions(+), 5 deletions(-)

diff --git a/bolt/spark/array.py b/bolt/spark/array.py
index 05d98b8..39887b5 100644
--- a/bolt/spark/array.py
+++ b/bolt/spark/array.py
@@ -1,4 +1,4 @@
-from __future__ import print_function
+from __future__ import print_function, divide
 from numpy import asarray, unravel_index, prod, mod, ndarray, ceil, where, \
     r_, sort, argsort, array, random, arange, ones, expand_dims, sum
 from itertools import groupby
@@ -1013,7 +1013,7 @@ def display(self):
         for x in self._rdd.take(10):
             print(x)
 
-    def add(self, arry):
+    def __add__(self, arry):
         """
         Add this array element-wise with another array (arry).
 
@@ -1039,7 +1039,7 @@ def add(self, arry):
         rdd = self._rdd.join(arry._rdd).mapValues(lambda x: x[0] + x[1])
         return self._constructor(rdd).__finalize__(self)
 
-    def subtract(self, arry):
+    def __sub__(self, arry):
         """
         Subtract another array (arry) element-wise from this array.
 
@@ -1065,7 +1065,7 @@ def subtract(self, arry):
         rdd = self._rdd.join(arry._rdd).mapValues(lambda x: x[0] - x[1])
         return self._constructor(rdd).__finalize__(self)
 
-    def multiply(self, arry):
+    def __mul__(self, arry):
         """
         Multiply this array element-wise with another array (arry).
 
@@ -1091,7 +1091,7 @@ def multiply(self, arry):
         rdd = self._rdd.join(arry._rdd).mapValues(lambda x: x[0] * x[1])
         return self._constructor(rdd).__finalize__(self)
 
-    def divide(self, arry):
+    def __truediv__(self, arry):
         """
         Divide this array by another array (arry) element-wise.
 

From 76c0d664805e5a49d1dcf7dfc0aea0d55a1ab47f Mon Sep 17 00:00:00 2001
From: Kyle
Date: Mon, 23 May 2016 12:03:42 -0400
Subject: [PATCH 3/5] Update array.py to have correct self.__div__

Fixed __truediv__ functionality.

Submitted this as a patch to the main branch with the following note:

Added basic element-wise addition (+), subtraction (-), multiplication (*), and division (/) to BoltSparkArrays. The method used is `_rdd().join().mapValues()`. I experimented with `_rdd().cogroup().reduceByKey()`, but `cogroup()` returns iterables and I was not familiar enough with them to spend the time benchmarking the alternatives. There may be a better way to implement these operations, but I went with the approach I knew best.

I didn't include any of the reverse functions (__radd__ etc.) because I wasn't sure how they would interact with the local array's own operators when the local array is listed first (`local + Spark`). There are no issues as long as it is always two arrays being combined and the Spark array is listed first (`Spark + local`).
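
For illustration only (nothing below is part of this patch), a reflected operator could simply delegate to the element-wise addition, since addition is commutative; for `local + Spark` to actually reach it, numpy would also have to defer to the Spark array (for example via an `__array_priority__` attribute, which is not set here), so treat this purely as a sketch:

    def __radd__(self, other):
        # sketch only: addition is commutative, so reuse the element-wise __add__
        # (Python falls back to __radd__ when the left operand cannot handle the +)
        return self.__add__(other)

The same one-line pattern would apply to __rmul__.
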
I also didn't add support for adding/subtracting/multiplying/dividing the whole array by a constant. I wasn't sure whether this is functionality we would want to add, since it can already be done using `map()`. My personal preference is to require the user to use `map()`, because it forces them to remember they are working on distributed arrays. However, this functionality could easily be added to the existing functions with a few if statements that check for scalars and perform the `map()` under the hood.

On a similar note, these functions could also be changed to use explicit method calls (`.add()`, `.subtract()`, `.multiply()`, `.divide()`) rather than the symbolic operators (+, -, *, /).

Finally, if the intention is to add many operations for distributed BoltSparkArrays, similar to the full set of functions on python ndarrays, perhaps a separate class or .py file should be created to contain these operations. I wasn't sure which kinds of functions there is long-term interest in integrating into BoltArrays.

I tried to include safety checks similar to those in `concatenate()`. For division, true division from __future__ is always performed. Basic testing turned up no major issues, but more thorough testing may be useful.
---
 bolt/spark/array.py | 52 ++++++++++++++++++++++++---------------------
 1 file changed, 28 insertions(+), 24 deletions(-)

diff --git a/bolt/spark/array.py b/bolt/spark/array.py
index 39887b5..720402a 100644
--- a/bolt/spark/array.py
+++ b/bolt/spark/array.py
@@ -1,4 +1,4 @@
-from __future__ import print_function, divide
+from __future__ import print_function, division
 from numpy import asarray, unravel_index, prod, mod, ndarray, ceil, where, \
     r_, sort, argsort, array, random, arange, ones, expand_dims, sum
 from itertools import groupby
@@ -1013,15 +1013,13 @@ def display(self):
         for x in self._rdd.take(10):
             print(x)
 
-    def __add__(self, arry): 
+    def __add__(self, arry):
         """
         Add this array element-wise with another array (arry).
-
         Paramters
         ---------
         arry : ndarray, BoltArrayLocal, or BoltArraySpark
             Another array to add element-wise
-
         Returns
         -------
         BoltArraySpark
@@ -1033,21 +1031,19 @@ def __add__(self, arry):
             if not isinstance(arry, BoltArraySpark):
                 raise ValueError("other must be local array or spark array, got %s" % type(arry))
 
-        if not all([x == y]):
-            raise ValueError("all the input array dimensions must match exactly")
+        if not all([x == y for (x,y) in zip(self.shape, arry.shape)]):
+            raise ValueError("All the input array dimensions must match exactly")
 
         rdd = self._rdd.join(arry._rdd).mapValues(lambda x: x[0] + x[1])
         return self._constructor(rdd).__finalize__(self)
-
+
     def __sub__(self, arry):
         """
         Subtract another array (arry) element-wise from this array.
- Paramters --------- arry : ndarray, BoltArrayLocal, or BoltArraySpark Another array to subtract element-wise - Returns ------- BoltArraySpark @@ -1059,21 +1055,19 @@ def __sub__(self, arry): if not isinstance(arry, BoltArraySpark): raise ValueError("other must be local array or spark array, got %s" % type(arry)) - if not all([x == y]): - raise ValueError("all the input array dimensions must match exactly") + if not all([x == y for (x,y) in zip(self.shape, arry.shape)]): + raise ValueError("All the input array dimensions must match exactly") rdd = self._rdd.join(arry._rdd).mapValues(lambda x: x[0] - x[1]) return self._constructor(rdd).__finalize__(self) - + def __mul__(self, arry): """ Multiply this array element-wise with another array (arry). - Paramters --------- arry : ndarray, BoltArrayLocal, or BoltArraySpark Another array to multiply element-wise - Returns ------- BoltArraySpark @@ -1085,21 +1079,19 @@ def __mul__(self, arry): if not isinstance(arry, BoltArraySpark): raise ValueError("other must be local array or spark array, got %s" % type(arry)) - if not all([x == y]): - raise ValueError("all the input array dimensions must match exactly") + if not all([x == y for (x,y) in zip(self.shape, arry.shape)]): + raise ValueError("All the input array dimensions must match exactly") rdd = self._rdd.join(arry._rdd).mapValues(lambda x: x[0] * x[1]) return self._constructor(rdd).__finalize__(self) - - def __truediv__(self, arry): - """ - Divide this array by another array (arry) element-wise. + def __div__(self, arry): + """ + Divide this array by another array (arry) element-wise. Always use true division Paramters --------- arry : ndarray, BoltArrayLocal, or BoltArraySpark Another array to divide by element-wise - Returns ------- BoltArraySpark @@ -1111,9 +1103,21 @@ def __truediv__(self, arry): if not isinstance(arry, BoltArraySpark): raise ValueError("other must be local array or spark array, got %s" % type(arry)) - if not all([x == y]): - raise ValueError("all the input array dimensions must match exactly") + if not all([x == y for (x,y) in zip(self.shape, arry.shape)]): + raise ValueError("All the input array dimensions must match exactly") - from future import division rdd = self._rdd.join(arry._rdd).mapValues(lambda x: x[0] / x[1]) return self._constructor(rdd).__finalize__(self) + + def __truediv__(self, arry): + """ + If true division all ready imported, just use division, as true division is imported there + Paramters + --------- + arry : ndarray, BoltArrayLocal, or BoltArraySpark + Another array to divide by element-wise + Returns + ------- + BoltArraySpark + """ + return self.__div__(arry) From dbe7af03456d574a08e863d6bcfc49894f883905 Mon Sep 17 00:00:00 2001 From: Kyle Rollins Hansen Date: Wed, 25 May 2016 17:59:02 -0400 Subject: [PATCH 4/5] Add elementwise binary operations & binary operators test --- bolt/spark/array.py | 154 ++++++++++------------- test/spark/test_spark_binaryoperators.py | 98 +++++++++++++++ 2 files changed, 162 insertions(+), 90 deletions(-) create mode 100644 test/spark/test_spark_binaryoperators.py diff --git a/bolt/spark/array.py b/bolt/spark/array.py index 720402a..bab7519 100644 --- a/bolt/spark/array.py +++ b/bolt/spark/array.py @@ -1,6 +1,7 @@ -from __future__ import print_function, division +from __future__ import print_function from numpy import asarray, unravel_index, prod, mod, ndarray, ceil, where, \ - r_, sort, argsort, array, random, arange, ones, expand_dims, sum + r_, sort, argsort, array, random, arange, ones, expand_dims, sum, add, \ + 
subtract, multiply, true_divide, isscalar from itertools import groupby from bolt.base import BoltArray @@ -1013,111 +1014,84 @@ def display(self): for x in self._rdd.take(10): print(x) - def __add__(self, arry): + def elementwise_binary(self, other, op): """ - Add this array element-wise with another array (arry). - Paramters - --------- - arry : ndarray, BoltArrayLocal, or BoltArraySpark - Another array to add element-wise - Returns - ------- - BoltArraySpark - """ - if isinstance(arry, ndarray): - from bolt.spark.construct import ConstructSpark - arry = ConstructSpark.array(arry, self._rdd.context, axis=range(0, self.split)) - else: - if not isinstance(arry, BoltArraySpark): - raise ValueError("other must be local array or spark array, got %s" % type(arry)) + Apply an elementwise binary operation between two arrays or an array and scalar. - if not all([x == y for (x,y) in zip(self.shape, arry.shape)]): - raise ValueError("All the input array dimensions must match exactly") - - rdd = self._rdd.join(arry._rdd).mapValues(lambda x: x[0] + x[1]) - return self._constructor(rdd).__finalize__(self) + Self and other must have the same shape, or other must be a scalar. - def __sub__(self, arry): - """ - Subtract another array (arry) element-wise from this array. - Paramters - --------- - arry : ndarray, BoltArrayLocal, or BoltArraySpark - Another array to subtract element-wise + Parameters + ---------- + other : scalar, ndarray, BoltArrayLocal, or BoltArraySpark + Value or array to perform a binary operation with element-wise + + op : function + Binary operator to use for elementwise operations, e.g. add, subtract + Returns ------- BoltArraySpark """ - if isinstance(arry, ndarray): - from bolt.spark.construct import ConstructSpark - arry = ConstructSpark.array(arry, self._rdd.context, axis=range(0, self.split)) - else: - if not isinstance(arry, BoltArraySpark): - raise ValueError("other must be local array or spark array, got %s" % type(arry)) + if not isscalar(other) and not self.shape == other.shape: + raise ValueError("Shapes %s and %s must be equal" % (self.shape, other.shape)) - if not all([x == y for (x,y) in zip(self.shape, arry.shape)]): - raise ValueError("All the input array dimensions must match exactly") + if isinstance(other, ndarray): + from bolt.spark.construct import ConstructSpark + other = ConstructSpark.array(other, self._rdd.context, axis=range(0, self.split)) - rdd = self._rdd.join(arry._rdd).mapValues(lambda x: x[0] - x[1]) - return self._constructor(rdd).__finalize__(self) + if isscalar(other): + return self.map(lambda x: op(x, other)) + + elif isinstance(other, BoltArraySpark): + + def func(record): + (k1, x), (k2, y) = record + return k1, op(x, y) + + rdd = self.tordd().zip(other.tordd()).map(func) + return self._constructor(rdd).__finalize__(self) + + else: + raise ValueError("other must be ndarray, BoltArrayLocal, or BoltArraySpark. Got %s" % type(other)) - def __mul__(self, arry): + def __add__(self, other): """ - Multiply this array element-wise with another array (arry). - Paramters - --------- - arry : ndarray, BoltArrayLocal, or BoltArraySpark - Another array to multiply element-wise - Returns - ------- - BoltArraySpark + Provides element-wise "+" functionality for BoltArraySpark. 
""" - if isinstance(arry, ndarray): - from bolt.spark.construct import ConstructSpark - arry = ConstructSpark.array(arry, self._rdd.context, axis=range(0, self.split)) - else: - if not isinstance(arry, BoltArraySpark): - raise ValueError("other must be local array or spark array, got %s" % type(arry)) - - if not all([x == y for (x,y) in zip(self.shape, arry.shape)]): - raise ValueError("All the input array dimensions must match exactly") - - rdd = self._rdd.join(arry._rdd).mapValues(lambda x: x[0] * x[1]) - return self._constructor(rdd).__finalize__(self) + return self.elementwise_binary(other, add) + + def __radd__(self, other): + """ + Provides element-wise "+" functionality for BoltArraySpark if scalar provided first. + """ + return self.elementwise_binary(other, add) - def __div__(self, arry): + def __sub__(self, other): """ - Divide this array by another array (arry) element-wise. Always use true division - Paramters - --------- - arry : ndarray, BoltArrayLocal, or BoltArraySpark - Another array to divide by element-wise - Returns - ------- - BoltArraySpark + Provides element-wise "-" functionality for BoltArraySpark. """ - if isinstance(arry, ndarray): - from bolt.spark.construct import ConstructSpark - arry = ConstructSpark.array(arry, self._rdd.context, axis=range(0, self.split)) - else: - if not isinstance(arry, BoltArraySpark): - raise ValueError("other must be local array or spark array, got %s" % type(arry)) + return self.elementwise_binary(other, subtract) - if not all([x == y for (x,y) in zip(self.shape, arry.shape)]): - raise ValueError("All the input array dimensions must match exactly") + def __mul__(self, other): + """ + Provides element-wise "*" functionality for BoltArraySpark. + """ + return self.elementwise_binary(other, multiply) - rdd = self._rdd.join(arry._rdd).mapValues(lambda x: x[0] / x[1]) - return self._constructor(rdd).__finalize__(self) + def __rmul__(self, other): + """ + Provides element-wise "*" functionality for BoltArraySpark if scalar provided first. + """ + return self.elementwise_binary(other, multiply) - def __truediv__(self, arry): + def __div__(self, other): """ - If true division all ready imported, just use division, as true division is imported there - Paramters - --------- - arry : ndarray, BoltArrayLocal, or BoltArraySpark - Another array to divide by element-wise - Returns - ------- - BoltArraySpark + Provides element-wise "/" functionality for BoltArraySpark. + """ + return self.elementwise_binary(other, true_divide) + + def __truediv__(self, other): + """ + Provides element-wise "/" functionality for BoltArraySpark for Python 3 or __future__ division. 
""" - return self.__div__(arry) + return self.elementwise_binary(other, true_divide) diff --git a/test/spark/test_spark_binaryoperators.py b/test/spark/test_spark_binaryoperators.py new file mode 100644 index 0000000..b3dc98b --- /dev/null +++ b/test/spark/test_spark_binaryoperators.py @@ -0,0 +1,98 @@ +import pytest +from numpy import arange, true_divide +from bolt import array +from bolt.utils import allclose + +from pyspark import SparkConf, SparkContext + +@pytest.fixture(scope="session", + params=[pytest.mark.spark_yarn('yarn'), + pytest.mark.spark_local('local')]) +def sc(request): + if request.param == 'local': + conf = (SparkConf() + .setMaster("local[2]") + .setAppName("pytest-pyspark-local-testing") + ) + elif request.param == 'yarn': + conf = (SparkConf() + .setMaster("yarn-client") + .setAppName("pytest-pyspark-yarn-testing") + .set("spark.executor.memory", "1g") + .set("spark.executor.instances", 2) + ) + request.addfinalizer(lambda: spark_context.stop()) + + spark_context = SparkContext(conf=conf) + return spark_context + +def test_elementwise_spark(sc): + x = arange(1, 2*3*4+1).reshape(2, 3, 4) + y = 5*x + bx = array(x, sc, axis=(0,)) + by = array(y, sc, axis=(0,)) + + bxyadd = bx+by + bxyaddarr = bxyadd.toarray() + + bxysub = bx-by + bxysubarr = bxysub.toarray() + + bxymul = bx*by + bxymularr = bxymul.toarray() + + bxydiv = bx/by + bxydivarr = bxydiv.toarray() + + assert allclose(bxyaddarr, x+y) + assert allclose(bxysubarr, x-y) + assert allclose(bxymularr, x*y) + assert allclose(bxydivarr, true_divide(x,y)) + +def test_elementwise_mix(sc): + x = arange(1, 2*3*4+1).reshape(2, 3, 4) + y = x*3 + bx = array(x, sc, axis=(0,)) + by = array(y, sc, axis=(0,)) + + bxyadd = bx+by + bxyaddarr = bxyadd.toarray() + bxyaddloc = bx+y + bxyaddlocarr = bxyaddloc.toarray() + + bxysub = bx-by + bxysubarr = bxysub.toarray() + bxysubloc = bx-y + bxysublocarr = bxysubloc.toarray() + + bxymul = bx*by + bxymularr = bxymul.toarray() + bxymulloc = bx*y + bxymullocarr = bxymulloc.toarray() + + bxydiv = bx/by + bxydivarr = bxydiv.toarray() + bxydivloc = bx/y + bxydivlocarr = bxydivloc.toarray() + + assert allclose(bxyadd, bxyaddlocarr) + assert allclose(bxysub, bxysublocarr) + assert allclose(bxymul, bxymullocarr) + assert allclose(bxydiv, bxydivlocarr) + +def test_elementwise_scalar(sc): + x = arange(2*3*4).reshape(2, 3, 4) + bx = array(x, sc, axis=(0,)) + + bxfive = bx + 5 + bxfivearr = bxfive.toarray() + fivebx = 5 + bx + fivebxarr = fivebx.toarray() + + bxten = bx * 10 + bxtenarr = bxten.toarray() + tenbx = 10 * bx + tenbxarr = tenbx.toarray() + + assert allclose(bxfivearr, fivebxarr) + assert allclose(bxtenarr, tenbxarr) From f31578277a1841c937f7314cde57398a7b4e6a4e Mon Sep 17 00:00:00 2001 From: Kyle Rollins Hansen Date: Wed, 25 May 2016 18:03:38 -0400 Subject: [PATCH 5/5] Update binaryoperators.py to remove spark_context local testing elements --- test/spark/test_spark_binaryoperators.py | 44 ++++++++++++------------ 1 file changed, 22 insertions(+), 22 deletions(-) diff --git a/test/spark/test_spark_binaryoperators.py b/test/spark/test_spark_binaryoperators.py index b3dc98b..40b5425 100644 --- a/test/spark/test_spark_binaryoperators.py +++ b/test/spark/test_spark_binaryoperators.py @@ -3,28 +3,28 @@ from bolt import array from bolt.utils import allclose -from pyspark import SparkConf, SparkContext - -@pytest.fixture(scope="session", - params=[pytest.mark.spark_yarn('yarn'), - pytest.mark.spark_local('local')]) -def sc(request): - if request.param == 'local': - conf = (SparkConf() - 
.setMaster("local[2]") - .setAppName("pytest-pyspark-local-testing") - ) - elif request.param == 'yarn': - conf = (SparkConf() - .setMaster("yarn-client") - .setAppName("pytest-pyspark-yarn-testing") - .set("spark.executor.memory", "1g") - .set("spark.executor.instances", 2) - ) - request.addfinalizer(lambda: spark_context.stop()) - - spark_context = SparkContext(conf=conf) - return spark_context +#from pyspark import SparkConf, SparkContext +# +#@pytest.fixture(scope="session", +# params=[pytest.mark.spark_yarn('yarn'), +# pytest.mark.spark_local('local')]) +#def sc(request): +# if request.param == 'local': +# conf = (SparkConf() +# .setMaster("local[2]") +# .setAppName("pytest-pyspark-local-testing") +# ) +# elif request.param == 'yarn': +# conf = (SparkConf() +# .setMaster("yarn-client") +# .setAppName("pytest-pyspark-yarn-testing") +# .set("spark.executor.memory", "1g") +# .set("spark.executor.instances", 2) +# ) +# request.addfinalizer(lambda: spark_context.stop()) +# +# spark_context = SparkContext(conf=conf) +# return spark_context def test_elementwise_spark(sc): x = arange(1, 2*3*4+1).reshape(2, 3, 4)