Skip to content
This repository was archived by the owner on Sep 11, 2023. It is now read-only.

Commit 78dae90

Browse files
authored
Merge pull request #1251 from marscher/chunksize_fixes
[coordinates] handle default_chunksize gracefully.
2 parents 38fbb6e + d1f4b31 commit 78dae90

File tree

18 files changed

+166
-65
lines changed

18 files changed

+166
-65
lines changed

.gitignore

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@ build*
1111
*.egg-info
1212
*.so
1313
/temp
14+
/tmp
1415
/target
1516
__pycache__
1617
/pylint/pylint_*.txt

doc/source/CHANGELOG.rst

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@ Quick fix release to repair chunking in the coordinates package.
99
**Fixes**:
1010

1111
- msm: fix bug in ImpliedTimescales, which happened when an estimation failed for a given lag time. #1248
12-
- coordinates: fixed handling of default chunksize. #1247
12+
- coordinates: fixed handling of default chunksize. #1247, #1251
1313
- base: updated pybind to 2.2.2. #1249
1414

1515

pyemma/_base/progress/reporter/__init__.py

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -193,7 +193,9 @@ def _progress_force_finish(self, stage=0, description=None):
193193

194194
pg = self._prog_rep_progressbars[stage]
195195
pg.desc = description
196-
pg.update(int(pg.total - pg.n))
196+
increment = int(pg.total - pg.n)
197+
if increment > 0:
198+
pg.update(increment)
197199
pg.refresh(nolock=True)
198200
pg.close()
199201
self._prog_rep_progressbars.pop(stage, None)

pyemma/coordinates/api.py

