Skip to content
This repository has been archived by the owner on Sep 11, 2023. It is now read-only.

Commit

Permalink
Merge pull request #678 from clonker/devel_refactor_transformer_random_access
Browse files Browse the repository at this point in the history

refactor random access
  • Loading branch information
marscher committed Jan 21, 2016
2 parents fd35074 + a637e65 commit d27b668
Show file tree
Hide file tree
Showing 10 changed files with 586 additions and 201 deletions.
18 changes: 9 additions & 9 deletions pyemma/coordinates/clustering/kmeans.py
Original file line number Diff line number Diff line change
Expand Up @@ -305,29 +305,29 @@ def _init_estimate(self):
super(MiniBatchKmeansClustering, self)._init_estimate()

def _estimate(self, iterable, **kw):
# mini-batch kmeans does not use stride. Enforse it.
# mini-batch kmeans does not use stride. Enforce it.
self.stride = None
self._init_estimate()

ipass = 0
i_pass = 0
converged_in_max_iter = False
prev_cost = 0

ra_stride = self._draw_mini_batch_sample()
iterator = iterable.iterator(return_trajindex=True, stride=ra_stride)
iterator = iterable.iterator(return_trajindex=False, stride=ra_stride)

while not (converged_in_max_iter or ipass + 1 >= self.max_iter):
while not (converged_in_max_iter or i_pass + 1 >= self.max_iter):
first_chunk = True
# draw new sample and re-use existing iterator instance.
ra_stride = self._draw_mini_batch_sample()
iterator.stride = ra_stride
iterator.reset()
for itraj, X in iter(iterator):
for X in iter(iterator):
# collect data
self._collect_data(X, first_chunk)
# initialize cluster centers
if ipass == 0:
self._initialize_centers(X, itraj, iterator.pos, iterator.last_chunk)
if i_pass == 0:
self._initialize_centers(X, iterator.current_trajindex, iterator.pos, iterator.last_chunk)
first_chunk = False

# one pass over data completed
Expand All @@ -347,12 +347,12 @@ def _estimate(self, iterable, **kw):

if rel_change <= self.tolerance:
converged_in_max_iter = True
self._logger.info("Cluster centers converged after %i steps." % (ipass + 1))
self._logger.info("Cluster centers converged after %i steps." % (i_pass + 1))
self._progress_force_finish(stage=1)
else:
self._progress_update(1, stage=1)

ipass += 1
i_pass += 1

