Skip to content
This repository has been archived by the owner on Sep 11, 2023. It is now read-only.

Commit

Permalink
Merge pull request #1035 from marscher/fix_streaming_estimation_trans…
Browse files Browse the repository at this point in the history
…former_data_prod

Fix streaming estimation transformer data producer
  • Loading branch information
marscher authored Feb 3, 2017
2 parents 56a100b + 82f7998 commit d23dcaa
Show file tree
Hide file tree
Showing 7 changed files with 91 additions and 22 deletions.
16 changes: 16 additions & 0 deletions doc/source/CHANGELOG.rst
Original file line number Diff line number Diff line change
@@ -1,6 +1,22 @@
Changelog
=========

2.3.1 (tba)
-----------

**New features**:

- msm:
- ImpliedTimescales: enable insertion/removal of lag times.
Avoid recomputing existing models. #1030

**Fixes**:

- coordinates:
- If Estimators supporting streaming are used directly, restore previous behaviour. #1034
Note that estimators created via the API functions are not affected.


2.3 (6-1-2017)
--------------

Expand Down
11 changes: 6 additions & 5 deletions pyemma/coordinates/clustering/assign.py
Original file line number Diff line number Diff line change
Expand Up @@ -87,11 +87,12 @@ def describe(self):

@AbstractClustering.data_producer.setter
def data_producer(self, dp):
# check dimensions
dim = self.clustercenters.shape[1]
if not dim == dp.dimension():
raise ValueError('cluster centers have wrong dimension. Have dim=%i'
', but input has %i' % (dim, dp.dimension()))
if dp is not None:
# check dimensions
dim = self.clustercenters.shape[1]
if not dim == dp.dimension():
raise ValueError('cluster centers have wrong dimension. Have dim=%i'
', but input has %i' % (dim, dp.dimension()))
AbstractClustering.data_producer.fset(self, dp)

def _estimate(self, iterable, **kw):
Expand Down
6 changes: 4 additions & 2 deletions pyemma/coordinates/data/_base/streaming_estimator.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,13 +37,15 @@ def __init__(self, chunksize=None):
self._chunksize = chunksize

def estimate(self, X, **kwargs):
# ensure the input is able to provide a stream
if not isinstance(X, Iterable):
if isinstance(X, np.ndarray) or \
(isinstance(X, (list, tuple)) and len(X) > 0 and all([isinstance(x, np.ndarray) for x in X])):
(isinstance(X, (list, tuple)) and len(X) > 0 and all((isinstance(x, np.ndarray) for x in X))):
X = DataInMemory(X, self.chunksize)
else:
raise ValueError("no np.ndarray or non-empty list of np.ndarrays given")

