Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
94 changes: 92 additions & 2 deletions bolt/spark/array.py
Original file line number Diff line number Diff line change
Expand Up @@ -334,6 +334,21 @@ def mean(self, axis=None, keepdims=False):
"""
return self._stat(axis, name='mean', keepdims=keepdims)

def nanmean(self, axis=None, keepdims=False):
    """
    Return the mean of the array over the given axis ignoring NaNs.

    Delegates to ``self._stat`` with the statistic name ``'nanmean'``.

    Parameters
    ----------
    axis : tuple or int, optional, default=None
        Axis to compute statistic over, if None
        will compute over all axes

    keepdims : boolean, optional, default=False
        Keep axis remaining after operation with size 1.
    """
    stat_name = 'nanmean'
    return self._stat(axis, name=stat_name, keepdims=keepdims)

def var(self, axis=None, keepdims=False):
"""
Return the variance of the array over the given axis.
Expand All @@ -347,7 +362,22 @@ def var(self, axis=None, keepdims=False):
keepdims : boolean, optional, default=False
Keep axis remaining after operation with size 1.
"""
return self._stat(axis, name='variance', keepdims=keepdims)
return self._stat(axis, name='var', keepdims=keepdims)

def nanvar(self, axis=None, keepdims=False):
    """
    Return the variance of the array over the given axis ignoring NaNs.

    Delegates to ``self._stat`` with the statistic name ``'nanvar'``.

    Parameters
    ----------
    axis : tuple or int, optional, default=None
        Axis to compute statistic over, if None
        will compute over all axes

    keepdims : boolean, optional, default=False
        Keep axis remaining after operation with size 1.
    """
    stat_name = 'nanvar'
    return self._stat(axis, name=stat_name, keepdims=keepdims)

def std(self, axis=None, keepdims=False):
"""
Expand All @@ -362,7 +392,22 @@ def std(self, axis=None, keepdims=False):
keepdims : boolean, optional, default=False
Keep axis remaining after operation with size 1.
"""
return self._stat(axis, name='stdev', keepdims=keepdims)
return self._stat(axis, name='std', keepdims=keepdims)

def nanstd(self, axis=None, keepdims=False):
    """
    Return the standard deviation of the array over the given axis ignoring NaNs.

    Delegates to ``self._stat`` with the statistic name ``'nanstd'``.

    Parameters
    ----------
    axis : tuple or int, optional, default=None
        Axis to compute statistic over, if None
        will compute over all axes

    keepdims : boolean, optional, default=False
        Keep axis remaining after operation with size 1.
    """
    stat_name = 'nanstd'
    return self._stat(axis, name=stat_name, keepdims=keepdims)

def sum(self, axis=None, keepdims=False):
"""
Expand All @@ -380,6 +425,21 @@ def sum(self, axis=None, keepdims=False):
from operator import add
return self._stat(axis, func=add, keepdims=keepdims)

def nansum(self, axis=None, keepdims=False):
    """
    Return the sum of the array over the given axis ignoring NaNs.

    Delegates to ``self._stat`` with the statistic name ``'nansum'``.

    Parameters
    ----------
    axis : tuple or int, optional, default=None
        Axis to compute statistic over, if None
        will compute over all axes

    keepdims : boolean, optional, default=False
        Keep axis remaining after operation with size 1.
    """
    stat_name = 'nansum'
    return self._stat(axis, name=stat_name, keepdims=keepdims)

def max(self, axis=None, keepdims=False):
"""
Return the maximum of the array over the given axis.
Expand All @@ -396,6 +456,21 @@ def max(self, axis=None, keepdims=False):
from numpy import maximum
return self._stat(axis, func=maximum, keepdims=keepdims)

def nanmax(self, axis=None, keepdims=False):
    """
    Return the maximum of the array over the given axis ignoring NaNs.

    Delegates to ``self._stat`` with the statistic name ``'nanmax'``.

    Parameters
    ----------
    axis : tuple or int, optional, default=None
        Axis to compute statistic over, if None
        will compute over all axes

    keepdims : boolean, optional, default=False
        Keep axis remaining after operation with size 1.
    """
    stat_name = 'nanmax'
    return self._stat(axis, name=stat_name, keepdims=keepdims)

def min(self, axis=None, keepdims=False):
"""
Return the minimum of the array over the given axis.
Expand All @@ -412,6 +487,21 @@ def min(self, axis=None, keepdims=False):
from numpy import minimum
return self._stat(axis, func=minimum, keepdims=keepdims)