if not converged_in_max_iter:
self._logger.info("Algorithm did not reach convergence criterion"
Expand Down
112 changes: 81 additions & 31 deletions pyemma/coordinates/data/_base/datasource.py
Original file line number Diff line number Diff line change
@@ -1,18 +1,40 @@
# This file is part of PyEMMA.
#
# Copyright (c) 2015, 2014 Computational Molecular Biology Group, Freie Universitaet Berlin (GER)
#
# PyEMMA is free software: you can redistribute it and/or modify
# it under the terms of the GNU Lesser General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.

import functools
from abc import ABCMeta, abstractmethod
from math import ceil

import six
import numpy as np
from math import ceil
import six

from pyemma.coordinates.data._base.iterable import Iterable
from pyemma.coordinates.data._base.random_accessible import TrajectoryRandomAccessible


class DataSource(Iterable):
class DataSource(Iterable, TrajectoryRandomAccessible):
"""
Superclass for all pipeline elements. It inherits from "Iterable" and therefore serves as an iterator factory over
the data it holds. Unlike a plain Iterable, a DataSource is specialized for trajectories; the concept of a
trajectory is generally unknown to Iterable.
"""

def __init__(self, chunksize=100):
super(DataSource, self).__init__(chunksize=chunksize)
self._lengths = []
# storage for arrays (used in _add_array_to_storage)
self._data = []
# following properties have to be set in subclass
Expand All @@ -25,21 +47,34 @@ def ntraj(self):
__doc__ = self.number_of_trajectories.__doc__
return self._ntraj

@property
def is_random_accessible(self):
from pyemma.coordinates.data._base.random_accessible import RandomAccessibleDataSource
return isinstance(self, RandomAccessibleDataSource)

@property
def is_reader(self):
"""
Property telling if this data source is a reader or not.
Returns
-------
bool: True if this data source is a reader and False otherwise
"""
return self._is_reader

@property
def data(self):
"""
Property that returns the data held in storage (data in memory mode).
Returns
-------
list : The stored data.
"""
return self._data

@property
def data_producer(self):
"""
The data producer for this data source object (can be another data source object).
Returns
-------
This data source's data producer.
"""
return self

def number_of_trajectories(self):
Expand Down Expand Up @@ -133,20 +168,24 @@ def _add_array_to_storage(self, array):


class IteratorState(object):
"""
State class holding all the relevant information of an iterator's state.
"""

def __init__(self, stride=1, skip=0, chunk=0, return_trajindex=False, ntraj=0):
self.skip = skip
self.chunk = chunk
self.return_trajindex = return_trajindex
self.itraj = 0
self._current_itraj = 0
self._t = 0
self._pos = 0
self._pos_adv = 0
self.current_itraj = 0
self.t = 0
self.pos = 0
self.pos_adv = 0
self.stride = None
self.uniform_stride = False
self.traj_keys = None
self.trajectory_lengths = None
self.ra_indices_for_traj_dict = {}

def ra_indices_for_traj(self, traj):
"""
Expand All @@ -155,7 +194,10 @@ def ra_indices_for_traj(self, traj):
:return: a Nx1 - np.array of the indices corresponding to the trajectory index
"""
assert not self.uniform_stride, "requested random access indices, but is in uniform stride mode"
return self.stride[self.stride[:, 0] == traj][:, 1] if traj in self.traj_keys else np.array([])
if traj in self.traj_keys:
return self.ra_indices_for_traj_dict[traj]
else:
return np.array([])

def ra_trajectory_length(self, traj):
assert not self.uniform_stride, "requested random access trajectory length, but is in uniform stride mode"
Expand All @@ -179,7 +221,9 @@ def is_stride_sorted(self):


class DataSourceIterator(six.with_metaclass(ABCMeta)):

"""
Abstract class for any data source iterator.
"""
def __init__(self, data_source, skip=0, chunk=0, stride=1, return_trajindex=False):
self._data_source = data_source
self.state = IteratorState(skip=skip, chunk=chunk,
Expand All @@ -193,12 +237,15 @@ def __init_stride(self, stride):
if isinstance(stride, np.ndarray):
keys = stride[:, 0]
self.state.traj_keys, self.state.trajectory_lengths = np.unique(keys, return_counts=True)
self.state.ra_indices_for_traj_dict = {}
for traj in self.state.traj_keys:
self.state.ra_indices_for_traj_dict[traj] = self.state.stride[self.state.stride[:, 0] == traj][:, 1]
else:
self.state.traj_keys = None
self.state.uniform_stride = IteratorState.is_uniform_stride(stride)
if not IteratorState.is_uniform_stride(stride):
if self._data_source.needs_sorted_ra_stride and not self.state.is_stride_sorted():
raise ValueError("For this data source, currently only sorted arrays allowed for random access")
if not self.state.is_stride_sorted():
raise ValueError("Only sorted arrays allowed for iterator pseudo random access")
# skip trajs which are not included in stride
while self.state.itraj not in self.state.traj_keys and self.state.itraj < self._data_source.ntraj:
self.state.itraj += 1
Expand Down Expand Up @@ -262,7 +309,7 @@ def pos(self):
int
The current iterator's position in the current trajectory.
"""
return self.state._pos
return self.state.pos

@property
def current_trajindex(self):
Expand All @@ -273,7 +320,7 @@ def current_trajindex(self):
int
The current iterator's trajectory index.
"""
return self.state._current_itraj
return self.state.current_itraj

@property
def skip(self):
Expand All @@ -296,7 +343,7 @@ def _t(self):
int
The upcoming iterator position.
"""
return self.state._t
return self.state.t

@_t.setter
def _t(self, value):
Expand All @@ -307,7 +354,7 @@ def _t(self, value):
value : int
The upcoming iterator position.
"""
self.state._t = value
self.state.t = value

@property
def _itraj(self):
Expand Down Expand Up @@ -386,7 +433,7 @@ def stride(self, value):
value : int
The new stride parameter.
"""
self.state.stride = value
self.__init_stride(value)

@property
def return_traj_index(self):
Expand Down Expand Up @@ -476,23 +523,26 @@ def _it_next(self):
and self._itraj < self.number_of_trajectories():
self._itraj += 1
# we have to obtain the current index before invoking next_chunk (which increments itraj)
self.state._current_itraj = self._itraj
self.state._pos = self.state._pos_adv
self.state.current_itraj = self._itraj
self.state.pos = self.state.pos_adv
try:
X = self._next_chunk()
except StopIteration:
self._last_chunk_in_traj = True
raise
if self.state._current_itraj != self._itraj:
self.state._pos_adv = 0
if self.state.current_itraj != self._itraj:
self.state.pos_adv = 0
self._last_chunk_in_traj = True
else:
self.state._pos_adv += len(X)
length = self._data_source.trajectory_length(itraj=self.state._current_itraj,
stride=self.stride, skip=self.skip)
self._last_chunk_in_traj = self.state._pos_adv >= length
self.state.pos_adv += len(X)
if self.uniform_stride:
length = self._data_source.trajectory_length(itraj=self.state.current_itraj,
stride=self.stride, skip=self.skip)
else:
length = self.ra_trajectory_length(self.state.current_itraj)
self._last_chunk_in_traj = self.state.pos_adv >= length
if self.return_traj_index:
return self.state._current_itraj, X
return self.state.current_itraj, X
return X

def next(self):
Expand Down
25 changes: 20 additions & 5 deletions pyemma/coordinates/data/_base/iterable.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,20 @@
# This file is part of PyEMMA.
#
# Copyright (c) 2015, 2014 Computational Molecular Biology Group, Freie Universitaet Berlin (GER)
#
# PyEMMA is free software: you can redistribute it and/or modify
# it under the terms of the GNU Lesser General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.

from abc import ABCMeta, abstractmethod
import six
import numpy as np
Expand All @@ -16,11 +33,6 @@ def __init__(self, chunksize=100):
self._in_memory = False
# should be set in subclass
self._ndim = 0
self._needs_sorted_random_access_stride = True

@property
def needs_sorted_ra_stride(self):
return self._needs_sorted_random_access_stride

def dimension(self):
return self._ndim
Expand Down Expand Up @@ -65,13 +77,16 @@ def _clear_in_memory(self):
self._logger.debug("clear memory")
assert self.in_memory, "tried to delete in memory results which are not set"
self._Y = None
self._Y_source = None

def _map_to_memory(self, stride=1):
r"""Maps results to memory. Will be stored in attribute :attr:`_Y`."""
self._logger.debug("mapping to mem")
assert self._in_memory
self._mapping_to_mem_active = True
self._Y = self.get_output(stride=stride)
from pyemma.coordinates.data import DataInMemory
self._Y_source = DataInMemory(self._Y)
self._mapping_to_mem_active = False

def iterator(self, stride=1, lag=0, chunk=None, return_trajindex=True):
Expand Down
Loading

0 comments on commit d27b668

Please sign in to comment.