# Because we want to use pipelining methods like get_output, we have to set a data producer.
self.data_producer = X
# run estimation
try:
super(StreamingEstimator, self).estimate(X, **kwargs)
Expand Down
21 changes: 15 additions & 6 deletions pyemma/coordinates/data/_base/transformer.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import six
from pyemma._ext.sklearn.base import TransformerMixin
from pyemma.coordinates.data._base.datasource import DataSource, DataSourceIterator
from pyemma.coordinates.data._base.iterable import Iterable
from pyemma.coordinates.data._base.random_accessible import RandomAccessStrategy
from pyemma.coordinates.data._base.streaming_estimator import StreamingEstimator
from pyemma.coordinates.util.change_notification import (inform_children_upon_change,
Expand Down Expand Up @@ -108,6 +109,9 @@ class StreamingTransformer(Transformer, DataSource, NotifyOnChangesMixIn):

r""" Base class for pipelined Transformers.
This class derives from DataSource, so follow-up pipeline elements can stream
the output of this class.
Parameters
----------
chunksize : int (optional)
Expand All @@ -116,24 +120,28 @@ class StreamingTransformer(Transformer, DataSource, NotifyOnChangesMixIn):
"""
def __init__(self, chunksize=1000):
super(StreamingTransformer, self).__init__(chunksize=chunksize)
self._estimated = False
self._data_producer = None
self.data_producer = None
self._Y_source = None

@property
# overload of DataSource
def data_producer(self):
if not hasattr(self, '_data_producer'):
return None
return self._data_producer

@data_producer.setter
@inform_children_upon_change
def data_producer(self, dp):
if dp is not self._data_producer:
if dp is not self.data_producer:
# first unregister from current dataproducer
if self._data_producer is not None and isinstance(self._data_producer, NotifyOnChangesMixIn):
self._data_producer._stream_unregister_child(self)
if self.data_producer is not None and isinstance(self.data_producer, NotifyOnChangesMixIn):
self.data_producer._stream_unregister_child(self)
# then register this instance as a child of the new one.
if dp is not None and isinstance(dp, NotifyOnChangesMixIn):
dp._stream_register_child(self)
if dp is not None and not isinstance(dp, Iterable):
raise ValueError('can not set data_producer to non-iterable class of type {}'.format(type(dp)))
self._data_producer = dp
# register random access strategies
self._set_random_access_strategies()
Expand Down Expand Up @@ -215,9 +223,10 @@ def n_frames_total(self, stride=1, skip=0):

class StreamingEstimationTransformer(StreamingTransformer, StreamingEstimator):
""" Base class for pipelined Transformers which also perform estimation. """

def estimate(self, X, **kwargs):
super(StreamingEstimationTransformer, self).estimate(X, **kwargs)
# We perform the mapping to memory exactly here, because a StreamingEstimator on its own
# has no output to be mapped. Only the combination of estimation and transformation has this feature.
if self.in_memory and not self._mapping_to_mem_active:
self._map_to_memory()
return self
Expand Down
41 changes: 33 additions & 8 deletions pyemma/coordinates/tests/test_cluster.py
Original file line number Diff line number Diff line change
Expand Up @@ -65,9 +65,6 @@ def setUpClass(cls):
cls.rt = coor.cluster_uniform_time(data = cls.X, k = 100)
cls.cl = [cls.km, cls.rs, cls.rt]

def setUp(self):
pass

def test_chunksize(self):
for c in self.cl:
assert types.is_int(c.chunksize)
Expand Down Expand Up @@ -138,11 +135,6 @@ def test_output_type(self):
for c in self.cl:
assert c.output_type() == np.int32

def test_parametrize(self):
for c in self.cl:
# nothing should happen
c.parametrize()

def test_save_dtrajs(self):
extension = ".dtraj"
outdir = self.dtraj_dir
Expand Down Expand Up @@ -170,5 +162,38 @@ def test_trajectory_lengths(self):
assert c.trajectory_lengths()[0] == c.trajectory_length(0)


class TestClusterDirect(TestCluster):
    """Run all tests inherited from TestCluster on directly constructed estimators.

    Instead of going through the ``coor.cluster_*`` API functions, the
    clustering objects are created from their classes and fitted with
    ``estimate()`` on the raw array.
    """

    @classmethod
    def setUpClass(cls):
        """Build the synthetic trajectory and fit the three clustering estimators."""
        from pyemma.coordinates.clustering import KmeansClustering, RegularSpaceClustering, UniformTimeClustering
        cls.dtraj_dir = tempfile.mkdtemp()

        # parameters of the Gaussian mixture: one mean/width pair per component
        centers = [np.array(m) for m in ([-3, 0], [-1, 1], [0, 0], [1, -1], [4, 2])]
        spreads = [np.array([0.3, 2]) for _ in centers]

        # continuous trajectory, filled block-wise (one block of rows per component)
        block_len = 1000
        cls.T = len(centers) * block_len
        cls.X = np.zeros((cls.T, 2))
        for idx, (center, spread) in enumerate(zip(centers, spreads)):
            rows = slice(idx * block_len, (idx + 1) * block_len)
            # NOTE(review): randn() without a size argument returns a single scalar,
            # so all rows of a block receive the same value per column -- confirm
            # whether per-frame noise was intended here.
            cls.X[rows, 0] = spread[0] * np.random.randn() + center[0]
            cls.X[rows, 1] = spread[1] * np.random.randn() + center[1]

        # cluster in different ways, using the estimator classes directly
        cls.km = KmeansClustering(n_clusters=100).estimate(cls.X)
        cls.rs = RegularSpaceClustering(dmin=0.5).estimate(cls.X)
        cls.rt = UniformTimeClustering(n_clusters=100).estimate(cls.X)
        cls.cl = [cls.km, cls.rs, cls.rt]
        return cls


if __name__ == "__main__":
unittest.main()
8 changes: 8 additions & 0 deletions pyemma/coordinates/tests/test_pca.py
Original file line number Diff line number Diff line change
Expand Up @@ -222,5 +222,13 @@ def test_feature_correlation_data(self):
true_corr = np.corrcoef(feature_traj.T, pca_traj.T)[:nfeat,-npcs:]
np.testing.assert_allclose(test_corr, true_corr, atol=1.E-8)

def test_pipelining_sklearn_compat(self):
    """Check that fit_transform() on a raw array matches the first element of get_output()."""
    from pyemma.coordinates.transform import PCA
    pca = PCA(dim=2)
    data = np.random.random((20, 3))
    projected = pca.fit_transform(data)
    # get_output() is a pipelining method; it is called after fitting on the plain array
    streamed = pca.get_output()
    np.testing.assert_allclose(streamed[0], projected)

if __name__ == "__main__":
unittest.main()
10 changes: 9 additions & 1 deletion pyemma/coordinates/tests/test_tica.py
Original file line number Diff line number Diff line change
Expand Up @@ -178,7 +178,15 @@ def test_too_short_trajs(self):

def test_with_skip(self):
data = np.random.random((100, 10))
tica_obj = api.tica(lag=10, dim=1, skip=1)
tica_obj = api.tica(data, lag=10, dim=1, skip=1)

def test_pipelining_sklearn_compat(self):
    """Check that fit_transform() on a raw array matches the first element of get_output()."""
    from pyemma.coordinates.transform import TICA
    tica = TICA(1)
    data = np.random.random((20, 3))
    projected = tica.fit_transform(data)
    # get_output() is a pipelining method; it is called after fitting on the plain array
    streamed = tica.get_output()
    np.testing.assert_allclose(streamed[0], projected)


class TestTICAExtensive(unittest.TestCase):
Expand Down

0 comments on commit d23dcaa

Please sign in to comment.