diff --git a/bolt/spark/array.py b/bolt/spark/array.py index d180abd..fa0c74f 100644 --- a/bolt/spark/array.py +++ b/bolt/spark/array.py @@ -334,6 +334,21 @@ def mean(self, axis=None, keepdims=False): """ return self._stat(axis, name='mean', keepdims=keepdims) + def nanmean(self, axis=None, keepdims=False): + """ + Return the nanmean of the array over the given axis. + + Parameters + ---------- + axis : tuple or int, optional, default=None + Axis to compute statistic over, if None + will compute over all axes + + keepdims : boolean, optional, default=False + Keep axis remaining after operation with size 1. + """ + return self._stat(axis, name='nanmean', keepdims=keepdims) + def var(self, axis=None, keepdims=False): """ Return the variance of the array over the given axis. @@ -347,7 +362,22 @@ def var(self, axis=None, keepdims=False): keepdims : boolean, optional, default=False Keep axis remaining after operation with size 1. """ - return self._stat(axis, name='variance', keepdims=keepdims) + return self._stat(axis, name='var', keepdims=keepdims) + + def nanvar(self, axis=None, keepdims=False): + """ + Return the variance of the array over the given axis ignoring NaNs. + + Parameters + ---------- + axis : tuple or int, optional, default=None + Axis to compute statistic over, if None + will compute over all axes + + keepdims : boolean, optional, default=False + Keep axis remaining after operation with size 1. + """ + return self._stat(axis, name='nanvar', keepdims=keepdims) def std(self, axis=None, keepdims=False): """ @@ -362,7 +392,22 @@ def std(self, axis=None, keepdims=False): keepdims : boolean, optional, default=False Keep axis remaining after operation with size 1. """ - return self._stat(axis, name='stdev', keepdims=keepdims) + return self._stat(axis, name='std', keepdims=keepdims) + + def nanstd(self, axis=None, keepdims=False): + """ + Return the standard deviation of the array over the given axis ignoring NaNs. + + Parameters + ---------- + axis : tuple or int, optional, default=None + Axis to compute statistic over, if None + will compute over all axes + + keepdims : boolean, optional, default=False + Keep axis remaining after operation with size 1. + """ + return self._stat(axis, name='nanstd', keepdims=keepdims) def sum(self, axis=None, keepdims=False): """ @@ -380,6 +425,21 @@ def sum(self, axis=None, keepdims=False): from operator import add return self._stat(axis, func=add, keepdims=keepdims) + def nansum(self, axis=None, keepdims=False): + """ + Return the sum of the array over the given axis ignoring NaNs. + + Parameters + ---------- + axis : tuple or int, optional, default=None + Axis to compute statistic over, if None + will compute over all axes + + keepdims : boolean, optional, default=False + Keep axis remaining after operation with size 1. + """ + return self._stat(axis, name='nansum', keepdims=keepdims) + def max(self, axis=None, keepdims=False): """ Return the maximum of the array over the given axis. @@ -396,6 +456,21 @@ def max(self, axis=None, keepdims=False): from numpy import maximum return self._stat(axis, func=maximum, keepdims=keepdims) + def nanmax(self, axis=None, keepdims=False): + """ + Return the maximum of the array over the given axis ignoring NaNs. + + Parameters + ---------- + axis : tuple or int, optional, default=None + Axis to compute statistic over, if None + will compute over all axes + + keepdims : boolean, optional, default=False + Keep axis remaining after operation with size 1. + """ + return self._stat(axis, name='nanmax', keepdims=keepdims) + def min(self, axis=None, keepdims=False): """ Return the minimum of the array over the given axis. @@ -412,6 +487,21 @@ def min(self, axis=None, keepdims=False): from numpy import minimum return self._stat(axis, func=minimum, keepdims=keepdims) + def nanmin(self, axis=None, keepdims=False): + """ + Return the minimum of the array over the given axis ignoring NaNs. + + Parameters + ---------- + axis : tuple or int, optional, default=None + Axis to compute statistic over, if None + will compute over all axes + + keepdims : boolean, optional, default=False + Keep axis remaining after operation with size 1. + """ + return self._stat(axis, name='nanmin', keepdims=keepdims) + def concatenate(self, arry, axis=0): """ Join this array with another array. diff --git a/bolt/spark/statcounter.py b/bolt/spark/statcounter.py index 162f6eb..f21ecd4 100644 --- a/bolt/spark/statcounter.py +++ b/bolt/spark/statcounter.py @@ -20,29 +20,49 @@ # This code is based on pyspark's statcounter.py and used under the ASF 2.0 license. import copy +import sys from itertools import chain +from numpy import zeros, sqrt, isnan, fmin, fmax, nansum, dstack -from numpy import sqrt +# python 3 compatibility +if sys.version_info > (3,): + long = int +try: + basestring +except NameError: + basestring = str class StatCounter(object): REQUIRED_FOR = { - 'mean': ('mu',), - 'sum': ('mu',), - 'variance': ('mu', 'm2'), - 'stdev': ('mu', 'm2'), - 'all': ('mu', 'm2') + 'mean': ('mu', 'n_n'), + 'sum': ('mu', 'n_n'), + 'var': ('mu', 'm2', 'n_n'), + 'std': ('mu', 'n', 'm2', 'n_n'), + 'nanmean': ('mu_n', 'n_n'), + 'nansum': ('mu_n', 'n_n'), + 'nanvar': ('mu_n', 'm2_n', 'n_n'), + 'nanstd': ('mu_n', 'm2_n', 'n_n'), + 'nanmin': ('minvalue_n', 'n_n'), + 'nanmax': ('maxvalue_n', 'n_n'), + 'all': ('n', 'mu', 'm2', 'n_n', 'mu_n', 'm2_n') } def __init__(self, values=(), stats='all'): - self.n = 0 - self.mu = 0.0 - self.m2 = 0.0 + self.n = long(0) # Running count of our values + self.mu = 0.0 # Running mean of our values + self.m2 = 0.0 # Running variance numerator (sum of (x - mean)^2) + self.n_n = None # Running count of our values without NaNs + self.mu_n = None # Running mean of our values without NaNs + self.m2_n = None # Running variance numerator (sum of (x - mean)^2) without NaNs + self.maxvalue_n = None # Running max value without NaNs + self.minvalue_n = None # Running min value without NaNs - if isinstance(stats, str): + if isinstance(stats, basestring): stats = [stats] - self.required = frozenset(chain().from_iterable([StatCounter.REQUIRED_FOR[stat] for stat in stats])) + + self.required_attrs = frozenset(chain().from_iterable([StatCounter.REQUIRED_FOR[stat] for stat in stats])) for v in values: self.merge(v) @@ -56,33 +76,59 @@ def merge(self, value): if self.__requires('m2'): self.m2 += delta * (value - self.mu) + if self.n_n is None: + # Create the initial counter and set it to zeros + self.n_n = zeros(value.shape) + self.mu_n = zeros(value.shape) + self.m2_n = zeros(value.shape) + + self.n_n += ~isnan(value) + if self.__requires('mu_n'): + delta = (value - self.mu_n).squeeze() + if nansum(isnan(value)): + delta[isnan(value)] = 0 + self.mu_n = nansum(dstack((self.mu_n, (delta / self.n_n))), axis=2) + + if self.__requires('m2_n'): + # Since value can have nans - replace with zeros + temp = value + temp[isnan(temp)] = 0 + self.m2_n += delta * (temp - self.mu_n).squeeze() + if self.__requires('maxvalue_n'): + self.maxvalue_n = fmax(self.maxvalue_n, value) if not self.maxvalue_n is None else value + if self.__requires('minvalue_n'): + self.minvalue_n = fmin(self.minvalue_n, value) if not self.minvalue_n is None else value + return self # checks whether the passed attribute name is required to be updated in order to support the # statistics requested in self.requested def __requires(self, attrname): - return attrname in self.required + return attrname in self.required_attrs # merge another StatCounter into this one, adding up the statistics def combine(self, other): if not isinstance(other, StatCounter): - raise Exception("can only merge StatCounters!") + raise Exception("Can only merge Statcounters!") - # reference equality holds - if other is self: - # avoid overwriting fields in a weird order - self.merge(copy.deepcopy(other)) + if other is self: # reference equality holds + self.merge(copy.deepcopy(other)) # Avoid overwriting fields in a weird order else: - # accumulator should only be updated if it's valid in both statcounters - self.required = set(self.required).intersection(set(other.required)) + # accumulator should only be updated if it's valid in both statcounters: + self.required_attrs = set(self.required_attrs).intersection(set(other.required_attrs)) if self.n == 0: self.n = other.n - for attrname in ('mu', 'm2'): + for attrname in ('mu', 'm2', 'n_n', 'mu_n', 'm2_n', 'maxvalue_n', 'minvalue_n'): if self.__requires(attrname): setattr(self, attrname, getattr(other, attrname)) elif other.n != 0: + if self.n_n is None: + # Create the initial counter and set it to zeros + self.n_n = zeros(other.shape) + self.mu_n = zeros(other.shape) + self.m2_n = zeros(other.shape) if self.__requires('mu'): delta = other.mu - self.mu if other.n * 10 < self.n: @@ -96,16 +142,40 @@ def combine(self, other): self.m2 += other.m2 + (delta * delta * self.n * other.n) / (self.n + other.n) self.n += other.n + + if self.__requires('mu_n'): + delta = other.mu_n - self.mu_n + self.mu_n = (self.mu_n * self.n_n + other.mu_n * other.n_n) / (self.n_n + other.n_n) + + #Set areas with no data to zero + self.mu_n[isnan(self.mu_n)] = 0 + + if self.__requires('m2_n'): + temp = (delta * delta * self.n_n * other.n_n) / (self.n_n + other.n_n) + temp[isnan(temp)] = 0 + self.m2_n += other.m2_n + temp.squeeze() + + if self.__requires('maxvalue_n'): + self.maxvalue_n = fmax(self.maxvalue_n, other.maxvalue_n) + if self.__requires('minvalue_n'): + self.minvalue_n = fmin(self.minvalue_n, other.minvalue_n) + + self.n_n += other.n_n + return self - def count(self): - return self.n + # Clone this StatCounter + def copy(self): + return copy.deepcopy(self) def __isavail(self, attrname): - if not all(attr in self.required for attr in StatCounter.REQUIRED_FOR[attrname]): + if not all(attr in self.required_attrs for attr in StatCounter.REQUIRED_FOR[attrname]): raise ValueError("'%s' stat not available, must be requested at " "StatCounter instantiation" % attrname) + def count(self): + return self.n + @property def mean(self): self.__isavail('mean') @@ -116,15 +186,78 @@ def sum(self): self.__isavail('sum') return self.n * self.mu + # Return the variance of the values. @property - def variance(self): - self.__isavail('variance') + def var(self): + self.__isavail('var') if self.n == 0: return float('nan') else: return self.m2 / self.n @property - def stdev(self): - self.__isavail('stdev') - return sqrt(self.variance) + def std(self): + self.__isavail('std') + return sqrt(self.var) + + def nancount(self): + return self.n_n + + @property + def nanmean(self): + self.__isavail('nanmean') + counts = self.nancount() + mean = self.mu_n.squeeze() + if counts.shape != (): + mean[counts == 0] = float('NaN') + elif counts == 0: + return float('NaN') + return mean + + + @property + def nansum(self): + self.__isavail('nansum') + return self.nancount() * self.mu_n.squeeze() + + @property + def nanmin(self): + self.__isavail('nanmin') + counts = self.nancount() + min = self.minvalue_n + if counts.shape != (): + min[counts == 0] = float('NaN') + elif counts == 0: + return float('NaN') + return min + + @property + def nanmax(self): + self.__isavail('nanmax') + counts = self.nancount() + max = self.maxvalue_n + if counts.shape != (): + max[counts == 0] = float('NaN') + elif counts == 0: + return float('NaN') + return max + + # Return the variance of the values. + @property + def nanvar(self): + self.__isavail('nanvar') + counts = self.nancount() + var = self.m2_n + return var / counts + + # Return the standard deviation of the values. + @property + def nanstd(self): + self.__isavail('nanstd') + return sqrt(self.nanvar) + + def __repr__(self): + return ("(count: %s, mean: %s, std: %s, required: %s, nancount: %s, nanmean: %s, nanstd: %s, nanmin: %s, " + "nanmax: %s)" % + (self.count(), self.mean, self.std, str(tuple(self.required_attrs)), self.nancount(), + self.nanmean(), self.nanstd, self.nanmin, self.nanmax)) diff --git a/test/spark/test_spark_functional.py b/test/spark/test_spark_functional.py index ad778dd..bddef68 100644 --- a/test/spark/test_spark_functional.py +++ b/test/spark/test_spark_functional.py @@ -1,5 +1,5 @@ import pytest -from numpy import arange, repeat +from numpy import arange, repeat, nan, float32, nanmean, nanmax, nanmin, nanvar, nanstd, nansum from bolt import array from bolt.utils import allclose import generic @@ -116,3 +116,117 @@ def test_max(sc): assert allclose(b.max(axis=0), x.max(axis=0)) assert allclose(b.max(axis=(0, 1)), x.max(axis=(0, 1))) assert b.max(axis=(0, 1, 2)) == x.max(axis=(0, 1, 2)) + +def test_nanmean(sc): + x = arange(2 * 3 * 4).reshape(2, 3, 4).astype(float32) + b = array(x, sc, axis=(0,)) + + assert allclose(b.nanmean(), nanmean(x)) + assert allclose(b.nanmean(axis=0), nanmean(x, axis=0)) + assert allclose(b.nanmean(axis=(0, 1)), nanmean(x, axis=(0, 1))) + assert allclose(b.nanmean(axis=(0, 1, 2)), nanmean(x, axis=(0, 1, 2))) + + x[1, 2, 3] = nan + x[0, 0, 2] = nan + x[1, 1, 0] = nan + b = array(x, sc, axis=(0,)) + + assert allclose(b.nanmean(), nanmean(x)) + assert allclose(b.nanmean(axis=0), nanmean(x, axis=0)) + assert allclose(b.nanmean(axis=(0, 1)), nanmean(x, axis=(0, 1))) + assert allclose(b.nanmean(axis=(0, 1, 2)), nanmean(x, axis=(0, 1, 2))) + +def test_nanstd(sc): + x = arange(2 * 3 * 4).reshape(2, 3, 4).astype(float32) + b = array(x, sc, axis=(0,)) + + assert allclose(b.nanstd(), nanstd(x)) + assert allclose(b.nanstd(axis=0), nanstd(x, axis=0)) + assert allclose(b.nanstd(axis=(0, 1)), nanstd(x, axis=(0, 1))) + assert allclose(b.nanstd(axis=(0, 1, 2)), nanstd(x, axis=(0, 1, 2))) + + x[1, 2, 3] = nan + x[0, 0, 2] = nan + x[1, 1, 0] = nan + b = array(x, sc, axis=(0,)) + + assert allclose(b.nanstd(), nanstd(x)) + assert allclose(b.nanstd(axis=0), nanstd(x, axis=0)) + assert allclose(b.nanstd(axis=(0, 1)), nanstd(x, axis=(0, 1))) + assert allclose(b.nanstd(axis=(0, 1, 2)), nanstd(x, axis=(0, 1, 2))) + +def test_nanvar(sc): + x = arange(2 * 3 * 4).reshape(2, 3, 4).astype(float32) + b = array(x, sc, axis=(0,)) + + assert allclose(b.nanvar(), nanvar(x)) + assert allclose(b.nanvar(axis=0), nanvar(x, axis=0)) + assert allclose(b.nanvar(axis=(0, 1)), nanvar(x, axis=(0, 1))) + assert allclose(b.nanvar(axis=(0, 1, 2)), nanvar(x, axis=(0, 1, 2))) + + x[1, 2, 3] = nan + x[0, 0, 2] = nan + x[1, 1, 0] = nan + b = array(x, sc, axis=(0,)) + + assert allclose(b.nanvar(), nanvar(x)) + assert allclose(b.nanvar(axis=0), nanvar(x, axis=0)) + assert allclose(b.nanvar(axis=(0, 1)), nanvar(x, axis=(0, 1))) + assert allclose(b.nanvar(axis=(0, 1, 2)), nanvar(x, axis=(0, 1, 2))) + +def test_nansum(sc): + x = arange(2 * 3 * 4).reshape(2, 3, 4).astype(float32) + b = array(x, sc, axis=(0,)) + + assert allclose(b.nansum(), nansum(x)) + assert allclose(b.nansum(axis=0), nansum(x, axis=0)) + assert allclose(b.nansum(axis=(0, 1)), nansum(x, axis=(0, 1))) + assert allclose(b.nansum(axis=(0, 1, 2)), nansum(x, axis=(0, 1, 2))) + + x[1, 2, 3] = nan + x[0, 0, 2] = nan + x[1, 1, 0] = nan + b = array(x, sc, axis=(0,)) + + assert allclose(b.nansum(), nansum(x)) + assert allclose(b.nansum(axis=0), nansum(x, axis=0)) + assert allclose(b.nansum(axis=(0, 1)), nansum(x, axis=(0, 1))) + assert allclose(b.nansum(axis=(0, 1, 2)), nansum(x, axis=(0, 1, 2))) + +def test_nanmin(sc): + x = arange(2 * 3 * 4).reshape(2, 3, 4).astype(float32) + b = array(x, sc, axis=(0,)) + + assert allclose(b.nanmin(), nanmin(x)) + assert allclose(b.nanmin(axis=0), nanmin(x, axis=0)) + assert allclose(b.nanmin(axis=(0, 1)), nanmin(x, axis=(0, 1))) + assert allclose(b.nanmin(axis=(0, 1, 2)), nanmin(x, axis=(0, 1, 2))) + + x[1, 2, 3] = nan + x[0, 0, 2] = nan + x[1, 1, 0] = nan + b = array(x, sc, axis=(0,)) + + assert allclose(b.nanmin(), nanmin(x)) + assert allclose(b.nanmin(axis=0), nanmin(x, axis=0)) + assert allclose(b.nanmin(axis=(0, 1)), nanmin(x, axis=(0, 1))) + assert allclose(b.nanmin(axis=(0, 1, 2)), nanmin(x, axis=(0, 1, 2))) + +def test_nanmax(sc): + x = arange(2 * 3 * 4).reshape(2, 3, 4).astype(float32) + b = array(x, sc, axis=(0,)) + + assert allclose(b.nanmax(), nanmax(x)) + assert allclose(b.nanmax(axis=0), nanmax(x, axis=0)) + assert allclose(b.nanmax(axis=(0, 1)), nanmax(x, axis=(0, 1))) + assert allclose(b.nanmax(axis=(0, 1, 2)), nanmax(x, axis=(0, 1, 2))) + + x[1, 2, 3] = nan + x[0, 0, 2] = nan + x[1, 1, 0] = nan + b = array(x, sc, axis=(0,)) + + assert allclose(b.nanmax(), nanmax(x)) + assert allclose(b.nanmax(axis=0), nanmax(x, axis=0)) + assert allclose(b.nanmax(axis=(0, 1)), nanmax(x, axis=(0, 1))) + assert allclose(b.nanmax(axis=(0, 1, 2)), nanmax(x, axis=(0, 1, 2))) \ No newline at end of file