Merge pull request #1030 from marscher/its_fix_lag_setter_avoid_recomputation

[ITS] fix lag setter and avoid recomputation
franknoe authored Feb 2, 2017
2 parents 4aa5201 + 497b889 commit 56a100b
Showing 11 changed files with 374 additions and 199 deletions.
3 changes: 2 additions & 1 deletion .travis.yml
@@ -11,11 +11,12 @@ sudo: false
 env:
   global:
     - PATH=$HOME/miniconda/bin:$PATH
-    - common_py_deps="conda-build=2.0.1"
+    - common_py_deps="conda-build=2.1.2"
    - PACKAGENAME=pyemma
     - ORGNAME=omnia
     - PYTHONHASHSEED=0
     - OMP_NUM_THREADS=2
+    - MACOSX_DEPLOYMENT_TARGET=10.9
   matrix:
     - CONDA_PY=2.7 CONDA_NPY=1.11
     - CONDA_PY=3.4 CONDA_NPY=1.10
4 changes: 2 additions & 2 deletions appveyor.yml
@@ -5,7 +5,7 @@ environment:
   # /E:ON and /V:ON options are not enabled in the batch script interpreter
   # See: http://stackoverflow.com/a/13751649/163740
   CMD_IN_ENV: "cmd /E:ON /V:ON /C .\\devtools\\ci\\appveyor\\run_with_env.cmd"
-  CONDA_NPY: "110"
+  CONDA_NPY: "111"
   PYTHONHASHSEED: "0"

 matrix:
@@ -29,7 +29,7 @@ install:
   - conda config --set always_yes true
   - conda config --add channels omnia
   - conda config --add channels conda-forge
-  - conda install -q conda-build=2.0.1 jinja2
+  - conda install -q conda-build=2.1.2 jinja2

   # use agg backend in matplotlib to avoid gui popup, which cannot be closed.
   - echo %userprofile%
22 changes: 13 additions & 9 deletions pyemma/_base/estimator.py
@@ -129,7 +129,7 @@ def _estimate_param_scan_worker(estimator, params, X, evaluate, evaluate_args,
                                 failfast):
     """ Method that runs estimation for several parameter settings.

-    Defined as a worker for Parallelization
+    Defined as a worker for parallelization

     """
     # run estimation
@@ -157,7 +157,7 @@ def _estimate_param_scan_worker(estimator, params, X, evaluate, evaluate_args,
     values = []  # the function values the model
     for ieval, name in enumerate(evaluate):
         # get method/attribute name and arguments to be evaluated
-        name = evaluate[ieval]
+        #name = evaluate[ieval]
         args = ()
         if evaluate_args is not None:
             args = evaluate_args[ieval]
@@ -248,7 +248,7 @@ def estimate_param_scan(estimator, X, param_sets, evaluate=None, evaluate_args=N
     >>> param_sets=param_grid({'lag': [1,2,3]})
     >>>
     >>> estimate_param_scan(MaximumLikelihoodMSM, dtraj, param_sets, evaluate='timescales')
-    [array([ 1.24113168, 0.77454377]), array([ 2.48226337, 1.54908754]), array([ 3.72339505, 2.32363131])]
+    [array([ 1.24113168, 0.77454377]), array([ 2.65266698, 1.42909842]), array([ 5.34810405, 1.14784446])]

     Now we also want to get samples of the timescales using the BayesianMSM.

     >>> estimate_param_scan(MaximumLikelihoodMSM, dtraj, param_sets, failfast=False,
@@ -268,9 +268,14 @@ def estimate_param_scan(estimator, X, param_sets, evaluate=None, evaluate_args=N
     estimator = get_estimator(estimator)
     if hasattr(estimator, 'show_progress'):
         estimator.show_progress = show_progress
+
     # if we want to return estimators, make clones. Otherwise just copy references.
-    # For parallel processing we always need clones
-    if return_estimators or n_jobs > 1 or n_jobs is None:
+    # For parallel processing we always need clones.
+    # Also if the Estimator is its own Model, we have to clone.
+    from pyemma._base.model import Model
+    if (return_estimators or
+            n_jobs > 1 or n_jobs is None or
+            isinstance(estimator, Model)):
         estimators = [clone_estimator(estimator) for _ in param_sets]
     else:
         estimators = [estimator for _ in param_sets]
@@ -297,8 +302,7 @@ def estimate_param_scan(estimator, X, param_sets, evaluate=None, evaluate_args=N
                                                            param_set, X,
                                                            evaluate,
                                                            evaluate_args,
-                                                           failfast,
-                                                           )
+                                                           failfast)
             for estimator, param_set in zip(estimators, param_sets))

     from .parallel import _init_pool
@@ -310,8 +314,8 @@ def estimate_param_scan(estimator, X, param_sets, evaluate=None, evaluate_args=N
     # if n_jobs=1 don't invoke the pool, but directly dispatch the iterator
     else:
         res = []
-        for i, param in enumerate(param_sets):
-            res.append(_estimate_param_scan_worker(estimators[i], param, X,
+        for estimator, param_set in zip(estimators, param_sets):
+            res.append(_estimate_param_scan_worker(estimator, param_set, X,
                                                    evaluate, evaluate_args, failfast))
             if progress_reporter is not None and show_progress:
                 progress_reporter._progress_update(1, stage=0)
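The decisive change above is the new isinstance(estimator, Model) clause: some pyemma estimators are their own Model, i.e. they store estimation results on the instance itself, so dispatching several parameter sets through one shared object would let each run overwrite the previous results. A minimal sketch of the guard (Model, ITSEstimator and clone_estimator are hypothetical stand-ins here, not pyemma's real implementations):

    import copy

    class Model(object):
        pass

    class ITSEstimator(Model):
        """An estimator that is also its own Model: results live on the instance."""
        def __init__(self, lag=1):
            self.lag = lag

    def clone_estimator(est):
        # pyemma clones via constructor parameters; deepcopy stands in here
        return copy.deepcopy(est)

    def make_estimators(estimator, param_sets, return_estimators=False, n_jobs=1):
        # Clone whenever a shared instance could be mutated per parameter set:
        # the caller wants the estimators back, we run in parallel, or the
        # estimator carries its own results (it is a Model).
        if (return_estimators or n_jobs is None or n_jobs > 1
                or isinstance(estimator, Model)):
            return [clone_estimator(estimator) for _ in param_sets]
        return [estimator for _ in param_sets]

    ests = make_estimators(ITSEstimator(), [{'lag': 1}, {'lag': 2}])
    assert ests[0] is not ests[1]  # each parameter set gets its own instance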
56 changes: 56 additions & 0 deletions pyemma/_base/parallel.py
@@ -44,3 +44,59 @@ def __call__(self, *args, **kw):
     joblib.parallel.BatchCompletionCallBack = CallBack
 else:
     joblib.parallel.CallBack = CallBack
+
+
+class NJobsMixIn(object):
+    # Mixin for sklearn-like estimators (the estimation/ctor parameters have to contain n_jobs).
+
+    @property
+    def n_jobs(self):
+        """ Returns number of jobs/threads to use during assignment of data.
+
+        Returns
+        -------
+        If None, the number of processors/cores, or the setting of the
+        'OMP_NUM_THREADS' env variable, is returned.
+
+        Notes
+        -----
+        By setting the environment variable 'OMP_NUM_THREADS' to an integer,
+        one will override the default argument of n_jobs (currently None).
+        """
+        assert isinstance(self._n_jobs, int)
+        return self._n_jobs
+
+    @n_jobs.setter
+    def n_jobs(self, val):
+        """ Set the number of jobs/threads to use during assignment of data.
+
+        Parameters
+        ----------
+        val: int or None
+            A positive int for the number of jobs, or None to use all available
+            resources. If set to None, all available CPUs are used, unless the
+            environment variable 'OMP_NUM_THREADS' provides a job number.
+        """
+        if val is None:
+            import psutil
+            import os
+            # TODO: wouldn't a distinct variable, e.g. PYEMMA_NJOBS, be better
+            # here, to avoid multiplying OMP threads with njobs?
+            omp_threads_from_env = os.getenv('OMP_NUM_THREADS', None)
+            n_cpus = psutil.cpu_count()
+            if omp_threads_from_env:
+                try:
+                    self._n_jobs = int(omp_threads_from_env)
+                    if hasattr(self, 'logger'):
+                        self.logger.info("number of threads obtained from env variable"
+                                         " 'OMP_NUM_THREADS'=%s" % omp_threads_from_env)
+                except ValueError as ve:
+                    if hasattr(self, 'logger'):
+                        self.logger.warning("could not parse env variable 'OMP_NUM_THREADS'."
+                                            " Value='{}'. Error={}. Will use {} jobs."
+                                            .format(omp_threads_from_env, ve, n_cpus))
+                    self._n_jobs = n_cpus
+            else:
+                self._n_jobs = n_cpus
+        else:
+            self._n_jobs = int(val)
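A quick usage sketch of the new mixin (MyEstimator is hypothetical; only NJobsMixIn comes from the diff above, and resolving n_jobs=None requires psutil):

    from pyemma._base.parallel import NJobsMixIn

    class MyEstimator(NJobsMixIn):
        def __init__(self, n_jobs=None):
            self.n_jobs = n_jobs  # None is resolved via OMP_NUM_THREADS or the CPU count

    est = MyEstimator()
    print(est.n_jobs)  # always an int after construction
    est.n_jobs = 4     # explicit values are cast with int()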
43 changes: 29 additions & 14 deletions pyemma/_ext/sklearn/base.py
@@ -26,16 +26,28 @@
 from pyemma.util.reflection import getargspec_no_self

 ###############################################################################
+def _first_and_last_element(arr):
+    """Returns first and last element of numpy array or sparse matrix."""
+    if isinstance(arr, np.ndarray) or hasattr(arr, 'data'):
+        # numpy array or sparse matrix with .data attribute
+        data = arr.data if sparse.issparse(arr) else arr
+        return data.flat[0], data.flat[-1]
+    else:
+        # Sparse matrices without .data attribute. Only dok_matrix at
+        # the time of writing, in this case indexing is fast
+        return arr[0, 0], arr[-1, -1]
+
+
 def clone(estimator, safe=True):
     """Constructs a new estimator with the same parameters.

     Clone does a deep copy of the model in an estimator
     without actually copying attached data. It yields a new estimator
     with the same parameters that has not been fit on any data.

     Parameters
     ----------
-    estimator: estimator object, or list, tuple or set of objects
+    estimator : estimator object, or list, tuple or set of objects
         The estimator or group of estimators to be cloned
-    safe: boolean, optional
+    safe : boolean, optional
         If safe is false, clone will fall back to a deepcopy on objects
         that are not estimators.
     """
@@ -62,6 +74,9 @@ def clone(estimator, safe=True):
     for name in new_object_params:
         param1 = new_object_params[name]
         param2 = params_set[name]
+        if param1 is param2:
+            # this should always happen
+            continue
         if isinstance(param1, np.ndarray):
             # For most ndarrays, we do not test for complete equality
             if not isinstance(param2, type(param1)):
@@ -74,9 +89,8 @@ def clone(estimator, safe=True):
             equality_test = (
                 param1.shape == param2.shape
                 and param1.dtype == param2.dtype
-                # We have to use '.flat' for 2D arrays
-                and param1.flat[0] == param2.flat[0]
-                and param1.flat[-1] == param2.flat[-1]
+                and (_first_and_last_element(param1) ==
+                     _first_and_last_element(param2))
             )
         else:
             equality_test = np.all(param1 == param2)
@@ -93,19 +107,20 @@ def clone(estimator, safe=True):
             else:
                 equality_test = (
                     param1.__class__ == param2.__class__
-                    and param1.data[0] == param2.data[0]
-                    and param1.data[-1] == param2.data[-1]
+                    and (_first_and_last_element(param1) ==
+                         _first_and_last_element(param2))
                     and param1.nnz == param2.nnz
                     and param1.shape == param2.shape
                 )
         else:
-            new_obj_val = new_object_params[name]
-            params_set_val = params_set[name]
-            # The following construct is required to check equality on special
-            # singletons such as np.nan that are not equal to them-selves:
-            equality_test = (new_obj_val == params_set_val or
-                             new_obj_val is params_set_val)
-        if not equality_test:
+            # fall back on standard equality
+            equality_test = param1 == param2
+        if equality_test:
+            warnings.warn("Estimator %s modifies parameters in __init__."
+                          " This behavior is deprecated as of 0.18 and "
+                          "support for this behavior will be removed in 0.20."
+                          % type(estimator).__name__, DeprecationWarning)
+        else:
             raise RuntimeError('Cannot clone object %s, as the constructor '
                                'does not seem to set parameter %s' %
                                (estimator, name))
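A small sanity check of the new helper's two code paths (the function body is copied from the diff so the snippet stands alone):

    import numpy as np
    from scipy import sparse

    def _first_and_last_element(arr):
        if isinstance(arr, np.ndarray) or hasattr(arr, 'data'):
            # numpy array or sparse matrix with .data attribute
            data = arr.data if sparse.issparse(arr) else arr
            return data.flat[0], data.flat[-1]
        else:
            # e.g. dok_matrix: no .data attribute, but fast indexing
            return arr[0, 0], arr[-1, -1]

    a = np.array([[1., 2.], [3., 4.]])
    print(_first_and_last_element(a))                     # (1.0, 4.0)
    print(_first_and_last_element(sparse.csr_matrix(a)))  # (1.0, 4.0), read from .data

Comparing only the first and last stored values (together with shape, dtype and nnz) keeps clone()'s parameter check cheap for large arrays; it deliberately stops short of a full element-wise comparison.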
54 changes: 2 additions & 52 deletions pyemma/coordinates/clustering/interface.py
@@ -28,6 +28,7 @@

 import numpy as np
 from pyemma._base.model import Model
+from pyemma._base.parallel import NJobsMixIn
 from pyemma._ext.sklearn.base import ClusterMixin
 from pyemma.coordinates.clustering import regspatial
 from pyemma.coordinates.data._base.transformer import StreamingEstimationTransformer
@@ -39,7 +40,7 @@

 @fix_docs
 @aliased
-class AbstractClustering(StreamingEstimationTransformer, Model, ClusterMixin):
+class AbstractClustering(StreamingEstimationTransformer, Model, ClusterMixin, NJobsMixIn):

     """
     provides a common interface for cluster algorithms.
@@ -64,57 +65,6 @@ def __init__(self, metric='euclidean', n_jobs=None):
         self._index_states = []
         self.n_jobs = n_jobs

-    @property
-    def n_jobs(self):
-        """ Returns number of jobs/threads to use during assignment of data.
-
-        Returns
-        -------
-        If None it will return number of processors /or cores or the setting of 'OMP_NUM_THREADS' env variable.
-
-        Notes
-        -----
-        By setting the environment variable 'OMP_NUM_THREADS' to an integer,
-        one will override the default argument of n_jobs (currently None).
-        """
-        assert isinstance(self._n_jobs, int)
-        return self._n_jobs
-
-    @n_jobs.setter
-    def n_jobs(self, val):
-        """ set number of jobs/threads to use via assignment of data.
-
-        Parameters
-        ----------
-        val: int or None
-            a positive int for the number of jobs. Or None to usage all available resources.
-
-        Notes
-        -----
-        """
-        from pyemma.util.reflection import get_default_args
-        def_args = get_default_args(self.__init__)
-
-        # default value from constructor?
-        if val == def_args['n_jobs']:
-            omp_threads_from_env = os.getenv('OMP_NUM_THREADS', None)
-            import psutil
-            n_cpus = psutil.cpu_count()
-            if omp_threads_from_env:
-                try:
-                    self._n_jobs = int(omp_threads_from_env)
-                    self.logger.info("number of threads obtained from env variable"
-                                     " 'OMP_NUM_THREADS'=%s" % omp_threads_from_env)
-                except ValueError as ve:
-                    self.logger.warning("could not parse env variable 'OMP_NUM_THREADS'."
-                                        " Value='{}'. Error={}. Will use {} jobs."
-                                        .format(omp_threads_from_env, ve, n_cpus))
-                    self._n_jobs = n_cpus
-            else:
-                self._n_jobs = n_cpus
-        else:
-            self._n_jobs = int(val)

     @property
     @alias('labels_')  # sk-learn compat.
     def clustercenters(self):
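The net effect: the roughly fifty deleted lines are now supplied by NJobsMixIn from pyemma/_base/parallel.py, so every clustering estimator shares a single n_jobs implementation. A quick check (a sketch, assuming pyemma is importable and no other base class defines n_jobs):

    from pyemma._base.parallel import NJobsMixIn
    from pyemma.coordinates.clustering.interface import AbstractClustering

    assert issubclass(AbstractClustering, NJobsMixIn)
    # the property is looked up through the MRO and found on the mixin
    assert AbstractClustering.n_jobs is NJobsMixIn.n_jobs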
20 changes: 1 addition & 19 deletions pyemma/coordinates/data/featurization/util.py
@@ -21,6 +21,7 @@
 '''
 from pyemma.util.indices import (combinations,
                                  product)
+from pyemma.util.numeric import _hash_numpy_array
 from pyemma.util.types import is_iterable_of_int, is_string
 from six import PY3
@@ -56,25 +57,6 @@ def _catch_unhashable(x):

     return x

-if PY3:
-    def _hash_numpy_array(x):
-        hash_value = hash(x.shape)
-        hash_value ^= hash(x.strides)
-        hash_value ^= hash(x.data.tobytes())
-        return hash_value
-else:
-    def _hash_numpy_array(x):
-        writeable = x.flags.writeable
-        try:
-            x.flags.writeable = False
-            hash_value = hash(x.shape)
-            hash_value ^= hash(x.strides)
-            hash_value ^= hash(x.data)
-        finally:
-            x.flags.writeable = writeable
-        return hash_value
-

 def hash_top(top):
     if not PY3:
         return hash(top)
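The hashing helper itself is unchanged; it merely moved to pyemma.util.numeric, matching the new import at the top of this file. A usage sketch:

    import numpy as np
    from pyemma.util.numeric import _hash_numpy_array

    a = np.arange(12).reshape(3, 4)
    b = a.copy()  # same shape, strides and raw bytes -> same hash
    assert _hash_numpy_array(a) == _hash_numpy_array(b)
    print(_hash_numpy_array(a) == _hash_numpy_array(a.T))  # False: shape and strides differ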