Skip to content
This repository has been archived by the owner on Sep 11, 2023. It is now read-only.

Commit

Permalink
Merge pull request #515 from marscher/fix_joblib_callback_problem_pro…
Browse files Browse the repository at this point in the history
…gressbars

Fix joblib callback problem progressbars
  • Loading branch information
marscher committed Aug 28, 2015
2 parents e1b946c + 86686f0 commit 088dbf7
Show file tree
Hide file tree
Showing 10 changed files with 251 additions and 166 deletions.
49 changes: 33 additions & 16 deletions pyemma/_base/estimator.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,7 @@
from __future__ import absolute_import
from __future__ import absolute_import, print_function

from six.moves import range
import inspect
import joblib

from pyemma._ext.sklearn.base import BaseEstimator as _BaseEstimator
from pyemma._ext.sklearn.parameter_search import ParameterGrid
Expand Down Expand Up @@ -221,23 +220,41 @@ def estimate_param_scan(estimator, X, param_sets, evaluate=None, evaluate_args=N
# set call back for joblib
if progress_reporter is not None:
progress_reporter._progress_register(len(estimators), stage=0,
description="estimating")
description="estimating %s" % str(estimator.__class__.__name__))

if n_jobs > 1:
class CallBack(object):
def __init__(self, index, parallel):
self.index = index
self.parallel = parallel
self.reporter = progress_reporter

def __call__(self, index):
if self.reporter is not None:
self.reporter._progress_update(1, stage=0)
if self.parallel._original_iterable:
self.parallel.dispatch_next()
import joblib.parallel
joblib.parallel.CallBack = CallBack
else:
def _print(msg, msg_args):
# NOTE: this is an ugly hack, because if we only use one job,
# we do not get the joblib callback interface, as a workaround
# we use the Parallel._print function, which is called with
# msg_args = (done_jobs, total_jobs)
if len(msg_args) == 2:
progress_reporter._progress_update(1, stage=0)

class CallBack(object):
def __init__(self, index, parallel):
self.index = index
self.parallel = parallel
# iterate over parameter settings
from joblib import Parallel
import joblib
pool = Parallel(n_jobs=n_jobs)

def __call__(self, index):
if progress_reporter is not None:
progress_reporter._progress_update(1, stage=0)
if self.parallel._original_iterable:
self.parallel.dispatch_next()
import joblib.parallel
joblib.parallel.CallBack = CallBack
if progress_reporter is not None and n_jobs == 1:
pool._print = _print
# NOTE: verbose has to be set, otherwise our print hack does not work.
pool.verbose = 50