def nanmin(self, axis=None, keepdims=False):
    """
    Return the minimum of the array over the given axis ignoring NaNs.

    Delegates to ``self._stat`` with the statistic name ``'nanmin'``.

    Parameters
    ----------
    axis : tuple or int, optional, default=None
        Axis to compute statistic over, if None
        will compute over all axes

    keepdims : boolean, optional, default=False
        Keep axis remaining after operation with size 1.
    """
    stat_name = 'nanmin'
    return self._stat(axis, name=stat_name, keepdims=keepdims)

def concatenate(self, arry, axis=0):
"""
Join this array with another array.
Expand Down
189 changes: 161 additions & 28 deletions bolt/spark/statcounter.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,29 +20,49 @@
# This code is based on pyspark's statcounter.py and used under the ASF 2.0 license.

import copy
import sys
from itertools import chain
from numpy import zeros, sqrt, isnan, fmin, fmax, nansum, dstack

from numpy import sqrt
# python 3 compatibility
# Python 3 has no separate ``long`` type, so alias the name to ``int``.
if sys.version_info[0] >= 3:
    long = int
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

let's just use int

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nevermind...

# ``basestring`` only exists on Python 2; fall back to ``str`` elsewhere so
# string checks can use a single name.
try:
    basestring = basestring
except NameError:
    basestring = str


class StatCounter(object):

# Maps each statistic name to the accumulator attributes that must be
# tracked for that statistic to be reported.
# NOTE(review): 'all' does not list 'minvalue_n' / 'maxvalue_n', so
# 'nanmin' / 'nanmax' have to be requested by name.
REQUIRED_FOR = dict(
    mean=('mu', 'n_n'),
    sum=('mu', 'n_n'),
    var=('mu', 'm2', 'n_n'),
    std=('mu', 'n', 'm2', 'n_n'),
    nanmean=('mu_n', 'n_n'),
    nansum=('mu_n', 'n_n'),
    nanvar=('mu_n', 'm2_n', 'n_n'),
    nanstd=('mu_n', 'm2_n', 'n_n'),
    nanmin=('minvalue_n', 'n_n'),
    nanmax=('maxvalue_n', 'n_n'),
    all=('n', 'mu', 'm2', 'n_n', 'mu_n', 'm2_n'),
)

def __init__(self, values=(), stats='all'):
    """
    Create a counter and merge in any initial values.

    Parameters
    ----------
    values : iterable, optional, default=()
        Values passed one by one to ``merge``.

    stats : str or iterable of str, optional, default='all'
        Statistic name(s); each must be a key of ``REQUIRED_FOR``
        (an unknown name raises ``KeyError``).
    """
    # Accumulators over all values.
    self.n = long(0)   # running count
    self.mu = 0.0      # running mean
    self.m2 = 0.0      # running sum of squared deviations from the mean

    # NaN-ignoring accumulators; they start as None and are filled in
    # once values are merged.
    self.n_n = None          # count of non-NaN values
    self.mu_n = None         # mean of non-NaN values
    self.m2_n = None         # sum of squared deviations of non-NaN values
    self.maxvalue_n = None   # running max ignoring NaNs
    self.minvalue_n = None   # running min ignoring NaNs

    # A single statistic name is treated as a one-element list.
    requested = [stats] if isinstance(stats, basestring) else stats
    self.required_attrs = frozenset(
        attr
        for stat in requested
        for attr in StatCounter.REQUIRED_FOR[stat]
    )

    for value in values:
        self.merge(value)
Expand All @@ -56,33 +76,59 @@ def merge(self, value):
if self.__requires('m2'):
self.m2 += delta * (value - self.mu)

if self.n_n is None:
# Create the initial counter and set it to zeros
self.n_n = zeros(value.shape)
self.mu_n = zeros(value.shape)
self.m2_n = zeros(value.shape)

self.n_n += ~isnan(value)
if self.__requires('mu_n'):
delta = (value - self.mu_n).squeeze()
if nansum(isnan(value)):
delta[isnan(value)] = 0
self.mu_n = nansum(dstack((self.mu_n, (delta / self.n_n))), axis=2)

if self.__requires('m2_n'):
# Since value can have nans - replace with zeros
temp = value
temp[isnan(temp)] = 0
self.m2_n += delta * (temp - self.mu_n).squeeze()
if self.__requires('maxvalue_n'):
self.maxvalue_n = fmax(self.maxvalue_n, value) if not self.maxvalue_n is None else value
if self.__requires('minvalue_n'):
self.minvalue_n = fmin(self.minvalue_n, value) if not self.minvalue_n is None else value

return self

def __requires(self, attrname):
    """
    Tell whether accumulator ``attrname`` must be kept up to date to
    support the statistics recorded in ``self.required_attrs``.
    """
    tracked = self.required_attrs
    return attrname in tracked

# merge another StatCounter into this one, adding up the statistics
def combine(self, other):
if not isinstance(other, StatCounter):
raise Exception("can only merge StatCounters!")
raise Exception("Can only merge Statcounters!")

# reference equality holds
if other is self:
# avoid overwriting fields in a weird order
self.merge(copy.deepcopy(other))
if other is self: # reference equality holds
self.merge(copy.deepcopy(other)) # Avoid overwriting fields in a weird order
else:
# accumulator should only be updated if it's valid in both statcounters
self.required = set(self.required).intersection(set(other.required))
# accumulator should only be updated if it's valid in both statcounters:
self.required_attrs = set(self.required_attrs).intersection(set(other.required_attrs))

if self.n == 0:
self.n = other.n
for attrname in ('mu', 'm2'):
for attrname in ('mu', 'm2', 'n_n', 'mu_n', 'm2_n', 'maxvalue_n', 'minvalue_n'):
if self.__requires(attrname):
setattr(self, attrname, getattr(other, attrname))

elif other.n != 0:
if self.n_n is None:
# Create the initial counter and set it to zeros
self.n_n = zeros(other.shape)
self.mu_n = zeros(other.shape)
self.m2_n = zeros(other.shape)
if self.__requires('mu'):
delta = other.mu - self.mu
if other.n * 10 < self.n:
Expand All @@ -96,16 +142,40 @@ def combine(self, other):
self.m2 += other.m2 + (delta * delta * self.n * other.n) / (self.n + other.n)

self.n += other.n

if self.__requires('mu_n'):
delta = other.mu_n - self.mu_n
self.mu_n = (self.mu_n * self.n_n + other.mu_n * other.n_n) / (self.n_n + other.n_n)

#Set areas with no data to zero
self.mu_n[isnan(self.mu_n)] = 0

if self.__requires('m2_n'):
temp = (delta * delta * self.n_n * other.n_n) / (self.n_n + other.n_n)
temp[isnan(temp)] = 0
self.m2_n += other.m2_n + temp.squeeze()

if self.__requires('maxvalue_n'):
self.maxvalue_n = fmax(self.maxvalue_n, other.maxvalue_n)
if self.__requires('minvalue_n'):
self.minvalue_n = fmin(self.minvalue_n, other.minvalue_n)

self.n_n += other.n_n

return self

def count(self):
return self.n
def copy(self):
    """Return a deep copy of this StatCounter."""
    from copy import deepcopy
    return deepcopy(self)

def __isavail(self, attrname):
    """
    Check that statistic ``attrname`` can be reported.

    Raises
    ------
    ValueError
        If any accumulator listed in ``REQUIRED_FOR[attrname]`` is not in
        ``self.required_attrs``.
    """
    for attr in StatCounter.REQUIRED_FOR[attrname]:
        if attr not in self.required_attrs:
            raise ValueError("'%s' stat not available, must be requested at "
                             "StatCounter instantiation" % attrname)

def count(self):
    """Return the running count of merged values (``self.n``)."""
    total = self.n
    return total

@property
def mean(self):
self.__isavail('mean')
Expand All @@ -116,15 +186,78 @@ def sum(self):
self.__isavail('sum')
return self.n * self.mu

@property
def var(self):
    """
    Variance of the merged values (``m2 / n``).

    Returns NaN when no values have been merged. Raises ``ValueError``
    (via the availability check) if 'var' was not requested.
    """
    self.__isavail('var')
    # Guard against dividing by a zero count.
    if self.n == 0:
        return float('nan')
    return self.m2 / self.n

@property
def std(self):
    """
    Standard deviation of the merged values: the square root of ``var``.

    Raises ``ValueError`` (via the availability check) if 'std' was not
    requested.
    """
    self.__isavail('std')
    variance = self.var
    return sqrt(variance)

def nancount(self):
    """Return the running count of non-NaN values (``self.n_n``)."""
    valid = self.n_n
    return valid

@property
def nanmean(self):
    """
    Mean of the merged values ignoring NaNs.

    Positions that never received a non-NaN value are reported as NaN.
    The result is a fresh array, so reading this property never changes
    the counter's internal state.

    Returns
    -------
    ndarray or float
        The squeezed NaN-ignoring mean, or NaN when nothing has been
        merged yet or the (scalar) count is zero.

    Raises
    ------
    ValueError
        If 'nanmean' was not requested (via the availability check).
    """
    self.__isavail('nanmean')
    counts = self.nancount()
    if counts is None:
        # Nothing has been merged yet, so there is no mean to report.
        return float('NaN')

    # Squeeze the counts as well, so the mask below has the same shape as
    # the squeezed mean even when the data has size-1 axes.
    counts = counts.squeeze()
    # squeeze() returns a view; copy before writing NaNs so that mu_n is
    # not corrupted for later merges.
    mean = self.mu_n.squeeze().copy()
    if counts.shape != ():
        mean[counts == 0] = float('NaN')
    elif counts == 0:
        return float('NaN')
    return mean


@property
def nansum(self):
    """
    Sum of the merged values ignoring NaNs, computed as the non-NaN count
    times the squeezed NaN-ignoring mean.

    Raises ``ValueError`` (via the availability check) if 'nansum' was
    not requested.
    """
    self.__isavail('nansum')
    valid = self.nancount()
    mean = self.mu_n.squeeze()
    return valid * mean

@property
def nanmin(self):
    """
    Minimum of the merged values ignoring NaNs.

    Positions whose non-NaN count is zero are set to NaN in the stored
    minimum, which is then returned. A scalar count of zero yields NaN.
    Raises ``ValueError`` (via the availability check) if 'nanmin' was
    not requested.
    """
    self.__isavail('nanmin')
    valid = self.nancount()
    lowest = self.minvalue_n
    if valid.shape == ():
        # Scalar case: either nothing valid was seen, or return as is.
        if valid == 0:
            return float('NaN')
        return lowest
    lowest[valid == 0] = float('NaN')
    return lowest

@property
def nanmax(self):
    """
    Maximum of the merged values ignoring NaNs.

    Positions whose non-NaN count is zero are set to NaN in the stored
    maximum, which is then returned. A scalar count of zero yields NaN.
    Raises ``ValueError`` (via the availability check) if 'nanmax' was
    not requested.
    """
    self.__isavail('nanmax')
    valid = self.nancount()
    highest = self.maxvalue_n
    if valid.shape == ():
        # Scalar case: either nothing valid was seen, or return as is.
        if valid == 0:
            return float('NaN')
        return highest
    highest[valid == 0] = float('NaN')
    return highest

@property
def nanvar(self):
    """
    Variance of the merged values ignoring NaNs: ``m2_n`` divided by the
    non-NaN count.

    Raises ``ValueError`` (via the availability check) if 'nanvar' was
    not requested.
    """
    self.__isavail('nanvar')
    valid = self.nancount()
    return self.m2_n / valid

@property
def nanstd(self):
    """
    Standard deviation of the merged values ignoring NaNs: the square
    root of ``nanvar``.

    Raises ``ValueError`` (via the availability check) if 'nanstd' was
    not requested.
    """
    self.__isavail('nanstd')
    variance = self.nanvar
    return sqrt(variance)

def __repr__(self):
    """
    Return a one-line summary of the counter.

    Statistics that were not requested at construction are shown as
    'n/a' instead of making ``repr()`` raise.
    """
    def stat(name):
        # The statistic properties raise ValueError when their
        # accumulators were not requested; a repr should not fail on that.
        try:
            return getattr(self, name)
        except ValueError:
            return 'n/a'

    # nanmean is a property, so it is read like the other statistics;
    # calling the value it returns would raise TypeError.
    return ("(count: %s, mean: %s, std: %s, required: %s, nancount: %s, nanmean: %s, nanstd: %s, nanmin: %s, "
            "nanmax: %s)" %
            (self.count(), stat('mean'), stat('std'), str(tuple(self.required_attrs)), self.nancount(),
             stat('nanmean'), stat('nanstd'), stat('nanmin'), stat('nanmax')))
Loading