Lines changed: 33 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -1012,10 +1012,12 @@ def pca(data=None, dim=-1, var_cutoff=0.95, stride=1, mean=None, skip=0, chunksi
10121012
warnings.warn("provided mean ignored", DeprecationWarning)
10131013

10141014
res = PCA(dim=dim, var_cutoff=var_cutoff, mean=None, skip=skip, stride=stride)
1015+
from pyemma.util.reflection import get_default_args
1016+
cs = _check_old_chunksize_arg(chunksize, get_default_args(pca)['chunksize'], **kwargs)
10151017
if data is not None:
1016-
from pyemma.util.reflection import get_default_args
1017-
cs = _check_old_chunksize_arg(chunksize, get_default_args(pca)['chunksize'], **kwargs)
10181018
res.estimate(data, chunksize=cs)
1019+
else:
1020+
res.chunksize = cs
10191021
return res
10201022

10211023

@@ -1256,6 +1258,8 @@ def tica(data=None, lag=10, dim=-1, var_cutoff=0.95, kinetic_map=True, commute_m
12561258
weights=weights, reversible=reversible, ncov_max=ncov_max)
12571259
if data is not None:
12581260
res.estimate(data, chunksize=cs)
1261+
else:
1262+
res.chunksize = cs
12591263
return res
12601264

12611265

@@ -1267,14 +1271,13 @@ def vamp(data=None, lag=10, dim=None, scaling=None, right=True, ncov_max=float('
12671271
----------
12681272
lag : int
12691273
lag time
1270-
dim : float or int
1274+
dim : float or int, default=None
12711275
Number of dimensions to keep:
12721276
1273-
* if dim is not set all available ranks are kept:
1277+
* if dim is not set (None) all available ranks are kept:
12741278
`n_components == min(n_samples, n_features)`
12751279
* if dim is an integer >= 1, this number specifies the number
1276-
of dimensions to keep. By default this will use the kinetic
1277-
variance.
1280+
of dimensions to keep.
12781281
* if dim is a float with ``0 < dim < 1``, select the number
12791282
of dimensions such that the amount of kinetic variance
12801283
that needs to be explained is greater than the percentage
@@ -1406,6 +1409,8 @@ def vamp(data=None, lag=10, dim=None, scaling=None, right=True, ncov_max=float('
14061409
res = VAMP(lag, dim=dim, scaling=scaling, right=right, skip=skip, ncov_max=ncov_max)
14071410
if data is not None:
14081411
res.estimate(data, stride=stride, chunksize=chunksize)
1412+
else:
1413+
res.chunksize = chunksize
14091414
return res
14101415

14111416

@@ -1502,6 +1507,8 @@ def covariance_lagged(data=None, c00=True, c0t=True, ctt=False, remove_constant_
15021507
weights=weights, stride=stride, skip=skip, ncov_max=ncov_max)
15031508
if data is not None:
15041509
lc.estimate(data, chunksize=chunksize)
1510+
else:
1511+
lc.chunksize = chunksize
15051512
return lc
15061513

15071514

@@ -1552,10 +1559,12 @@ def cluster_mini_batch_kmeans(data=None, k=100, max_iter=10, batch_size=0.2, met
15521559
from pyemma.coordinates.clustering.kmeans import MiniBatchKmeansClustering
15531560
res = MiniBatchKmeansClustering(n_clusters=k, max_iter=max_iter, metric=metric, init_strategy=init_strategy,
15541561
batch_size=batch_size, n_jobs=n_jobs, skip=skip, clustercenters=clustercenters)
1562+
from pyemma.util.reflection import get_default_args
1563+
cs = _check_old_chunksize_arg(chunksize, get_default_args(cluster_mini_batch_kmeans)['chunksize'], **kwargs)
15551564
if data is not None:
1556-
from pyemma.util.reflection import get_default_args
1557-
cs = _check_old_chunksize_arg(chunksize, get_default_args(cluster_mini_batch_kmeans)['chunksize'], **kwargs)
15581565
res.estimate(data, chunksize=cs)
1566+
else:
1567+
res.chunksize = chunksize
15591568
return res
15601569

15611570

@@ -1687,10 +1696,12 @@ def cluster_kmeans(data=None, k=None, max_iter=10, tolerance=1e-5, stride=1,
16871696
res = KmeansClustering(n_clusters=k, max_iter=max_iter, metric=metric, tolerance=tolerance,
16881697
init_strategy=init_strategy, fixed_seed=fixed_seed, n_jobs=n_jobs, skip=skip,
16891698
keep_data=keep_data, clustercenters=clustercenters, stride=stride)
1699+
from pyemma.util.reflection import get_default_args
1700+
cs = _check_old_chunksize_arg(chunksize, get_default_args(cluster_kmeans)['chunksize'], **kwargs)
16901701
if data is not None:
1691-
from pyemma.util.reflection import get_default_args
1692-
cs = _check_old_chunksize_arg(chunksize, get_default_args(cluster_kmeans)['chunksize'], **kwargs)
16931702
res.estimate(data, chunksize=cs)
1703+
else:
1704+
res.chunksize = cs
16941705
return res
16951706

16961707

@@ -1764,10 +1775,12 @@ def cluster_uniform_time(data=None, k=None, stride=1, metric='euclidean',
17641775
"""
17651776
from pyemma.coordinates.clustering.uniform_time import UniformTimeClustering
17661777
res = UniformTimeClustering(k, metric=metric, n_jobs=n_jobs, skip=skip, stride=stride)
1778+
from pyemma.util.reflection import get_default_args
1779+
cs = _check_old_chunksize_arg(chunksize, get_default_args(cluster_uniform_time)['chunksize'], **kwargs)
17671780
if data is not None:
1768-
from pyemma.util.reflection import get_default_args
1769-
cs = _check_old_chunksize_arg(chunksize, get_default_args(cluster_uniform_time)['chunksize'], **kwargs)
17701781
res.estimate(data, chunksize=cs)
1782+
else:
1783+
res.chunksize = cs
17711784
return res
17721785

17731786

@@ -1863,10 +1876,12 @@ def cluster_regspace(data=None, dmin=-1, max_centers=1000, stride=1, metric='euc
18631876
from pyemma.coordinates.clustering.regspace import RegularSpaceClustering as _RegularSpaceClustering
18641877
res = _RegularSpaceClustering(dmin, max_centers=max_centers, metric=metric,
18651878
n_jobs=n_jobs, stride=stride, skip=skip)
1879+
from pyemma.util.reflection import get_default_args
1880+
cs = _check_old_chunksize_arg(chunksize, get_default_args(cluster_regspace)['chunksize'], **kwargs)
18661881
if data is not None:
1867-
from pyemma.util.reflection import get_default_args
1868-
cs = _check_old_chunksize_arg(chunksize, get_default_args(cluster_regspace)['chunksize'], **kwargs)
18691882
res.estimate(data, chunksize=cs)
1883+
else:
1884+
res.chunksize = cs
18701885
return res
18711886

18721887

@@ -1952,11 +1967,13 @@ def assign_to_centers(data=None, centers=None, stride=1, return_dtrajs=True,
19521967
' or NumPy array or a reader created by source function')
19531968
from pyemma.coordinates.clustering.assign import AssignCenters
19541969
res = AssignCenters(centers, metric=metric, n_jobs=n_jobs, skip=skip, stride=stride)
1970+
from pyemma.util.reflection import get_default_args
1971+
cs = _check_old_chunksize_arg(chunksize, get_default_args(assign_to_centers)['chunksize'], **kwargs)
19551972
if data is not None:
1956-
from pyemma.util.reflection import get_default_args
1957-
cs = _check_old_chunksize_arg(chunksize, get_default_args(assign_to_centers)['chunksize'], **kwargs)
19581973
res.estimate(data, chunksize=cs)
19591974
if return_dtrajs:
19601975
return res.dtrajs
1976+
else:
1977+
res.chunksize = cs
19611978

19621979
return res

pyemma/coordinates/data/_base/datasource.py

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -222,7 +222,7 @@ def number_of_trajectories(self, stride=None):
222222
n = self.ntraj
223223
return n
224224

225-
def trajectory_length(self, itraj, stride=1, skip=None):
225+
def trajectory_length(self, itraj, stride=1, skip=0):
226226
r"""Returns the length of trajectory of the requested index.
227227
228228
Parameters
@@ -246,7 +246,10 @@ def trajectory_length(self, itraj, stride=1, skip=None):
246246
selection = stride[stride[:, 0] == itraj][:, 0]
247247
return 0 if itraj not in selection else len(selection)
248248
else:
249-
return (self._lengths[itraj] - (0 if skip is None else skip) - 1) // int(stride) + 1
249+
skip = 0 if skip is None else skip
250+
res = (self._lengths[itraj] - skip - 1) // int(stride) + 1
251+
assert res >= 0
252+
return res
250253

251254
def n_chunks(self, chunksize, stride=1, skip=0):
252255
""" how many chunks an iterator of this source will output, starting (e.g. after calling reset())
@@ -658,6 +661,11 @@ def __init__(self, data_source, skip=0, chunk=0, stride=1, return_trajindex=Fals
658661
self.__init_stride(stride)
659662
self._pos = 0
660663
self._last_chunk_in_traj = False
664+
if not isinstance(stride, np.ndarray) and skip > 0:
665+
# skip over the trajectories that are smaller than skip
666+
while self.state.itraj < self._data_source.ntraj \
667+
and self._data_source.trajectory_length(self.state.itraj, self.stride, 0) <= skip:
668+
self.state.itraj += 1
661669
super(DataSourceIterator, self).__init__()
662670

663671
def __init_stride(self, stride):

pyemma/coordinates/data/_base/iterable.py

Lines changed: 31 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@
2727

2828

2929
class Iterable(six.with_metaclass(ABCMeta, InMemoryMixin, Loggable)):
30+
_FALLBACK_CHUNKSIZE = 1000
3031

3132
def __init__(self, chunksize=None):
3233
super(Iterable, self).__init__()
@@ -41,28 +42,42 @@ def dimension(self):
4142
def ndim(self):
4243
return self.dimension()
4344

45+
@staticmethod
46+
def _compute_default_cs(dim, itemsize, logger=None):
47+
# obtain a human readable memory size from the config, convert it to bytes and calc maximum chunksize.
48+
from pyemma import config
49+
from pyemma.util.units import string_to_bytes
50+
max_bytes = string_to_bytes(config.default_chunksize)
51+
52+
# TODO: consider rounding this to some cache size of CPU? e.g py-cpuinfo can obtain it.
53+
# if one time step is already bigger than max_memory, we set the chunksize to 1.
54+
max_elements = max(1, int(np.floor(max_bytes / (itemsize * dim))))
55+
assert max_elements * dim * itemsize <= max_bytes or max_elements == 1
56+
result = max(1, max_elements // dim)
57+
58+
assert result > 0
59+
if logger is not None:
60+
logger.debug('computed default chunksize to %s'
61+
' to limit memory per chunk to %s', result, config.default_chunksize)
62+
return result
63+
4464
@property
4565
def default_chunksize(self):
46-
""" How much data will be processed at once, in case no chunksize has been provided."""
66+
""" How much data will be processed at once, in case no chunksize has been provided.
67+
68+
Notes
69+
-----
70+
This variable respects your setting for maximum memory in pyemma.config.default_chunksize
71+
"""
4772
if self._default_chunksize is None:
4873
try:
49-
# some overloads of dimension can raise, eg. PCA, TICA
50-
dim = self.dimension()
74+
self.dimension()
75+
self.output_type()
5176
except:
52-
self.logger.info('could not obtain output dimension, defaulting to chunksize=1000')
53-
self._default_chunksize = 1000
77+
self._default_chunksize = Iterable._FALLBACK_CHUNKSIZE
5478
else:
55-
# obtain a human readable memory size from the config, convert it to bytes and calc maximum chunksize.
56-
from pyemma import config
57-
from pyemma.util.units import string_to_bytes
58-
max_bytes = string_to_bytes(config.default_chunksize)
59-
itemsize = np.dtype(self.output_type()).itemsize
60-
# TODO: consider rounding this to some cache size of CPU? e.g py-cpuinfo can obtain it.
61-
# if one time step is already bigger than max_memory, we set the chunksize to 1.
62-
max_elements = max(1, int(np.floor(max_bytes / (itemsize * self.ndim))))
63-
assert max_elements * self.ndim * itemsize <= max_bytes or max_elements == 1
64-
self._default_chunksize = max(1, max_elements // self.ndim)
65-
assert self._default_chunksize > 0, self._default_chunksize
79+
self._default_chunksize = Iterable._compute_default_cs(self.dimension(),
80+
self.output_type()().itemsize, self.logger)
6681
return self._default_chunksize
6782

6883
@property

pyemma/coordinates/data/_base/transformer.py

Lines changed: 8 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -179,12 +179,15 @@ def chunksize(self):
179179
return self.data_producer.chunksize
180180

181181
@chunksize.setter
182-
def chunksize(self, size):
182+
def chunksize(self, value):
183183
if self.data_producer is None:
184-
if size < 0:
185-
raise ValueError('chunksize has to be positive.')
186-
self._default_chunksize = size
187-
self.data_producer.chunksize = size
184+
if not isinstance(value, (type(None), int)):
185+
raise ValueError('chunksize has to be of type: None or int')
186+
if isinstance(value, int) and value < 0:
187+
raise ValueError("Chunksize of %s was provided, but has to be >= 0" % value)
188+
self._default_chunksize = value
189+
else:
190+
self.data_producer.chunksize = value
188191

189192
def number_of_trajectories(self, stride=1):
190193
return self.data_producer.number_of_trajectories(stride)

pyemma/coordinates/data/feature_reader.py

Lines changed: 10 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -311,8 +311,9 @@ def __init__(self, data_source, skip=0, chunk=0, stride=1, return_trajindex=Fals
311311
return_trajindex=return_trajindex,
312312
cols=cols
313313
)
314+
# set chunksize prior to selecting the first file, to ensure we have a sane value for mditer...
315+
self.chunksize = chunk
314316
self._selected_itraj = -1
315-
self._select_file(0)
316317

317318
@property
318319
def chunksize(self):
@@ -322,7 +323,7 @@ def chunksize(self):
322323
def chunksize(self, value):
323324
self.state.chunk = value
324325
if hasattr(self, '_mditer'):
325-
self._mditer._chunksize = int(value)
326+
self._mditer.chunksize = value
326327

327328
@property
328329
def skip(self):
@@ -353,6 +354,8 @@ def _next_chunk(self):
353354
354355
:return: a feature mapped vector X, or (X, Y) if lag > 0
355356
"""
357+
if not hasattr(self, '_mditer') or self._mditer is None:
358+
self._select_file(self._itraj)
356359
try:
357360
chunk = next(self._mditer)
358361
except StopIteration as si:
@@ -408,6 +411,10 @@ def _create_mditer(self):
408411
self._closed = False
409412

410413
def _create_patched_iter(self, filename, skip=0, stride=1, atom_indices=None):
411-
return patches.iterload(filename, chunk=self.chunksize, top=self._data_source.featurizer.topology,
414+
if self.is_uniform_stride(self.stride):
415+
flen = self._data_source.trajectory_length(itraj=self._itraj, stride=self.stride, skip=self.skip)
416+
else:
417+
flen = self.ra_trajectory_length(self._itraj)
418+
return patches.iterload(filename, flen, chunk=self.chunksize, top=self._data_source.featurizer.topology,
412419
skip=skip, stride=stride, atom_indices=atom_indices)
413420

pyemma/coordinates/data/sources_merger.py

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -90,8 +90,8 @@ def _select_file(self, itraj):
9090
self._t = 0
9191
self._itraj = itraj
9292
self._selected_itraj = itraj
93-
if __debug__:
94-
for it in self._iterators:
95-
assert it._itraj == itraj
96-
assert it._selected_itraj == itraj
97-
assert it._t == self._t
93+
for it in self._iterators:
94+
it._select_file(itraj)
95+
assert it._itraj == itraj
96+
assert it._selected_itraj == itraj
97+
assert it._t == self._t

pyemma/coordinates/tests/test_coordinates_iterator.py

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -62,6 +62,19 @@ def test_n_chunks(self):
6262
"returned for stride=%s, lag=%s" % (stride, lag))
6363
assert chunks == it.n_chunks
6464

65+
dd = [np.random.random((100, 3)), np.random.random((120, 3)), np.random.random((120, 3))]
66+
rr = DataInMemory(dd)
67+
68+
# test for lagged iterator
69+
for stride in range(1, 5):
70+
for lag in [x for x in range(0, 18)] + [50, 100]:
71+
it = rr.iterator(lag=lag, chunk=30, stride=stride, return_trajindex=False)
72+
chunks = sum(1 for _ in it)
73+
np.testing.assert_equal(it.n_chunks, chunks,
74+
err_msg="Expected number of chunks did not agree with what the iterator "
75+
"returned for stride=%s, lag=%s" % (stride, lag))
76+
assert chunks == it.n_chunks
77+
6578
def _count_chunks(self, it):
6679
with it:
6780
it.reset()

0 commit comments

Comments
 (0)