From 6ac6f604e4bcf29bc0f7a9a614d086cb7e7f6186 Mon Sep 17 00:00:00 2001 From: Steve Varner Date: Tue, 3 May 2016 12:45:10 -0400 Subject: [PATCH 1/7] Adding changes to support nanmean, nanmin, nanmax, etc. --- bolt/spark/array.py | 105 +++++++++++++++++++ bolt/spark/statcounter.py | 209 ++++++++++++++++++++++++++++++++++---- 2 files changed, 292 insertions(+), 22 deletions(-) diff --git a/bolt/spark/array.py b/bolt/spark/array.py index d180abd..44e5165 100644 --- a/bolt/spark/array.py +++ b/bolt/spark/array.py @@ -334,6 +334,21 @@ def mean(self, axis=None, keepdims=False): """ return self._stat(axis, name='mean', keepdims=keepdims) + def nanmean(self, axis=None, keepdims=False): + """ + Return the nanmean of the array over the given axis. + + Parameters + ---------- + axis : tuple or int, optional, default=None + Axis to compute statistic over, if None + will compute over all axes + + keepdims : boolean, optional, default=False + Keep axis remaining after operation with size 1. + """ + return self._stat(axis, name='nanmean', keepdims=keepdims) + def var(self, axis=None, keepdims=False): """ Return the variance of the array over the given axis. @@ -349,6 +364,21 @@ def var(self, axis=None, keepdims=False): """ return self._stat(axis, name='variance', keepdims=keepdims) + def nanvar(self, axis=None, keepdims=False): + """ + Return the variance of the array over the given axis ignoring NaNs. + + Parameters + ---------- + axis : tuple or int, optional, default=None + Axis to compute statistic over, if None + will compute over all axes + + keepdims : boolean, optional, default=False + Keep axis remaining after operation with size 1. + """ + return self._stat(axis, name='nanvariance', keepdims=keepdims) + def std(self, axis=None, keepdims=False): """ Return the standard deviation of the array over the given axis.
@@ -364,6 +394,21 @@ def std(self, axis=None, keepdims=False): """ return self._stat(axis, name='stdev', keepdims=keepdims) + def nanstd(self, axis=None, keepdims=False): + """ + Return the standard deviation of the array over the given axis ignoring NaNs. + + Parameters + ---------- + axis : tuple or int, optional, default=None + Axis to compute statistic over, if None + will compute over all axes + + keepdims : boolean, optional, default=False + Keep axis remaining after operation with size 1. + """ + return self._stat(axis, name='nanstdev', keepdims=keepdims) + def sum(self, axis=None, keepdims=False): """ Return the sum of the array over the given axis. @@ -380,6 +425,21 @@ def sum(self, axis=None, keepdims=False): from operator import add return self._stat(axis, func=add, keepdims=keepdims) + def nansum(self, axis=None, keepdims=False): + """ + Return the sum of the array over the given axis ignoring NaNs. + + Parameters + ---------- + axis : tuple or int, optional, default=None + Axis to compute statistic over, if None + will compute over all axes + + keepdims : boolean, optional, default=False + Keep axis remaining after operation with size 1. + """ + return self._stat(axis, name='nansum', keepdims=keepdims) + def max(self, axis=None, keepdims=False): """ Return the maximum of the array over the given axis. @@ -396,6 +456,21 @@ def max(self, axis=None, keepdims=False): from numpy import maximum return self._stat(axis, func=maximum, keepdims=keepdims) + def nanmax(self, axis=None, keepdims=False): + """ + Return the maximum of the array over the given axis ignoring NaNs. + + Parameters + ---------- + axis : tuple or int, optional, default=None + Axis to compute statistic over, if None + will compute over all axes + + keepdims : boolean, optional, default=False + Keep axis remaining after operation with size 1. 
+ """ + return self._stat(axis, name='nanmax', keepdims=keepdims) + def min(self, axis=None, keepdims=False): """ Return the minimum of the array over the given axis. @@ -412,6 +487,36 @@ def min(self, axis=None, keepdims=False): from numpy import minimum return self._stat(axis, func=minimum, keepdims=keepdims) + def nanmin(self, axis=None, keepdims=False): + """ + Return the minimum of the array over the given axis ignoring NaNs. + + Parameters + ---------- + axis : tuple or int, optional, default=None + Axis to compute statistic over, if None + will compute over all axes + + keepdims : boolean, optional, default=False + Keep axis remaining after operation with size 1. + """ + return self._stat(axis, name='nanmin', keepdims=keepdims) + + def nancount(self, axis=None, keepdims=False): + """ + Return the count of non NaN values. + + Parameters + ---------- + axis : tuple or int, optional, default=None + Axis to compute statistic over, if None + will compute over all axes + + keepdims : boolean, optional, default=False + Keep axis remaining after operation with size 1. + """ + return self._stat(axis, name='nancount', keepdims=keepdims) + def concatenate(self, arry, axis=0): """ Join this array with another array. diff --git a/bolt/spark/statcounter.py b/bolt/spark/statcounter.py index 162f6eb..616e8e4 100644 --- a/bolt/spark/statcounter.py +++ b/bolt/spark/statcounter.py @@ -20,29 +20,52 @@ # This code is based on pyspark's statcounter.py and used under the ASF 2.0 license. 
import copy +import math from itertools import chain -from numpy import sqrt +from numpy import zeros, maximum, minimum, sqrt, isnan, fmin, fmax, shape, reshape, invert, amax, amin, nansum, dstack class StatCounter(object): REQUIRED_FOR = { + 'count': ('n',), 'mean': ('mu',), - 'sum': ('mu',), - 'variance': ('mu', 'm2'), - 'stdev': ('mu', 'm2'), - 'all': ('mu', 'm2') + 'sum': ('mu','n'), + 'min': ('minValue',), + 'max': ('maxValue',), + 'variance': ('mu', 'n', 'm2'), + 'sampleVariance': ('mu', 'n', 'm2'), + 'stdev': ('mu', 'n', 'm2'), + 'sampleStdev': ('mu', 'n', 'm2'), + 'nancount': ('n_n',), + 'nanmean': ('mu_n',), + 'nansum': ('mu_n', 'n_n'), + 'nanmin': ('minValue_n',), + 'nanmax': ('maxValue_n',), + 'nanvariance': ('mu_n', 'n_n', 'm2_n'), + 'nansampleVariance': ('mu_n', 'n_n', 'm2_n'), + 'nanstdev': ('mu_n', 'n_n', 'm2_n'), + 'nansampleStdev': ('mu_n', 'n_n', 'm2_n'), + 'all': ('n', 'mu', 'm2', 'minValue', 'maxValue', 'n_n', 'mu_n', 'm2_n', 'minValue_n', 'maxValue_n') } def __init__(self, values=(), stats='all'): - self.n = 0 - self.mu = 0.0 - self.m2 = 0.0 + self.n = 0L # Running count of our values + self.mu = 0.0 # Running mean of our values + self.m2 = 0.0 # Running variance numerator (sum of (x - mean)^2) + self.maxValue = None + self.minValue = None + self.n_n = None # Running count of our values + self.mu_n = None # Running mean of our values + self.m2_n = None # Running variance numerator (sum of (x - mean)^2) + self.maxValue_n = None + self.minValue_n = None - if isinstance(stats, str): + if isinstance(stats, basestring): stats = [stats] - self.required = frozenset(chain().from_iterable([StatCounter.REQUIRED_FOR[stat] for stat in stats])) + + self.requiredAttrs = frozenset(chain().from_iterable([StatCounter.REQUIRED_FOR[stat] for stat in stats])) for v in values: self.merge(v) @@ -55,30 +78,53 @@ def merge(self, value): self.mu += delta / self.n if self.__requires('m2'): self.m2 += delta * (value - self.mu) + if self.__requires('maxValue'): + 
self.maxValue = maximum(self.maxValue, value) if not self.maxValue is None else value + if self.__requires('minValue'): + self.minValue = minimum(self.minValue, value) if not self.minValue is None else value + + if self.n_n is None: + #Create the initial counter and set it to zeros + self.n_n = zeros(value.shape) + self.mu_n = zeros(value.shape) + self.m2_n = zeros(value.shape) + + self.n_n += ~isnan(value) + if self.__requires('mu_n'): + delta = value - self.mu_n + delta[isnan(value)] = 0 + self.mu_n = nansum(dstack((self.mu_n, (delta / self.n_n))),axis=2) + if self.__requires('m2_n'): + #Since value can have nans - replace with zeros + tmpVal = value; + tmpVal[isnan(tmpVal)] = 0 + self.m2_n += delta * (tmpVal - self.mu_n) + if self.__requires('maxValue_n'): + self.maxValue_n = fmax(self.maxValue_n, value) if not self.maxValue_n is None else value + if self.__requires('minValue_n'): + self.minValue_n = fmin(self.minValue_n, value) if not self.minValue_n is None else value return self # checks whether the passed attribute name is required to be updated in order to support the # statistics requested in self.requested def __requires(self, attrname): - return attrname in self.required + return attrname in self.requiredAttrs # merge another StatCounter into this one, adding up the statistics def combine(self, other): if not isinstance(other, StatCounter): - raise Exception("can only merge StatCounters!") + raise Exception("Can only merge Statcounters!") - # reference equality holds - if other is self: - # avoid overwriting fields in a weird order - self.merge(copy.deepcopy(other)) + if other is self: # reference equality holds + self.merge(copy.deepcopy(other)) # Avoid overwriting fields in a weird order else: - # accumulator should only be updated if it's valid in both statcounters - self.required = set(self.required).intersection(set(other.required)) + # accumulator should only be updated if it's valid in both statcounters: + self.requiredAttrs = 
set(self.requiredAttrs).intersection(set(other.requiredAttrs)) if self.n == 0: self.n = other.n - for attrname in ('mu', 'm2'): + for attrname in ('mu', 'm2', 'maxValue', 'minValue', 'n_n', 'mu_n', 'm2_n', 'maxValue_n', 'minValue_n'): if self.__requires(attrname): setattr(self, attrname, getattr(other, attrname)) @@ -95,16 +141,50 @@ def combine(self, other): if self.__requires('m2'): self.m2 += other.m2 + (delta * delta * self.n * other.n) / (self.n + other.n) + if self.__requires('maxValue'): + self.maxValue = maximum(self.maxValue, other.maxValue) + if self.__requires('minValue'): + self.minValue = minimum(self.minValue, other.minValue) + self.n += other.n + + if self.__requires('mu_n'): + delta = other.mu_n - self.mu_n + self.mu_n = (self.mu_n * self.n_n + other.mu_n * other.n_n) / (self.n_n + other.n_n) + + #Set areas with no data to zero + self.mu_n[isnan(self.mu_n)] = 0 + + + if self.__requires('m2_n'): + tmpAdd = (delta * delta * self.n_n * other.n_n) / (self.n_n + other.n_n) + tmpAdd[isnan(tmpAdd)] = 0 + self.m2_n += other.m2_n + tmpAdd + + if self.__requires('maxValue_n'): + self.maxValue_n = fmax(self.maxValue_n, other.maxValue_n) + if self.__requires('minValue_n'): + self.minValue_n = fmin(self.minValue_n, other.minValue_n) + + self.n_n += other.n_n + + + return self - def count(self): - return self.n + # Clone this StatCounter + def copy(self): + return copy.deepcopy(self) + def __isavail(self, attrname): - if not all(attr in self.required for attr in StatCounter.REQUIRED_FOR[attrname]): + if not all(attr in self.requiredAttrs for attr in StatCounter.REQUIRED_FOR[attrname]): raise ValueError("'%s' stat not available, must be requested at " "StatCounter instantiation" % attrname) + @property + def count(self): + self.__isavail('count') + return self.n @property def mean(self): @@ -116,6 +196,17 @@ def sum(self): self.__isavail('sum') return self.n * self.mu + @property + def min(self): + self.__isavail('min') + return self.minValue + + @property + def 
max(self): + self.__isavail('max') + return self.maxValue + + # Return the variance of the values. @property def variance(self): self.__isavail('variance') @@ -128,3 +219,77 @@ def variance(self): def stdev(self): self.__isavail('stdev') return sqrt(self.variance) + + # + # Return the sample standard deviation of the values, which corrects for bias in estimating the + # variance by dividing by N-1 instead of N. + # + @property + def sampleStdev(self): + self.__isavail('sampleStdev') + return sqrt(self.sampleVariance) + + @property + def nancount(self): + self.__isavail('nancount') + return self.n_n + + @property + def nanmean(self): + self.__isavail('nanmean') + return self.mu_n + + @property + def nansum(self): + self.__isavail('nansum') + return self.n_n * self.mu_n + + @property + def nanmin(self): + self.__isavail('nanmin') + return self.minValue_n + + @property + def nanmax(self): + self.__isavail('nanmax') + return self.maxValue_n + + # Return the variance of the values. + @property + def nanvariance(self): + self.__isavail('nanvariance') + tmpVar = self.m2_n / self.n_n + #set areas with no data to zero + tmpVar[isnan(tmpVar)] = 0 + return tmpVar + + # + # Return the sample variance, which corrects for bias in estimating the variance by dividing + # by N-1 instead of N. + # + @property + def nansampleVariance(self): + self.__isavail('nansampleVariance') + tmpVar = self.m2_n / (self.n_n - 1) + #set areas with no data to zero + tmpVar[isnan(tmpVar)] = 0 + return tmpVar + + # Return the standard deviation of the values. + @property + def nanstdev(self): + self.__isavail('nanstdev') + return sqrt(self.nanvariance) + + # + # Return the sample standard deviation of the values, which corrects for bias in estimating the + # variance by dividing by N-1 instead of N. 
+ # + @property + def nansampleStdev(self): + self.__isavail('nansampleStdev') + return sqrt(self.nansampleVariance) + + def __repr__(self): + return ("(count: %s, mean: %s, stdev: %s, max: %s, min: %s, required: %s, nancount: %s, nanmean: %s, nanstdev: %s, nanmax: %s, nanmin: %s)" % + (self.count(), self.mean(), self.stdev(), self.max(), self.min(), str(tuple(self.requiredAttrs)), self.nancount(), self.nanmean(), self.nanstdev(), self.nanmax(), self.nanmin())) From b9ff466d3bd9aef82f11d455c96d79ee14928e6c Mon Sep 17 00:00:00 2001 From: "Mohar, Boaz" Date: Tue, 17 May 2016 22:20:39 -0400 Subject: [PATCH 2/7] -removed some of the new methods -added checks to prevent errors when sending an array without any nan to a nan method -resolved long python 3 error -added tests --- bolt/spark/statcounter.py | 140 +++++++++------------------- test/spark/test_spark_functional.py | 116 ++++++++++++++++++++++- 2 files changed, 159 insertions(+), 97 deletions(-) diff --git a/bolt/spark/statcounter.py b/bolt/spark/statcounter.py index 616e8e4..980468e 100644 --- a/bolt/spark/statcounter.py +++ b/bolt/spark/statcounter.py @@ -20,45 +20,38 @@ # This code is based on pyspark's statcounter.py and used under the ASF 2.0 license.
import copy -import math +import sys from itertools import chain from numpy import zeros, maximum, minimum, sqrt, isnan, fmin, fmax, shape, reshape, invert, amax, amin, nansum, dstack +if sys.version_info > (3,): + long = int + class StatCounter(object): REQUIRED_FOR = { - 'count': ('n',), - 'mean': ('mu',), - 'sum': ('mu','n'), - 'min': ('minValue',), - 'max': ('maxValue',), - 'variance': ('mu', 'n', 'm2'), - 'sampleVariance': ('mu', 'n', 'm2'), - 'stdev': ('mu', 'n', 'm2'), - 'sampleStdev': ('mu', 'n', 'm2'), - 'nancount': ('n_n',), - 'nanmean': ('mu_n',), + 'mean': ('mu', 'n_n'), + 'sum': ('mu', 'n_n'), + 'variance': ('mu', 'm2', 'n_n'), + 'stdev': ('mu', 'n', 'm2', 'n_n'), + 'nanmean': ('mu_n', 'n_n'), 'nansum': ('mu_n', 'n_n'), - 'nanmin': ('minValue_n',), - 'nanmax': ('maxValue_n',), - 'nanvariance': ('mu_n', 'n_n', 'm2_n'), - 'nansampleVariance': ('mu_n', 'n_n', 'm2_n'), - 'nanstdev': ('mu_n', 'n_n', 'm2_n'), - 'nansampleStdev': ('mu_n', 'n_n', 'm2_n'), - 'all': ('n', 'mu', 'm2', 'minValue', 'maxValue', 'n_n', 'mu_n', 'm2_n', 'minValue_n', 'maxValue_n') + 'nanvariance': ('mu_n', 'm2_n', 'n_n'), + 'nanstdev': ('mu_n', 'm2_n', 'n_n'), + 'nanmin': ('minValue_n', 'n_n'), + 'nanmax': ('maxValue_n', 'n_n'), + 'all': ('n', 'mu', 'm2', 'n_n', 'mu_n', 'm2_n') } def __init__(self, values=(), stats='all'): - self.n = 0L # Running count of our values + self.n = long(0) # Running count of our values self.mu = 0.0 # Running mean of our values self.m2 = 0.0 # Running variance numerator (sum of (x - mean)^2) - self.maxValue = None - self.minValue = None - self.n_n = None # Running count of our values - self.mu_n = None # Running mean of our values - self.m2_n = None # Running variance numerator (sum of (x - mean)^2) + self.n_n = None # Running count of our values without NaNs + self.mu_n = None # Running mean of our values without NaNs + self.m2_n = None # Running variance numerator (sum of (x - mean)^2) without NaNs self.maxValue_n = None self.minValue_n = None @@ -78,27 
+71,25 @@ def merge(self, value): self.mu += delta / self.n if self.__requires('m2'): self.m2 += delta * (value - self.mu) - if self.__requires('maxValue'): - self.maxValue = maximum(self.maxValue, value) if not self.maxValue is None else value - if self.__requires('minValue'): - self.minValue = minimum(self.minValue, value) if not self.minValue is None else value if self.n_n is None: - #Create the initial counter and set it to zeros + # Create the initial counter and set it to zeros self.n_n = zeros(value.shape) self.mu_n = zeros(value.shape) self.m2_n = zeros(value.shape) self.n_n += ~isnan(value) if self.__requires('mu_n'): - delta = value - self.mu_n - delta[isnan(value)] = 0 - self.mu_n = nansum(dstack((self.mu_n, (delta / self.n_n))),axis=2) + delta = (value - self.mu_n).squeeze() + if nansum(isnan(value)): + delta[isnan(value)] = 0 + self.mu_n = nansum(dstack((self.mu_n, (delta / self.n_n))), axis=2) + if self.__requires('m2_n'): - #Since value can have nans - replace with zeros + # Since value can have nans - replace with zeros tmpVal = value; tmpVal[isnan(tmpVal)] = 0 - self.m2_n += delta * (tmpVal - self.mu_n) + self.m2_n += delta * (tmpVal - self.mu_n).squeeze() if self.__requires('maxValue_n'): self.maxValue_n = fmax(self.maxValue_n, value) if not self.maxValue_n is None else value if self.__requires('minValue_n'): @@ -124,11 +115,16 @@ def combine(self, other): if self.n == 0: self.n = other.n - for attrname in ('mu', 'm2', 'maxValue', 'minValue', 'n_n', 'mu_n', 'm2_n', 'maxValue_n', 'minValue_n'): + for attrname in ('mu', 'm2', 'n_n', 'mu_n', 'm2_n', 'maxValue_n', 'minValue_n'): if self.__requires(attrname): setattr(self, attrname, getattr(other, attrname)) elif other.n != 0: + if self.n_n is None: + # Create the initial counter and set it to zeros + self.n_n = zeros(other.shape) + self.mu_n = zeros(other.shape) + self.m2_n = zeros(other.shape) if self.__requires('mu'): delta = other.mu - self.mu if other.n * 10 < self.n: @@ -141,11 +137,6 @@ def 
combine(self, other): if self.__requires('m2'): self.m2 += other.m2 + (delta * delta * self.n * other.n) / (self.n + other.n) - if self.__requires('maxValue'): - self.maxValue = maximum(self.maxValue, other.maxValue) - if self.__requires('minValue'): - self.minValue = minimum(self.minValue, other.minValue) - self.n += other.n if self.__requires('mu_n'): @@ -155,11 +146,10 @@ def combine(self, other): #Set areas with no data to zero self.mu_n[isnan(self.mu_n)] = 0 - if self.__requires('m2_n'): tmpAdd = (delta * delta * self.n_n * other.n_n) / (self.n_n + other.n_n) tmpAdd[isnan(tmpAdd)] = 0 - self.m2_n += other.m2_n + tmpAdd + self.m2_n += other.m2_n + tmpAdd.squeeze() if self.__requires('maxValue_n'): self.maxValue_n = fmax(self.maxValue_n, other.maxValue_n) @@ -168,22 +158,18 @@ def combine(self, other): self.n_n += other.n_n - - return self # Clone this StatCounter def copy(self): return copy.deepcopy(self) - def __isavail(self, attrname): if not all(attr in self.requiredAttrs for attr in StatCounter.REQUIRED_FOR[attrname]): raise ValueError("'%s' stat not available, must be requested at " "StatCounter instantiation" % attrname) - @property + def count(self): - self.__isavail('count') return self.n @property @@ -196,16 +182,6 @@ def sum(self): self.__isavail('sum') return self.n * self.mu - @property - def min(self): - self.__isavail('min') - return self.minValue - - @property - def max(self): - self.__isavail('max') - return self.maxValue - # Return the variance of the values. @property def variance(self): @@ -220,29 +196,18 @@ def stdev(self): self.__isavail('stdev') return sqrt(self.variance) - # - # Return the sample standard deviation of the values, which corrects for bias in estimating the - # variance by dividing by N-1 instead of N. 
- # - @property - def sampleStdev(self): - self.__isavail('sampleStdev') - return sqrt(self.sampleVariance) - - @property def nancount(self): - self.__isavail('nancount') return self.n_n @property def nanmean(self): self.__isavail('nanmean') - return self.mu_n + return self.mu_n.squeeze() @property def nansum(self): self.__isavail('nansum') - return self.n_n * self.mu_n + return self.n_n * self.mu_n.squeeze() @property def nanmin(self): @@ -259,20 +224,10 @@ def nanmax(self): def nanvariance(self): self.__isavail('nanvariance') tmpVar = self.m2_n / self.n_n - #set areas with no data to zero - tmpVar[isnan(tmpVar)] = 0 - return tmpVar - - # - # Return the sample variance, which corrects for bias in estimating the variance by dividing - # by N-1 instead of N. - # - @property - def nansampleVariance(self): - self.__isavail('nansampleVariance') - tmpVar = self.m2_n / (self.n_n - 1) - #set areas with no data to zero - tmpVar[isnan(tmpVar)] = 0 + # set areas with no data to zero + mask = isnan(tmpVar) + if nansum(mask): + tmpVar[isnan(tmpVar)] = 0 return tmpVar # Return the standard deviation of the values. @@ -281,15 +236,8 @@ def nanstdev(self): self.__isavail('nanstdev') return sqrt(self.nanvariance) - # - # Return the sample standard deviation of the values, which corrects for bias in estimating the - # variance by dividing by N-1 instead of N. 
- # - @property - def nansampleStdev(self): - self.__isavail('nansampleStdev') - return sqrt(self.nansampleVariance) - def __repr__(self): - return ("(count: %s, mean: %s, stdev: %s, max: %s, min: %s, required: %s, nancount: %s, nanmean: %s, nanstdev: %s, nanmax: %s, nanmin: %s)" % - (self.count(), self.mean(), self.stdev(), self.max(), self.min(), str(tuple(self.requiredAttrs)), self.nancount(), self.nanmean(), self.nanstdev(), self.nanmax(), self.nanmin())) + return ("(count: %s, mean: %s, stdev: %s, required: %s, nancount: %s, nanmean: %s, nanstdev: %s, nanmin: %s, " + "nanmax: %s)" % + (self.count(), self.mean, self.stdev, str(tuple(self.requiredAttrs)), self.nancount(), + self.nanmean(), self.nanstdev, self.nanmin, self.nanmax)) diff --git a/test/spark/test_spark_functional.py b/test/spark/test_spark_functional.py index ad778dd..bddef68 100644 --- a/test/spark/test_spark_functional.py +++ b/test/spark/test_spark_functional.py @@ -1,5 +1,5 @@ import pytest -from numpy import arange, repeat +from numpy import arange, repeat, nan, float32, nanmean, nanmax, nanmin, nanvar, nanstd, nansum from bolt import array from bolt.utils import allclose import generic @@ -116,3 +116,117 @@ def test_max(sc): assert allclose(b.max(axis=0), x.max(axis=0)) assert allclose(b.max(axis=(0, 1)), x.max(axis=(0, 1))) assert b.max(axis=(0, 1, 2)) == x.max(axis=(0, 1, 2)) + +def test_nanmean(sc): + x = arange(2 * 3 * 4).reshape(2, 3, 4).astype(float32) + b = array(x, sc, axis=(0,)) + + assert allclose(b.nanmean(), nanmean(x)) + assert allclose(b.nanmean(axis=0), nanmean(x, axis=0)) + assert allclose(b.nanmean(axis=(0, 1)), nanmean(x, axis=(0, 1))) + assert allclose(b.nanmean(axis=(0, 1, 2)), nanmean(x, axis=(0, 1, 2))) + + x[1, 2, 3] = nan + x[0, 0, 2] = nan + x[1, 1, 0] = nan + b = array(x, sc, axis=(0,)) + + assert allclose(b.nanmean(), nanmean(x)) + assert allclose(b.nanmean(axis=0), nanmean(x, axis=0)) + assert allclose(b.nanmean(axis=(0, 1)), nanmean(x, axis=(0, 1))) + assert 
allclose(b.nanmean(axis=(0, 1, 2)), nanmean(x, axis=(0, 1, 2))) + +def test_nanstd(sc): + x = arange(2 * 3 * 4).reshape(2, 3, 4).astype(float32) + b = array(x, sc, axis=(0,)) + + assert allclose(b.nanstd(), nanstd(x)) + assert allclose(b.nanstd(axis=0), nanstd(x, axis=0)) + assert allclose(b.nanstd(axis=(0, 1)), nanstd(x, axis=(0, 1))) + assert allclose(b.nanstd(axis=(0, 1, 2)), nanstd(x, axis=(0, 1, 2))) + + x[1, 2, 3] = nan + x[0, 0, 2] = nan + x[1, 1, 0] = nan + b = array(x, sc, axis=(0,)) + + assert allclose(b.nanstd(), nanstd(x)) + assert allclose(b.nanstd(axis=0), nanstd(x, axis=0)) + assert allclose(b.nanstd(axis=(0, 1)), nanstd(x, axis=(0, 1))) + assert allclose(b.nanstd(axis=(0, 1, 2)), nanstd(x, axis=(0, 1, 2))) + +def test_nanvar(sc): + x = arange(2 * 3 * 4).reshape(2, 3, 4).astype(float32) + b = array(x, sc, axis=(0,)) + + assert allclose(b.nanvar(), nanvar(x)) + assert allclose(b.nanvar(axis=0), nanvar(x, axis=0)) + assert allclose(b.nanvar(axis=(0, 1)), nanvar(x, axis=(0, 1))) + assert allclose(b.nanvar(axis=(0, 1, 2)), nanvar(x, axis=(0, 1, 2))) + + x[1, 2, 3] = nan + x[0, 0, 2] = nan + x[1, 1, 0] = nan + b = array(x, sc, axis=(0,)) + + assert allclose(b.nanvar(), nanvar(x)) + assert allclose(b.nanvar(axis=0), nanvar(x, axis=0)) + assert allclose(b.nanvar(axis=(0, 1)), nanvar(x, axis=(0, 1))) + assert allclose(b.nanvar(axis=(0, 1, 2)), nanvar(x, axis=(0, 1, 2))) + +def test_nansum(sc): + x = arange(2 * 3 * 4).reshape(2, 3, 4).astype(float32) + b = array(x, sc, axis=(0,)) + + assert allclose(b.nansum(), nansum(x)) + assert allclose(b.nansum(axis=0), nansum(x, axis=0)) + assert allclose(b.nansum(axis=(0, 1)), nansum(x, axis=(0, 1))) + assert allclose(b.nansum(axis=(0, 1, 2)), nansum(x, axis=(0, 1, 2))) + + x[1, 2, 3] = nan + x[0, 0, 2] = nan + x[1, 1, 0] = nan + b = array(x, sc, axis=(0,)) + + assert allclose(b.nansum(), nansum(x)) + assert allclose(b.nansum(axis=0), nansum(x, axis=0)) + assert allclose(b.nansum(axis=(0, 1)), nansum(x, axis=(0, 1))) + 
assert allclose(b.nansum(axis=(0, 1, 2)), nansum(x, axis=(0, 1, 2))) + +def test_nanmin(sc): + x = arange(2 * 3 * 4).reshape(2, 3, 4).astype(float32) + b = array(x, sc, axis=(0,)) + + assert allclose(b.nanmin(), nanmin(x)) + assert allclose(b.nanmin(axis=0), nanmin(x, axis=0)) + assert allclose(b.nanmin(axis=(0, 1)), nanmin(x, axis=(0, 1))) + assert allclose(b.nanmin(axis=(0, 1, 2)), nanmin(x, axis=(0, 1, 2))) + + x[1, 2, 3] = nan + x[0, 0, 2] = nan + x[1, 1, 0] = nan + b = array(x, sc, axis=(0,)) + + assert allclose(b.nanmin(), nanmin(x)) + assert allclose(b.nanmin(axis=0), nanmin(x, axis=0)) + assert allclose(b.nanmin(axis=(0, 1)), nanmin(x, axis=(0, 1))) + assert allclose(b.nanmin(axis=(0, 1, 2)), nanmin(x, axis=(0, 1, 2))) + +def test_nanmax(sc): + x = arange(2 * 3 * 4).reshape(2, 3, 4).astype(float32) + b = array(x, sc, axis=(0,)) + + assert allclose(b.nanmax(), nanmax(x)) + assert allclose(b.nanmax(axis=0), nanmax(x, axis=0)) + assert allclose(b.nanmax(axis=(0, 1)), nanmax(x, axis=(0, 1))) + assert allclose(b.nanmax(axis=(0, 1, 2)), nanmax(x, axis=(0, 1, 2))) + + x[1, 2, 3] = nan + x[0, 0, 2] = nan + x[1, 1, 0] = nan + b = array(x, sc, axis=(0,)) + + assert allclose(b.nanmax(), nanmax(x)) + assert allclose(b.nanmax(axis=0), nanmax(x, axis=0)) + assert allclose(b.nanmax(axis=(0, 1)), nanmax(x, axis=(0, 1))) + assert allclose(b.nanmax(axis=(0, 1, 2)), nanmax(x, axis=(0, 1, 2))) \ No newline at end of file From 2f2a621be8e365d2ce09082f47811148358d899e Mon Sep 17 00:00:00 2001 From: "Mohar, Boaz" Date: Tue, 17 May 2016 22:51:19 -0400 Subject: [PATCH 3/7] fixed basestring python 3 error, cleanup --- bolt/spark/statcounter.py | 9 ++++++--- 1 file changed, 6 insertions(+), 3 deletions(-) diff --git a/bolt/spark/statcounter.py b/bolt/spark/statcounter.py index 980468e..0f400f0 100644 --- a/bolt/spark/statcounter.py +++ b/bolt/spark/statcounter.py @@ -22,12 +22,15 @@ import copy import sys from itertools import chain +from numpy import zeros, sqrt, isnan, fmin, fmax, 
nansum, dstack -from numpy import zeros, maximum, minimum, sqrt, isnan, fmin, fmax, shape, reshape, invert, amax, amin, nansum, dstack - +# python 3 compatibility if sys.version_info > (3,): long = int - +try: + basestring +except NameError: + basestring = str class StatCounter(object): From f6fd04a77ebbba63314fabd653e27f7ee2082052 Mon Sep 17 00:00:00 2001 From: "Mohar, Boaz" Date: Wed, 18 May 2016 09:49:15 -0400 Subject: [PATCH 4/7] renaming to match numpy, bug fix for propagating NaNs --- bolt/spark/array.py | 8 +++--- bolt/spark/statcounter.py | 55 ++++++++++++++++++++++----------------- 2 files changed, 35 insertions(+), 28 deletions(-) diff --git a/bolt/spark/array.py b/bolt/spark/array.py index 44e5165..4e88053 100644 --- a/bolt/spark/array.py +++ b/bolt/spark/array.py @@ -362,7 +362,7 @@ def var(self, axis=None, keepdims=False): keepdims : boolean, optional, default=False Keep axis remaining after operation with size 1. """ - return self._stat(axis, name='variance', keepdims=keepdims) + return self._stat(axis, name='var', keepdims=keepdims) def nanvar(self, axis=None, keepdims=False): """ @@ -377,7 +377,7 @@ def nanvar(self, axis=None, keepdims=False): keepdims : boolean, optional, default=False Keep axis remaining after operation with size 1. """ - return self._stat(axis, name='nanvariance', keepdims=keepdims) + return self._stat(axis, name='nanvar', keepdims=keepdims) def std(self, axis=None, keepdims=False): """ @@ -392,7 +392,7 @@ def std(self, axis=None, keepdims=False): keepdims : boolean, optional, default=False Keep axis remaining after operation with size 1. """ - return self._stat(axis, name='stdev', keepdims=keepdims) + return self._stat(axis, name='std', keepdims=keepdims) def nanstd(self, axis=None, keepdims=False): """ @@ -407,7 +407,7 @@ def nanstd(self, axis=None, keepdims=False): keepdims : boolean, optional, default=False Keep axis remaining after operation with size 1. 
""" - return self._stat(axis, name='nanstdev', keepdims=keepdims) + return self._stat(axis, name='nanstd', keepdims=keepdims) def sum(self, axis=None, keepdims=False): """ diff --git a/bolt/spark/statcounter.py b/bolt/spark/statcounter.py index 0f400f0..8382851 100644 --- a/bolt/spark/statcounter.py +++ b/bolt/spark/statcounter.py @@ -37,12 +37,12 @@ class StatCounter(object): REQUIRED_FOR = { 'mean': ('mu', 'n_n'), 'sum': ('mu', 'n_n'), - 'variance': ('mu', 'm2', 'n_n'), - 'stdev': ('mu', 'n', 'm2', 'n_n'), + 'var': ('mu', 'm2', 'n_n'), + 'std': ('mu', 'n', 'm2', 'n_n'), 'nanmean': ('mu_n', 'n_n'), 'nansum': ('mu_n', 'n_n'), - 'nanvariance': ('mu_n', 'm2_n', 'n_n'), - 'nanstdev': ('mu_n', 'm2_n', 'n_n'), + 'nanvar': ('mu_n', 'm2_n', 'n_n'), + 'nanstd': ('mu_n', 'm2_n', 'n_n'), 'nanmin': ('minValue_n', 'n_n'), 'nanmax': ('maxValue_n', 'n_n'), 'all': ('n', 'mu', 'm2', 'n_n', 'mu_n', 'm2_n') @@ -187,8 +187,8 @@ def sum(self): # Return the variance of the values. @property - def variance(self): - self.__isavail('variance') + def var(self): + self.__isavail('var') if self.n == 0: return float('nan') else: @@ -197,7 +197,7 @@ def variance(self): @property def stdev(self): self.__isavail('stdev') - return sqrt(self.variance) + return sqrt(self.var) def nancount(self): return self.n_n @@ -205,42 +205,49 @@ def nancount(self): @property def nanmean(self): self.__isavail('nanmean') - return self.mu_n.squeeze() + counts = self.nancount() + mean = self.mu_n.squeeze() + mean[counts == 0] = float('NaN') + return mean + @property def nansum(self): self.__isavail('nansum') - return self.n_n * self.mu_n.squeeze() + return self.nancount() * self.mu_n.squeeze() @property def nanmin(self): self.__isavail('nanmin') - return self.minValue_n + counts = self.nancount() + min = self.minValue_n + min[counts == 0] = float('NaN') + return min @property def nanmax(self): self.__isavail('nanmax') - return self.maxValue_n + counts = self.nancount() + max = self.maxValue_n + max[counts == 0] = 
float('NaN') + return max # Return the variance of the values. @property - def nanvariance(self): - self.__isavail('nanvariance') - tmpVar = self.m2_n / self.n_n - # set areas with no data to zero - mask = isnan(tmpVar) - if nansum(mask): - tmpVar[isnan(tmpVar)] = 0 - return tmpVar + def nanvar(self): + self.__isavail('nanvar') + counts = self.nancount() + var = self.m2_n + return var / counts # Return the standard deviation of the values. @property - def nanstdev(self): - self.__isavail('nanstdev') - return sqrt(self.nanvariance) + def nanstd(self): + self.__isavail('nanstd') + return sqrt(self.nanvar) def __repr__(self): - return ("(count: %s, mean: %s, stdev: %s, required: %s, nancount: %s, nanmean: %s, nanstdev: %s, nanmin: %s, " + return ("(count: %s, mean: %s, stdev: %s, required: %s, nancount: %s, nanmean: %s, nanstd: %s, nanmin: %s, " "nanmax: %s)" % (self.count(), self.mean, self.stdev, str(tuple(self.requiredAttrs)), self.nancount(), - self.nanmean(), self.nanstdev, self.nanmin, self.nanmax)) + self.nanmean(), self.nanstd, self.nanmin, self.nanmax)) From 1376a3181d6b928ced21ff3b933b0577e63a9b68 Mon Sep 17 00:00:00 2001 From: "Mohar, Boaz" Date: Wed, 18 May 2016 09:58:43 -0400 Subject: [PATCH 5/7] fix stdev naming error --- bolt/spark/statcounter.py | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) diff --git a/bolt/spark/statcounter.py b/bolt/spark/statcounter.py index 8382851..32fbd1c 100644 --- a/bolt/spark/statcounter.py +++ b/bolt/spark/statcounter.py @@ -195,8 +195,8 @@ def var(self): return self.m2 / self.n @property - def stdev(self): - self.__isavail('stdev') + def std(self): + self.__isavail('std') return sqrt(self.var) def nancount(self): @@ -229,6 +229,7 @@ def nanmax(self): self.__isavail('nanmax') counts = self.nancount() max = self.maxValue_n + if len(max) > 0 max[counts == 0] = float('NaN') return max @@ -247,7 +248,7 @@ def nanstd(self): return sqrt(self.nanvar) def __repr__(self): - return ("(count: %s, mean: %s, stdev: %s, 
required: %s, nancount: %s, nanmean: %s, nanstd: %s, nanmin: %s, " + return ("(count: %s, mean: %s, std: %s, required: %s, nancount: %s, nanmean: %s, nanstd: %s, nanmin: %s, " "nanmax: %s)" % - (self.count(), self.mean, self.stdev, str(tuple(self.requiredAttrs)), self.nancount(), + (self.count(), self.mean, self.std, str(tuple(self.requiredAttrs)), self.nancount(), self.nanmean(), self.nanstd, self.nanmin, self.nanmax)) From 52d03b783d4d52e80408d92e53a2535b631f2882 Mon Sep 17 00:00:00 2001 From: "Mohar, Boaz" Date: Wed, 18 May 2016 10:12:50 -0400 Subject: [PATCH 6/7] More bug fixes for propagating NaNs --- bolt/spark/statcounter.py | 16 ++++++++++++---- 1 file changed, 12 insertions(+), 4 deletions(-) diff --git a/bolt/spark/statcounter.py b/bolt/spark/statcounter.py index 32fbd1c..daf95f3 100644 --- a/bolt/spark/statcounter.py +++ b/bolt/spark/statcounter.py @@ -207,7 +207,10 @@ def nanmean(self): self.__isavail('nanmean') counts = self.nancount() mean = self.mu_n.squeeze() - mean[counts == 0] = float('NaN') + if counts.shape != (): + mean[counts == 0] = float('NaN') + elif counts == 0: + return float('NaN') return mean @@ -221,7 +224,10 @@ def nanmin(self): self.__isavail('nanmin') counts = self.nancount() min = self.minValue_n - min[counts == 0] = float('NaN') + if counts.shape != (): + min[counts == 0] = float('NaN') + elif counts == 0: + return float('NaN') return min @property @@ -229,8 +235,10 @@ def nanmax(self): self.__isavail('nanmax') counts = self.nancount() max = self.maxValue_n - if len(max) > 0 - max[counts == 0] = float('NaN') + if counts.shape != (): + max[counts == 0] = float('NaN') + elif counts == 0: + return float('NaN') return max # Return the variance of the values. 
From f1e51503ddddd7c04015e78bf9da768f3b0f7d48 Mon Sep 17 00:00:00 2001 From: "Mohar, Boaz" Date: Wed, 18 May 2016 13:44:58 -0400 Subject: [PATCH 7/7] -removed nancount -style changes --- bolt/spark/array.py | 15 --------- bolt/spark/statcounter.py | 65 ++++++++++++++++++++------------------- 2 files changed, 33 insertions(+), 47 deletions(-) diff --git a/bolt/spark/array.py b/bolt/spark/array.py index 4e88053..fa0c74f 100644 --- a/bolt/spark/array.py +++ b/bolt/spark/array.py @@ -502,21 +502,6 @@ def nanmin(self, axis=None, keepdims=False): """ return self._stat(axis, name='nanmin', keepdims=keepdims) - def nancount(self, axis=None, keepdims=False): - """ - Return the count of non NaN values. - - Parameters - ---------- - axis : tuple or int, optional, default=None - Axis to compute statistic over, if None - will compute over all axes - - keepdims : boolean, optional, default=False - Keep axis remaining after operation with size 1. - """ - return self._stat(axis, name='nancount', keepdims=keepdims) - def concatenate(self, arry, axis=0): """ Join this array with another array. 
diff --git a/bolt/spark/statcounter.py b/bolt/spark/statcounter.py index daf95f3..f21ecd4 100644 --- a/bolt/spark/statcounter.py +++ b/bolt/spark/statcounter.py @@ -32,6 +32,7 @@ except NameError: basestring = str + class StatCounter(object): REQUIRED_FOR = { @@ -43,25 +44,25 @@ class StatCounter(object): 'nansum': ('mu_n', 'n_n'), 'nanvar': ('mu_n', 'm2_n', 'n_n'), 'nanstd': ('mu_n', 'm2_n', 'n_n'), - 'nanmin': ('minValue_n', 'n_n'), - 'nanmax': ('maxValue_n', 'n_n'), + 'nanmin': ('minvalue_n', 'n_n'), + 'nanmax': ('maxvalue_n', 'n_n'), 'all': ('n', 'mu', 'm2', 'n_n', 'mu_n', 'm2_n') } def __init__(self, values=(), stats='all'): - self.n = long(0) # Running count of our values - self.mu = 0.0 # Running mean of our values - self.m2 = 0.0 # Running variance numerator (sum of (x - mean)^2) - self.n_n = None # Running count of our values without NaNs - self.mu_n = None # Running mean of our values without NaNs - self.m2_n = None # Running variance numerator (sum of (x - mean)^2) without NaNs - self.maxValue_n = None - self.minValue_n = None + self.n = long(0) # Running count of our values + self.mu = 0.0 # Running mean of our values + self.m2 = 0.0 # Running variance numerator (sum of (x - mean)^2) + self.n_n = None # Running count of our values without NaNs + self.mu_n = None # Running mean of our values without NaNs + self.m2_n = None # Running variance numerator (sum of (x - mean)^2) without NaNs + self.maxvalue_n = None # Running max value without NaNs + self.minvalue_n = None # Running min value without NaNs if isinstance(stats, basestring): stats = [stats] - self.requiredAttrs = frozenset(chain().from_iterable([StatCounter.REQUIRED_FOR[stat] for stat in stats])) + self.required_attrs = frozenset(chain().from_iterable([StatCounter.REQUIRED_FOR[stat] for stat in stats])) for v in values: self.merge(v) @@ -90,20 +91,20 @@ def merge(self, value): if self.__requires('m2_n'): # Since value can have nans - replace with zeros - tmpVal = value; - tmpVal[isnan(tmpVal)] = 
0 - self.m2_n += delta * (tmpVal - self.mu_n).squeeze() - if self.__requires('maxValue_n'): - self.maxValue_n = fmax(self.maxValue_n, value) if not self.maxValue_n is None else value - if self.__requires('minValue_n'): - self.minValue_n = fmin(self.minValue_n, value) if not self.minValue_n is None else value + temp = value + temp[isnan(temp)] = 0 + self.m2_n += delta * (temp - self.mu_n).squeeze() + if self.__requires('maxvalue_n'): + self.maxvalue_n = fmax(self.maxvalue_n, value) if not self.maxvalue_n is None else value + if self.__requires('minvalue_n'): + self.minvalue_n = fmin(self.minvalue_n, value) if not self.minvalue_n is None else value return self # checks whether the passed attribute name is required to be updated in order to support the # statistics requested in self.requested def __requires(self, attrname): - return attrname in self.requiredAttrs + return attrname in self.required_attrs # merge another StatCounter into this one, adding up the statistics def combine(self, other): @@ -114,11 +115,11 @@ def combine(self, other): self.merge(copy.deepcopy(other)) # Avoid overwriting fields in a weird order else: # accumulator should only be updated if it's valid in both statcounters: - self.requiredAttrs = set(self.requiredAttrs).intersection(set(other.requiredAttrs)) + self.required_attrs = set(self.required_attrs).intersection(set(other.required_attrs)) if self.n == 0: self.n = other.n - for attrname in ('mu', 'm2', 'n_n', 'mu_n', 'm2_n', 'maxValue_n', 'minValue_n'): + for attrname in ('mu', 'm2', 'n_n', 'mu_n', 'm2_n', 'maxvalue_n', 'minvalue_n'): if self.__requires(attrname): setattr(self, attrname, getattr(other, attrname)) @@ -150,14 +151,14 @@ def combine(self, other): self.mu_n[isnan(self.mu_n)] = 0 if self.__requires('m2_n'): - tmpAdd = (delta * delta * self.n_n * other.n_n) / (self.n_n + other.n_n) - tmpAdd[isnan(tmpAdd)] = 0 - self.m2_n += other.m2_n + tmpAdd.squeeze() + temp = (delta * delta * self.n_n * other.n_n) / (self.n_n + other.n_n) + 
temp[isnan(temp)] = 0 + self.m2_n += other.m2_n + temp.squeeze() - if self.__requires('maxValue_n'): - self.maxValue_n = fmax(self.maxValue_n, other.maxValue_n) - if self.__requires('minValue_n'): - self.minValue_n = fmin(self.minValue_n, other.minValue_n) + if self.__requires('maxvalue_n'): + self.maxvalue_n = fmax(self.maxvalue_n, other.maxvalue_n) + if self.__requires('minvalue_n'): + self.minvalue_n = fmin(self.minvalue_n, other.minvalue_n) self.n_n += other.n_n @@ -168,7 +169,7 @@ def copy(self): return copy.deepcopy(self) def __isavail(self, attrname): - if not all(attr in self.requiredAttrs for attr in StatCounter.REQUIRED_FOR[attrname]): + if not all(attr in self.required_attrs for attr in StatCounter.REQUIRED_FOR[attrname]): raise ValueError("'%s' stat not available, must be requested at " "StatCounter instantiation" % attrname) @@ -223,7 +224,7 @@ def nansum(self): def nanmin(self): self.__isavail('nanmin') counts = self.nancount() - min = self.minValue_n + min = self.minvalue_n if counts.shape != (): min[counts == 0] = float('NaN') elif counts == 0: @@ -234,7 +235,7 @@ def nanmin(self): def nanmax(self): self.__isavail('nanmax') counts = self.nancount() - max = self.maxValue_n + max = self.maxvalue_n if counts.shape != (): max[counts == 0] = float('NaN') elif counts == 0: @@ -258,5 +259,5 @@ def nanstd(self): def __repr__(self): return ("(count: %s, mean: %s, std: %s, required: %s, nancount: %s, nanmean: %s, nanstd: %s, nanmin: %s, " "nanmax: %s)" % - (self.count(), self.mean, self.std, str(tuple(self.requiredAttrs)), self.nancount(), + (self.count(), self.mean, self.std, str(tuple(self.required_attrs)), self.nancount(), self.nanmean(), self.nanstd, self.nanmin, self.nanmax))