# iterate over parameter settings
pool = joblib.Parallel(n_jobs=n_jobs)
task_iter = (joblib.delayed(_estimate_param_scan_worker)(estimators[i],
param_sets[i], X,
evaluate,
Expand Down
39 changes: 19 additions & 20 deletions pyemma/_base/progress/reporter.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
@author: marscher
'''
from __future__ import absolute_import
from __future__ import absolute_import, print_function
from pyemma.util.types import is_int
from pyemma._base.progress.bar import ProgressBar as _ProgressBar
from pyemma._base.progress.bar import show_progressbar as _show_progressbar
Expand All @@ -18,17 +18,6 @@ class ProgressReporter(object):
# Note: this class intentionally has no constructor, because that is more
# convenient for users of this class (who then do not need to call it).

@property
def progress_silence(self):
""" If set to True, no progress will be reported. Defaults to False."""
if not hasattr(self, '_prog_rep_silence'):
self._prog_rep_silence = False
return self._prog_rep_silence

@progress_silence.setter
def progress_silence(self, value):
setattr(self, '_prog_rep_silence', value)

def _progress_register(self, amount_of_work, description=None, stage=0):
""" Registers a progress which can be reported/displayed via a progress bar.
Expand All @@ -43,7 +32,7 @@ def _progress_register(self, amount_of_work, description=None, stage=0):
in the first pass over the data, calculate covariances in the second),
one needs to estimate different times of arrival.
"""
if hasattr(self, '_prog_rep_silence') and self._prog_rep_silence:
if hasattr(self, 'show_progress') and not self.show_progress:
return

# note this semantic makes it possible to use this class without calling
Expand All @@ -52,15 +41,19 @@ def _progress_register(self, amount_of_work, description=None, stage=0):
self._prog_rep_progressbars = {}

if not is_int(amount_of_work):
raise ValueError("amount_of_work has to be of integer type. But is "
+ str(type(amount_of_work)))
raise ValueError("amount_of_work has to be of integer type. But is %s"
% type(amount_of_work))

# if stage in self._prog_rep_progressbars:
# import warnings
# warnings.warn("overriding progress for stage " + str(stage))
self._prog_rep_progressbars[stage] = _ProgressBar(
amount_of_work, description=description)

# def _progress_set_description(self, stage, description):
# """ set description of an already existing progress """
# assert hasattr(self, '_prog_rep_progressbars')
# assert stage in self._prog_rep_progressbars
#
# self._prog_rep_progressbars[stage].description = description

def register_progress_callback(self, call_back, stage=0):
""" Registers the progress reporter.
Expand All @@ -76,8 +69,9 @@ def register_progress_callback(self, call_back, stage=0):
stage: int, optional, default=0
The stage you want the given call back function to be fired.
"""
if hasattr(self, '_prog_rep_silence') and self._prog_rep_silence:
if hasattr(self, 'show_progress') and not self.show_progress:
return

if not hasattr(self, '_callbacks'):
self._prog_rep_callbacks = {}

Expand All @@ -104,7 +98,7 @@ def _progress_update(self, numerator_increment, stage=0):
Current stage of the algorithm, 0 or greater
"""
if hasattr(self, '_prog_rep_silence') and self._prog_rep_silence:
if hasattr(self, 'show_progress') and not self.show_progress:
return

if stage not in self._prog_rep_progressbars:
Expand All @@ -121,6 +115,11 @@ def _progress_update(self, numerator_increment, stage=0):

def _progress_force_finish(self, stage=0):
""" forcefully finish the progress for given stage """
if hasattr(self, 'show_progress') and not self.show_progress:
return
if stage not in self._prog_rep_progressbars:
raise RuntimeError(
"call _progress_register(amount_of_work, stage=x) on this instance first!")
pg = self._prog_rep_progressbars[stage]
pg.numerator = pg.denominator
pg._eta.eta_epoch = 0
Expand Down
50 changes: 34 additions & 16 deletions pyemma/msm/api.py
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@

@shortcut('its')
def timescales_msm(dtrajs, lags=None, nits=None, reversible=True, connected=True,
errors=None, nsamples=50, n_jobs=1):
errors=None, nsamples=50, n_jobs=1, show_progress=True):
# format data
r""" Calculate implied timescales from Markov state models estimated at a series of lag times.
Expand Down Expand Up @@ -177,12 +177,14 @@ def timescales_msm(dtrajs, lags=None, nits=None, reversible=True, connected=True
if errors is None:
estimator = _ML_MSM(reversible=reversible, connectivity=connectivity)
elif errors == 'bayes':
estimator = _Bayes_MSM(reversible=reversible, connectivity=connectivity, nsamples=nsamples)
estimator = _Bayes_MSM(reversible=reversible, connectivity=connectivity,
nsamples=nsamples, show_progress=show_progress)
else:
raise NotImplementedError('Error estimation method'+errors+'currently not implemented')

# go
itsobj = _ImpliedTimescales(estimator, lags=lags, nits=nits, n_jobs=n_jobs)
itsobj = _ImpliedTimescales(estimator, lags=lags, nits=nits, n_jobs=n_jobs,
show_progress=show_progress)
itsobj.estimate(dtrajs)
return itsobj

Expand Down Expand Up @@ -480,7 +482,8 @@ def estimate_markov_model(dtrajs, lag, reversible=True, sparse=False, connectivi


def timescales_hmsm(dtrajs, nstates, lags=None, nits=None, reversible=True,
connected=True, errors=None, nsamples=100, n_jobs=1):
connected=True, errors=None, nsamples=100, n_jobs=1,
show_progress=True):
r""" Calculate implied timescales from Hidden Markov state models estimated at a series of lag times.
Warning: this can be slow!
Expand Down Expand Up @@ -521,6 +524,9 @@ def timescales_hmsm(dtrajs, nstates, lags=None, nits=None, reversible=True,
n_jobs = 1 : int
how many subprocesses to start to estimate the models for each lag time.
show_progress : bool, default=True
Show progressbars for calculation?
Returns
-------
itsobj : :class:`ImpliedTimescales <pyemma.msm.ImpliedTimescales>` object
Expand Down Expand Up @@ -579,15 +585,19 @@ def timescales_hmsm(dtrajs, nstates, lags=None, nits=None, reversible=True,
if errors is None:
estimator = _ML_HMSM(nstates=nstates, reversible=reversible, connectivity=connectivity)
elif errors == 'bayes':
estimator = _Bayes_HMSM(nstates=nstates, reversible=reversible, connectivity=connectivity)
estimator = _Bayes_HMSM(nstates=nstates, reversible=reversible,
connectivity=connectivity,
show_progress=show_progress)
else:
raise NotImplementedError('Error estimation method'+errors+'currently not implemented')
raise NotImplementedError('Error estimation method'+str(errors)+'currently not implemented')

# go
itsobj = _ImpliedTimescales(estimator, lags=lags, nits=nits, n_jobs=n_jobs)
itsobj = _ImpliedTimescales(estimator, lags=lags, nits=nits, n_jobs=n_jobs,
show_progress=show_progress)
itsobj.estimate(dtrajs)
return itsobj


def estimate_hidden_markov_model(dtrajs, nstates, lag, reversible=True, connectivity='largest', observe_active=True,
dt_traj='1 step', accuracy=1e-3, maxit=1000):
r""" Estimates a Hidden Markov state model from discrete trajectories
Expand Down Expand Up @@ -761,7 +771,7 @@ def estimate_hidden_markov_model(dtrajs, nstates, lag, reversible=True, connecti


def bayesian_markov_model(dtrajs, lag, reversible=True, sparse=False, connectivity='largest',
nsamples=100, conf=0.95, dt_traj='1 step'):
nsamples=100, conf=0.95, dt_traj='1 step', show_progress=True):
r""" Bayesian Markov model estimate using Gibbs sampling of the posterior
Returns a :class:`SampledMSM <pyemma.msm.models.SampledMSM>` that contains the
Expand Down Expand Up @@ -829,6 +839,9 @@ def bayesian_markov_model(dtrajs, lag, reversible=True, sparse=False, connectivi
| 'ms', 'millisecond*'
| 's', 'second*'
show_progress : bool, default=True
Show progressbars for calculation?
Returns
-------
An :class:`SampledMSM <pyemma.msm.model.SampledMSM>` object containing a
Expand Down Expand Up @@ -919,12 +932,13 @@ def bayesian_markov_model(dtrajs, lag, reversible=True, sparse=False, connectivi
"""
# TODO: store_data=True
bmsm_estimator = _Bayes_MSM(lag=lag, reversible=reversible, sparse=sparse, connectivity=connectivity,
dt_traj=dt_traj, nsamples=nsamples, conf=conf)
dt_traj=dt_traj, nsamples=nsamples, conf=conf, show_progress=show_progress)
return bmsm_estimator.estimate(dtrajs)


def bayesian_hidden_markov_model(dtrajs, nstates, lag, nsamples=100, reversible=True, connectivity='largest',
observe_active=True, conf=0.95, dt_traj='1 step'):
observe_active=True, conf=0.95, dt_traj='1 step',
show_progress=True):
r""" Bayesian Hidden Markov model estimate using Gibbs sampling of the posterior
Returns a :class:`SampledHMSM <pyemma.msm.model.SampledHMSM>` that contains
Expand Down Expand Up @@ -991,6 +1005,9 @@ def bayesian_hidden_markov_model(dtrajs, nstates, lag, nsamples=100, reversible=
| 'ms', 'millisecond*'
| 's', 'second*'
show_progress : bool, default=True
Show progressbars for calculation?
Returns
-------
An :class:`SampledHMSM <pyemma.msm.model.SampledHMSM>` object containing a
Expand Down Expand Up @@ -1058,7 +1075,8 @@ def bayesian_hidden_markov_model(dtrajs, nstates, lag, nsamples=100, reversible=
"""
bhmsm_estimator = _Bayes_HMSM(lag=lag, nstates=nstates, nsamples=nsamples, reversible=reversible,
connectivity=connectivity, observe_active=observe_active, dt_traj=dt_traj, conf=conf)
connectivity=connectivity, observe_active=observe_active,
dt_traj=dt_traj, conf=conf, show_progress=show_progress)
return bhmsm_estimator.estimate(dtrajs)

# TODO: need code examples
Expand All @@ -1078,23 +1096,23 @@ def tpt(msmobj, A, B):
List of integer state labels for set A
B : array_like
List of integer state labels for set B
Returns
-------
tptobj : :class:`ReactiveFlux <pyemma.msm.flux.ReactiveFlux>` object
A python object containing the reactive A->B flux network
and several additional quantities, such as stationary probability,
committors and set definitions.
Notes
-----
The central object used in transition path theory is
the forward and backward committor function.
TPT (originally introduced in [1]_) for continuous systems has a
discrete version outlined in [2]_. Here, we use the transition
matrix formulation described in [3]_.
.. autoclass:: pyemma.msm.flux.reactive_flux.ReactiveFlux
:members:
Expand Down Expand Up @@ -1140,7 +1158,7 @@ def tpt(msmobj, A, B):
Examples
--------
"""
T = msmobj.transition_matrix
mu = msmobj.stationary_distribution
Expand Down
Loading

0 comments on commit 088dbf7

Please sign in to comment.