diff --git a/setup.py b/setup.py
index 45d9ce1..efb38ef 100644
--- a/setup.py
+++ b/setup.py
@@ -4,7 +4,7 @@
 setup(
     name="skbel",
-    version="1.0.4",
+    version="1.0.5",
     packages=my_pckg,
     include_package_data=True,
     url="https://github.com/robinthibaut/skbel",
diff --git a/skbel/algorithms/_statistics.py b/skbel/algorithms/_statistics.py
index 49d81c4..d2652ad 100644
--- a/skbel/algorithms/_statistics.py
+++ b/skbel/algorithms/_statistics.py
@@ -14,7 +14,14 @@
 from skbel.algorithms.extmath import get_block
 
-__all__ = ["KDE", "kde_params", "posterior_conditional", "mvn_inference", "it_sampling", "normalize"]
+__all__ = [
+    "KDE",
+    "kde_params",
+    "posterior_conditional",
+    "mvn_inference",
+    "it_sampling",
+    "normalize",
+]
 
 
 class KDE:
@@ -25,14 +32,14 @@
     """
 
     def __init__(
-            self,
-            *,
-            bw_method: str = None,
-            bw_adjust: float = 1,
-            gridsize: int = 200,
-            cut: float = 3,
-            clip: list = None,
-            cumulative: bool = False,
+        self,
+        *,
+        bw_method: str = None,
+        bw_adjust: float = 1,
+        gridsize: int = 200,
+        cut: float = 3,
+        clip: list = None,
+        cumulative: bool = False,
     ):
         """Initialize the estimator with its parameters.
@@ -70,7 +77,7 @@ def __init__(
 
     @staticmethod
     def _define_support_grid(
-            x: np.array, bw: float, cut: float, clip: list, gridsize: int
+        x: np.array, bw: float, cut: float, clip: list, gridsize: int
     ):
         """Create the grid of evaluation points depending for vector x."""
         clip_lo = -np.inf if clip[0] is None else clip[0]
@@ -101,11 +108,11 @@ def _define_support_bivariate(self, x1: np.array, x2: np.array, weights: np.arra
         return grid1, grid2
 
     def define_support(
-            self,
-            x1: np.array,
-            x2: np.array = None,
-            weights: np.array = None,
-            cache: bool = True,
+        self,
+        x1: np.array,
+        x2: np.array = None,
+        weights: np.array = None,
+        cache: bool = True,
     ):
         """Create the evaluation grid for a given data set."""
         if x2 is None:
@@ -176,8 +183,8 @@ def __call__(self, x1, x2=None, weights=None):
 
 
 def _univariate_density(
-        data_variable: pd.DataFrame,
-        estimate_kws: dict,
+    data_variable: pd.DataFrame,
+    estimate_kws: dict,
 ):
     # Initialize the estimator object
     estimator = KDE(**estimate_kws)
@@ -200,8 +207,8 @@ def _univariate_density(
 
 
 def _bivariate_density(
-        data: pd.DataFrame,
-        estimate_kws: dict,
+    data: pd.DataFrame,
+    estimate_kws: dict,
 ):
     """
     Estimate bivariate KDE
@@ -237,15 +244,15 @@ def _bivariate_density(
 
 
 def kde_params(
-        x: np.array = None,
-        y: np.array = None,
-        bw: float = None,
-        gridsize: int = 200,
-        cut: float = 3,
-        clip=None,
-        cumulative: bool = False,
-        bw_method: str = "scott",
-        bw_adjust: int = 1,
+    x: np.array = None,
+    y: np.array = None,
+    bw: float = None,
+    gridsize: int = 200,
+    cut: float = 3,
+    clip=None,
+    cumulative: bool = False,
+    bw_method: str = "scott",
+    bw_adjust: int = 1,
 ):
     """
     Obtain density and support (grid) of the bivariate KDE
@@ -314,11 +321,11 @@ def _pixel_coordinate(line: list, x_1d: np.array, y_1d: np.array):
 
 
 def _conditional_distribution(
-        kde_array: np.array,
-        x_array: np.array,
-        y_array: np.array,
-        x: float = None,
-        y: float = None,
+    kde_array: np.array,
+    x_array: np.array,
+    y_array: np.array,
+    x: float = None,
+    y: float = None,
 ):
     """
     Compute the conditional posterior distribution p(x_array|y_array) given x or y.
@@ -368,7 +375,7 @@ def _normalize_distribution(post: np.array, support: np.array):
 
 
 def posterior_conditional(
-        X: np.array, Y: np.array, X_obs: float = None, Y_obs: float = None
+    X: np.array, Y: np.array, X_obs: float = None, Y_obs: float = None
 ):
     """
     Computes the posterior distribution p(y|x_obs) or p(x|y_obs) by doing a cross section of the KDE of (d, h).
@@ -412,7 +419,7 @@ def posterior_conditional(
 
 
 def mvn_inference(
-        X: np.array, Y: np.array, X_obs: np.array, **kwargs
+    X: np.array, Y: np.array, X_obs: np.array, **kwargs
 ) -> (np.array, np.array):
     """
     Estimating posterior mean and covariance of the target.
@@ -458,7 +465,7 @@ def mvn_inference(
     x_ls_predicted = np.matmul(Y, g.T)
     x_modeling_mean_error = np.mean(X - x_ls_predicted, axis=0)  # (n_comp_CCA, 1)
     x_modeling_error = (
-            X - x_ls_predicted - np.tile(x_modeling_mean_error, (n_training, 1))
+        X - x_ls_predicted - np.tile(x_modeling_mean_error, (n_training, 1))
     )  # (n_comp_CCA, n_training)
@@ -482,7 +489,7 @@ def mvn_inference(
     y_posterior_covariance = np.linalg.pinv(d11)  # (n_comp_CCA, n_comp_CCA)
     # Computing the posterior mean is simply a linear operation, given precomputed posterior covariance.
     y_posterior_mean = y_posterior_covariance @ (
-            d11 @ y_mean - d12 @ (X_obs[0] - x_modeling_mean_error - y_mean @ g.T)
+        d11 @ y_mean - d12 @ (X_obs[0] - x_modeling_mean_error - y_mean @ g.T)
     )  # (n_comp_CCA,)
 
     return y_posterior_mean, y_posterior_covariance
@@ -520,10 +527,10 @@ def normalize(pdf, lower_bd=-np.inf, upper_bd=np.inf, vectorize=False):
     between lower_bd and upper_bd is close to 1. Maps nicely over iterables.
     """
     if lower_bd >= upper_bd:
-        raise ValueError('Lower bound must be less than upper bound.')
+        raise ValueError("Lower bound must be less than upper bound.")
     quadrature = integrate.quad(pdf, lower_bd, upper_bd, full_output=1)
    if not _convergent(quadrature):
-        raise ValueError('PDF integral likely divergent.')
+        raise ValueError("PDF integral likely divergent.")
     A = quadrature[0]

     def pdf_normed(x):
@@ -533,6 +540,7 @@ def pdf_normed(x):
         return 0
 
     if vectorize:
+
         def pdf_vectorized(x):
             try:
                 return pdf_normed(x)
@@ -569,7 +577,7 @@ def get_cdf(pdf, lower_bd=-np.inf, upper_bd=np.inf):
     pdf_norm = normalize(pdf, lower_bd, upper_bd)
 
     def cdf_number(x):
-        "Numerical cdf"""
+        """Numerical cdf."""
         if x < lower_bd:
             return 0.0
         elif x > upper_bd:
@@ -589,8 +597,13 @@ def cdf_vector(x):
 
 def _chebnodes(a, b, n):
     """Chebyshev nodes of rank n on integral [a,b]."""
     if not a < b:
-        raise ValueError('Lower bound must be less than upper bound.')
-    return np.array([1 / 2 * ((a + b) + (b - a) * np.cos((2 * k - 1) * np.pi / (2 * n))) for k in range(1, n + 1)])
+        raise ValueError("Lower bound must be less than upper bound.")
+    return np.array(
+        [
+            1 / 2 * ((a + b) + (b - a) * np.cos((2 * k - 1) * np.pi / (2 * n)))
+            for k in range(1, n + 1)
+        ]
+    )
 
 
 def adaptive_chebfit(pdf, lower_bd, upper_bd, eps=10 ** (-15)):
@@ -665,9 +678,9 @@ def chebcdf(pdf, lower_bd, upper_bd, eps=10 ** (-15)):
         as an argument, and returns an iterable of floats of the same length.
""" if not (np.isfinite(lower_bd) and np.isfinite(upper_bd)): - raise ValueError('Bounds must be finite.') + raise ValueError("Bounds must be finite.") if not lower_bd < upper_bd: - raise ValueError('Lower bound must be less than upper bound.') + raise ValueError("Lower bound must be less than upper bound.") x, coeffs = adaptive_chebfit(pdf, lower_bd, upper_bd, eps) int_coeffs = chebint(coeffs) @@ -681,11 +694,7 @@ def cdf(x_): return cdf -def it_sampling(pdf, - num_samples, - lower_bd=-np.inf, - upper_bd=np.inf, - chebyshev=False): +def it_sampling(pdf, num_samples, lower_bd=-np.inf, upper_bd=np.inf, chebyshev=False): """Sample from an arbitrary, unnormalized PDF. Copyright (c) 2017 Peter Wills @@ -729,11 +738,15 @@ def it_sampling(pdf, if chebyshev: if not (np.isfinite(lower_bd) and np.isfinite(upper_bd)): - raise ValueError('Bounds must be finite for Chebyshev approximation of CDF.') + raise ValueError( + "Bounds must be finite for Chebyshev approximation of CDF." + ) cdf = chebcdf(pdf, lower_bd, upper_bd) cdf_y = cdf(np.linspace(lower_bd, upper_bd, 200)) - inverse_cdf = interpolate.interp1d(cdf_y, pdf.x, kind="linear", fill_value="extrapolate") + inverse_cdf = interpolate.interp1d( + cdf_y, pdf.x, kind="linear", fill_value="extrapolate" + ) simple_samples = inverse_cdf(seeds) # Compute empirical mean and standard deviation @@ -749,6 +762,7 @@ def it_sampling(pdf, guess = np.mean(simple_samples) for seed in seeds: + def shifted(x): return cdf(x) - seed @@ -759,7 +773,8 @@ def shifted(x): else: cdf = get_cdf(pdf, lower_bd, upper_bd) cdf_y = cdf(np.linspace(lower_bd, upper_bd, 200)) - inverse_cdf = interpolate.interp1d(cdf_y, pdf.x, kind="linear", fill_value="extrapolate") + inverse_cdf = interpolate.interp1d( + cdf_y, pdf.x, kind="linear", fill_value="extrapolate" + ) simple_samples = inverse_cdf(seeds) return simple_samples - diff --git a/skbel/examples/demo_kde_cheby.py b/skbel/examples/demo_kde_cheby.py index e1bc833..97baad0 100644 --- a/skbel/examples/demo_kde_cheby.py +++ b/skbel/examples/demo_kde_cheby.py @@ -132,7 +132,9 @@ def bel_training( model = init_bel() # Set model parameters - model.mode = "kde_chebyshev" # How to compute the posterior conditional distribution + model.mode = ( + "kde_chebyshev" # How to compute the posterior conditional distribution + ) # Set PC cut model.X_n_pc = 50 model.Y_n_pc = 30 diff --git a/skbel/learning/bel.py b/skbel/learning/bel.py index 5e46f48..7bfa68f 100644 --- a/skbel/learning/bel.py +++ b/skbel/learning/bel.py @@ -68,19 +68,19 @@ class BEL(TransformerMixin, MultiOutputMixin, BaseEstimator): """ def __init__( - self, - mode: str = "mvn", - copy: bool = True, - *, - X_pre_processing=None, - Y_pre_processing=None, - X_post_processing=None, - Y_post_processing=None, - cca=None, - x_pc=None, - y_pc=None, - x_dim=None, - y_dim=None, + self, + mode: str = "mvn", + copy: bool = True, + *, + X_pre_processing=None, + Y_pre_processing=None, + X_post_processing=None, + Y_post_processing=None, + cca=None, + x_pc=None, + y_pc=None, + x_dim=None, + y_dim=None, ): """ :param mode: How to infer the posterior distribution. 
"mvn" (default) or "kde" @@ -329,24 +329,26 @@ def random_sample(self, n_posts: int = None, mode: str = None) -> np.array: if self.mode == "kde": Y_samples = np.zeros((self._n_posts, self.pdf.shape[0])) for i, pdf in enumerate(self.pdf): - uniform_samples = it_sampling(pdf=pdf, - num_samples=self._n_posts, - lower_bd=pdf.x.min(), - upper_bd=pdf.x.max(), - chebyshev=False, - ) + uniform_samples = it_sampling( + pdf=pdf, + num_samples=self._n_posts, + lower_bd=pdf.x.min(), + upper_bd=pdf.x.max(), + chebyshev=False, + ) Y_samples[:, i] = uniform_samples if self.mode == "kde_chebyshev": Y_samples = np.zeros((self._n_posts, self.pdf.shape[0])) for i, pdf in enumerate(self.pdf): - uniform_samples = it_sampling(pdf=pdf, - num_samples=self._n_posts, - lower_bd=pdf.x.min(), - upper_bd=pdf.x.max(), - chebyshev=True, - ) + uniform_samples = it_sampling( + pdf=pdf, + num_samples=self._n_posts, + lower_bd=pdf.x.min(), + upper_bd=pdf.x.max(), + chebyshev=True, + ) Y_samples[:, i] = uniform_samples @@ -432,8 +434,8 @@ def predict(self, X_obs, mode: str = None) -> (np.array, np.array): return self.pdf def inverse_transform( - self, - Y_pred, + self, + Y_pred, ) -> np.array: """ Back-transforms the sampled gaussian distributed posterior Y to their physical space. @@ -447,15 +449,15 @@ def inverse_transform( Y_pred ) # Posterior CCA scores y_post = ( - np.matmul(y_post, self.cca.y_loadings_.T) * self.cca.y_std_ - + self.cca.y_mean_ + np.matmul(y_post, self.cca.y_loadings_.T) * self.cca.y_std_ + + self.cca.y_mean_ ) # Posterior PC scores # Back transform PC scores nc = self.Y_pc.shape[0] # Number of components dummy = np.zeros((self.n_posts, nc)) # Create a dummy matrix filled with zeros dummy[ - :, : y_post.shape[1] + :, : y_post.shape[1] ] = y_post # Fill the dummy matrix with the posterior PC y_post = self.Y_pre_processing.inverse_transform(dummy) # Inverse transform