diff --git a/bolt/factory.py b/bolt/factory.py
index e8e781b..84e6699 100644
--- a/bolt/factory.py
+++ b/bolt/factory.py
@@ -1,9 +1,11 @@
 from bolt.local.construct import ConstructLocal
 from bolt.spark.construct import ConstructSpark
+from bolt.localspark.construct import ConstructLocalSpark
 
 constructors = [
     ('local', ConstructLocal),
-    ('spark', ConstructSpark)
+    ('spark', ConstructSpark),
+    ('localspark', ConstructLocalSpark)
 ]
 
 def wrapped(f):
diff --git a/bolt/localspark/__init__.py b/bolt/localspark/__init__.py
new file mode 100644
index 0000000..6b92ee4
--- /dev/null
+++ b/bolt/localspark/__init__.py
@@ -0,0 +1,116 @@
+from collections import defaultdict
+
+
+class LocalRDD(object):
+    """
+    A simple, inefficient approximation of a locally stored RDD (a lazy list).
+    """
+    def __init__(self, kv_list, fcn_to_apply=None):
+        """
+        Parameters
+        ----------
+        kv_list : list
+            the list of key-value pairs
+        fcn_to_apply : list of functions, optional
+            the functions to apply lazily, each applied elementwise to the list
+        """
+        self.kv_list = kv_list
+        self.lazy_fcn_list = [] if fcn_to_apply is None else fcn_to_apply
+        self.cached_list = None
+
+    def map(self, c_func):
+        return LocalRDD(self.kv_list, fcn_to_apply=self.lazy_fcn_list + [c_func])
+
+    def mapValues(self, c_func):
+        return self.map(lambda kv: (kv[0], c_func(kv[1])))
+
+    def flatMap(self, c_func):
+        # 'flatten' is a sentinel recognized by expand: it concatenates the
+        # iterables produced by the preceding map
+        return self.map(c_func).map('flatten')
+
+    def mapPartitions(self, c_func):
+        """
+        Makes artificial single-element partitions and executes the function on each one
+        """
+        return self.map(lambda kv: c_func([kv]))
+
+    def partitionBy(self, numPartitions, partitionFunc):
+        """
+        The queued operations must be executed (via collect) before the
+        elements can be handed to the partition function
+        """
+        return LocalPartitionedRDD(self.collect(), partitionFunc)
+
+    def values(self):
+        return self.map(lambda kv: kv[1])
+
+    def first(self):
+        return self.collect()[0]
+
+    def collect(self):
+        if self.cached_list is None:
+            self.cached_list = LocalRDD.expand(self)
+        return self.cached_list
+
+    @property
+    def context(self):
+        # making a new one is easier than holding a reference
+        return LocalSparkContext()
+
+    @staticmethod
+    def expand(curRDD):
+        last_list = curRDD.kv_list
+        for c_func in curRDD.lazy_fcn_list:
+            if c_func == 'flatten':
+                out_list = []
+                for i in last_list:
+                    out_list += i
+                last_list = out_list
+            else:
+                last_list = [c_func(x) for x in last_list]
+        return last_list
+
+
+class LocalPartitionedRDD(object):
+
+    def __init__(self, kv_list, partitionFunc, part_rdd=None):
+        """
+        Creates a partitioned RDD which supports mapPartitions and values operations
+
+        Parameters
+        ----------
+        kv_list : list of (k, v) tuples
+            the list of key-value pairs
+        partitionFunc : function
+            applied to the keys to assign them to distinct partitions
+        part_rdd : dict, optional
+            supplies an already partitioned dataset, with partition ids as keys
+            and partition contents as values
+        """
+        if part_rdd is None:
+            self.part_rdd = defaultdict(list)
+            for (k, v) in kv_list:
+                self.part_rdd[partitionFunc(k)].append((k, v))
+        else:
+            self.part_rdd = part_rdd
+        self.partitionFunc = partitionFunc
+        self.kv_list = kv_list
+
+    def mapPartitions(self, c_func):
+        new_part_values = {}
+        new_kv_list = []
+        for partName, partValues in self.part_rdd.items():
+            new_values = c_func(partValues)
+            new_part_values[partName] = new_values
+            new_kv_list += new_values
+        return LocalPartitionedRDD(new_kv_list, self.partitionFunc, part_rdd=new_part_values)
+
+    def values(self):
+        return LocalRDD(self.kv_list, [lambda kv: kv[1]])
+
+
+class LocalSparkContext(object):
+
+    def parallelize(self, in_list, npartitions=0):
+        return LocalRDD(in_list)
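Note for reviewers: a minimal sketch of how the lazy list above behaves, assuming this branch is installed. Everything here is named in the diff above except the sample data; collect() is the only call that forces evaluation:

    from bolt.localspark import LocalSparkContext

    sc = LocalSparkContext()
    rdd = sc.parallelize([(0, [1, 2]), (1, [3, 4])])

    # each transformation only queues a function; nothing runs yet
    doubled = rdd.mapValues(lambda v: [2 * x for x in v])
    flat = doubled.values().flatMap(lambda v: v)   # also queues the 'flatten' sentinel
    print(flat.collect())                          # [2, 4, 6, 8]

    # partitionBy forces a collect, then buckets pairs by partitionFunc(key)
    parts = rdd.partitionBy(2, lambda k: k % 2)
    print(parts.values().collect())                # [[1, 2], [3, 4]]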
diff --git a/bolt/localspark/construct.py b/bolt/localspark/construct.py
new file mode 100644
index 0000000..6c1d388
--- /dev/null
+++ b/bolt/localspark/construct.py
@@ -0,0 +1,46 @@
+from bolt.construct import ConstructBase
+from bolt.spark.construct import ConstructSpark as cs
+import numpy as np
+from bolt.localspark import LocalSparkContext
+
+
+class ConstructLocalSpark(ConstructBase):
+
+    @staticmethod
+    def array(a, context=None, axis=(0,), dtype=None, npartitions=None):
+        return cs.array(a, context=LocalSparkContext(), axis=axis,
+                        dtype=dtype, npartitions=npartitions)
+
+    @staticmethod
+    def ones(shape, context=None, axis=(0,), dtype=np.float64, npartitions=None):
+        return cs.ones(shape, context=LocalSparkContext(), axis=axis,
+                       dtype=dtype, npartitions=npartitions)
+
+    @staticmethod
+    def zeros(shape, context=None, axis=(0,), dtype=np.float64, npartitions=None):
+        return cs.zeros(shape, context=LocalSparkContext(), axis=axis,
+                        dtype=dtype, npartitions=npartitions)
+
+    @staticmethod
+    def concatenate(arrays, axis=0):
+        return cs.concatenate(arrays, axis=axis)
+
+    @staticmethod
+    def _argcheck(*args, **kwargs):
+        """
+        Check that arguments are consistent with localspark array construction.
+
+        Condition is
+        (1) keyword arg 'context' is a string containing 'fake'
+        """
+        context = kwargs.get('context', '')
+        return isinstance(context, str) and 'fake' in context
+
+    @staticmethod
+    def _format_axes(axes, shape):
+        return cs._format_axes(axes, shape)
+
+    @staticmethod
+    def _wrap(func, shape, context=None, axis=(0,), dtype=None, npartitions=None):
+        return cs._wrap(func, shape=shape, context=LocalSparkContext(), axis=axis,
+                        dtype=dtype, npartitions=npartitions)
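Note for reviewers: how the factory is meant to route to the new constructor — a hypothetical smoke test. It assumes bolt re-exports the factory-generated array at the package top level, and it does not verify that ConstructSpark uses only the parallelize portion of the SparkContext API that the fake provides:

    import numpy as np
    import bolt
    from bolt.localspark.construct import ConstructLocalSpark

    # _argcheck is the routing predicate: localspark is chosen only when the
    # caller passes a string context containing the sentinel 'fake'
    assert ConstructLocalSpark._argcheck(context='fake')
    assert not ConstructLocalSpark._argcheck(context=None)

    # should dispatch to ConstructLocalSpark, which swaps in a LocalSparkContext
    a = bolt.array(np.arange(24).reshape(2, 3, 4), context='fake')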