From d7d34aec206020c736c1aa1c76bc998c639d6455 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Henrik=20Bostr=C3=B6m?= Date: Tue, 28 Jun 2022 16:25:49 +0200 Subject: [PATCH] crepes 0.1.0 --- src/crepes/__init__.py | 9 +- src/crepes/base.py | 500 ++++++++++++++++++++++++++++++----------- src/crepes/fillings.py | 139 ++++++++++-- 3 files changed, 497 insertions(+), 151 deletions(-) diff --git a/src/crepes/__init__.py b/src/crepes/__init__.py index e4ece93..19a425a 100644 --- a/src/crepes/__init__.py +++ b/src/crepes/__init__.py @@ -1,14 +1,15 @@ """Conformal regressors and predictive systems (crepes) -Routines that implement conformal regressors and conformal predictive +Classes implementing conformal regressors and conformal predictive systems, which transform point predictions into prediction intervals -and cumulative distributions, respectively. +and cumulative distribution functions, respectively. Author: Henrik Boström (bostromh@kth.se) -Copyright 2021 Henrik Boström +Copyright 2022 Henrik Boström License: BSD 3 clause """ -from crepes.base import ConformalRegressor, ConformalPredictiveSystem +from crepes.base import ConformalRegressor, ConformalPredictiveSystem, __version__ + diff --git a/src/crepes/base.py b/src/crepes/base.py index 33c1cde..225ccbd 100644 --- a/src/crepes/base.py +++ b/src/crepes/base.py @@ -1,29 +1,30 @@ """Conformal regressors and predictive systems (crepes) -Routines that implement conformal regressors and conformal predictive +Classes implementing conformal regressors and conformal predictive systems, which transform point predictions into prediction intervals -and cumulative distributions, respectively. +and cumulative distribution functions, respectively. Author: Henrik Boström (bostromh@kth.se) -Copyright 2021 Henrik Boström +Copyright 2022 Henrik Boström License: BSD 3 clause """ -# To do: -# -# - error messages -# - commenting and documentation -# - test for uniformity of p-values (in evaluate) - import numpy as np import pandas as pd import time +__version__ = "0.1.0" class ConformalPredictor(): + """ + Conformal Predictor. + The class contains two sub-classes: ConformalRegressor + and ConformalPredictiveSystem. + + """ def __init__(self): self.alphas = None @@ -39,16 +40,18 @@ class ConformalRegressor(ConformalPredictor): """ Conformal Regressor. - A conformal regressor transforms point predictions (regression values) into - prediction intervals, for a certain confidence level. + A conformal regressor transforms point predictions (regression + values) into prediction intervals, for a certain confidence level. """ def __repr__(self): if self.fitted: - return "ConformalRegressor(fitted={}, normalized={}, mondrian={})".format(self.fitted, self.normalized, self.mondrian) + return (f"ConformalRegressor(fitted={self.fitted}, " + f"normalized={self.normalized}, " + f"mondrian={self.mondrian})") else: - return "ConformalRegressor(fitted={})".format(self.fitted) + return f"ConformalRegressor(fitted={self.fitted})" def fit(self, residuals=None, sigmas=None, bins=None): """ @@ -56,12 +59,12 @@ def fit(self, residuals=None, sigmas=None, bins=None): Parameters ---------- - residuals : array-like of shape (n_values,) - Residuals; actual - predicted - sigmas: array-like of shape (n_values,) - Sigmas; difficulty estimates - bins : array-like of shape (n_values,) - Bins; Mondrian categories + residuals : array-like of shape (n_values,), default=None + actual values - predicted values + sigmas: array-like of shape (n_values,), default=None + difficulty estimates + bins : array-like of shape (n_values,), default=None + Mondrian categories Returns ------- @@ -84,38 +87,42 @@ def fit(self, residuals=None, sigmas=None, bins=None): bin_values = np.unique(bins) if sigmas is None: self.normalized = False - self.alphas = (bin_values,[np.sort(abs_residuals[bins==b])[::-1] for b in bin_values]) + self.alphas = (bin_values,[np.sort( + abs_residuals[bins==b])[::-1] for b in bin_values]) else: self.normalized = True - self.alphas = (bin_values, [np.sort(abs_residuals[bins==b]/sigmas[bins==b])[::-1] for b in bin_values]) + self.alphas = (bin_values, [np.sort( + abs_residuals[bins==b]/sigmas[bins==b])[::-1] + for b in bin_values]) self.fitted = True toc = time.time() self.time_fit = toc-tic return self - def predict(self, y_hat=None, sigmas=None, bins=None, confidence=0.95, y_min=-np.inf, y_max=np.inf): + def predict(self, y_hat=None, sigmas=None, bins=None, confidence=0.95, + y_min=-np.inf, y_max=np.inf): """ - Predict using the conformal regressor. + Predict using conformal regressor. Parameters ---------- - y_hat : array-like of shape (n_values,) - predicted (regression) values - sigmas : array-like of shape (n_values,) - Sigmas; difficulty estimates - bins : array-like of shape (n_values,) - Bins; Mondrian categories - confidence : float in range (0,1), default = 0.95 - The confidence level. - y_min : float or int, default = -np.inf - The minimum value to include in prediction intervals. - y_max : float or int, default = np.inf - The maximum value to include in prediction intervals. + y_hat : array-like of shape (n_values,), default=None + predicted values + sigmas : array-like of shape (n_values,), default=None + difficulty estimates + bins : array-like of shape (n_values,), default=None + Mondrian categories + confidence : float in range (0,1), default=0.95 + confidence level + y_min : float or int, default=-np.inf + minimum value to include in prediction intervals + y_max : float or int, default=np.inf + maximum value to include in prediction intervals Returns ------- intervals : ndarray of shape (n_values, 2) - Prediction intervals. + prediction intervals """ tic = time.time() @@ -131,21 +138,31 @@ def predict(self, y_hat=None, sigmas=None, bins=None, confidence=0.95, y_min=-np intervals[:,0] = y_hat-alpha intervals[:,1] = y_hat+alpha else: - intervals[:,0] = -np.inf # If the no. of calibration instances is too small for the chosen confidence level, - intervals[:,1] = np.inf # then the intervals will be of maximum size + intervals[:,0] = -np.inf + intervals[:,1] = np.inf + # If the no. of calibration examples is too small for + # the chosen confidence level, then the intervals will + # be of maximum size else: bin_values, bin_alphas = self.alphas bin_indexes = [np.argwhere(bins == b).T[0] for b in bin_values] - alpha_indexes = [int((1-confidence)*(len(bin_alphas[b])+1))-1 for b in range(len(bin_values))] - bin_alpha = [bin_alphas[b][alpha_indexes[b]] if alpha_indexes[b]>=0 else np.inf for b in range(len(bin_values))] + alpha_indexes = [int((1-confidence)*(len(bin_alphas[b])+1))-1 + for b in range(len(bin_values))] + bin_alpha = [bin_alphas[b][alpha_indexes[b]] + if alpha_indexes[b]>=0 else np.inf + for b in range(len(bin_values))] if self.normalized: for b in range(len(bin_values)): - intervals[bin_indexes[b],0] = y_hat[bin_indexes[b]]-bin_alpha[b]*sigmas[bin_indexes[b]] - intervals[bin_indexes[b],1] = y_hat[bin_indexes[b]]+bin_alpha[b]*sigmas[bin_indexes[b]] + intervals[bin_indexes[b],0] = y_hat[bin_indexes[b]] \ + - bin_alpha[b]*sigmas[bin_indexes[b]] + intervals[bin_indexes[b],1] = y_hat[bin_indexes[b]] \ + + bin_alpha[b]*sigmas[bin_indexes[b]] else: for b in range(len(bin_values)): - intervals[bin_indexes[b],0] = y_hat[bin_indexes[b]]-bin_alpha[b] - intervals[bin_indexes[b],1] = y_hat[bin_indexes[b]]+bin_alpha[b] + intervals[bin_indexes[b],0] = y_hat[bin_indexes[b]] \ + - bin_alpha[b] + intervals[bin_indexes[b],1] = y_hat[bin_indexes[b]] \ + + bin_alpha[b] if y_min > -np.inf: intervals[intervals 0. + cpds : ndarray of (n_values, c_values), ndarray of (n_values,) + or list of ndarrays + conformal predictive distributions. Only returned if + return_cpds == True. If bins is None, the distributions are + represented by a single array, where the number of columns + (c_values) is determined by the number of residuals of the fitted + conformal predictive system. Otherwise, the distributions + are represented by a vector of arrays, if cpds_by_bins = False, + or a list of arrays, with one element for each bin, if + cpds_by_bins = True. + """ + tic = time.time() if not self.mondrian: if self.normalized: - cpds = np.array([y_hat[i]+sigmas[i]*self.alphas for i in range(len(y_hat))]) + cpds = np.array([y_hat[i]+sigmas[i]*self.alphas + for i in range(len(y_hat))]) else: - cpds = np.array([y_hat[i]+self.alphas for i in range(len(y_hat))]) + cpds = np.array([y_hat[i]+self.alphas + for i in range(len(y_hat))]) else: bin_values, bin_alphas = self.alphas bin_indexes = [np.argwhere(bins == b).T[0] for b in bin_values] if self.normalized: - cpds = [np.array([y_hat[i]+sigmas[i]*bin_alphas[b] for i in bin_indexes[b]]) for b in range(len(bin_values))] + cpds = [np.array([y_hat[i]+sigmas[i]*bin_alphas[b] + for i in bin_indexes[b]]) + for b in range(len(bin_values))] else: - cpds = [np.array([y_hat[i]+bin_alphas[b] for i in bin_indexes[b]]) for b in range(len(bin_values))] + cpds = [np.array([y_hat[i]+bin_alphas[b] for + i in bin_indexes[b]]) + for b in range(len(bin_values))] no_prec_result_cols = 0 - if type(lower_percentiles) == int or type(lower_percentiles) == float: + if type(lower_percentiles) in [int, float]: lower_percentiles = [lower_percentiles] - if type(higher_percentiles) == int or type(higher_percentiles) == float: + if type(higher_percentiles) in [int, float]: higher_percentiles = [higher_percentiles] - no_result_columns = (y != [])+len(lower_percentiles)+len(higher_percentiles) + if lower_percentiles is None: + lower_percentiles = [] + if higher_percentiles is None: + higher_percentiles = [] + no_result_columns = \ + (y is not None) + len(lower_percentiles) + len(higher_percentiles) if no_result_columns > 0: result = np.zeros((len(y_hat),no_result_columns)) - if len(y) > 0: + if y is not None: no_prec_result_cols += 1 gammas = np.random.rand(len(y_hat)) - if type(y) == int or type(y) == float: + if type(y) in [int, float]: if not self.mondrian: - result[:,0] = np.array([(len(np.argwhere(cpds[i] 0: if not self.mondrian: - lower_indexes = [int(lower_percentile/100*(len(self.alphas)+1))-1 for lower_percentile in lower_percentiles] - result[:,no_prec_result_cols:no_prec_result_cols+len(lower_percentiles)] = cpds[:,lower_indexes] - y_min_columns = [no_prec_result_cols+i for i in range(len(lower_indexes)) if lower_indexes[i]<0] + lower_indexes = [int(lower_percentile/100 \ + * (len(self.alphas)+1))-1 + for lower_percentile in lower_percentiles] + result[:,no_prec_result_cols:no_prec_result_cols \ + + len(lower_percentiles)] = cpds[:,lower_indexes] + y_min_columns = [no_prec_result_cols+i + for i in range(len(lower_indexes)) + if lower_indexes[i]<0] result[:,y_min_columns] = y_min else: for b in range(len(bin_values)): - lower_indexes = [int(lower_percentile/100*(len(bin_alphas[b])+1))-1 for lower_percentile in lower_percentiles] - result[bin_indexes[b],no_prec_result_cols:no_prec_result_cols+len(lower_indexes)] = cpds[b][:,lower_indexes] - y_min_columns = [no_prec_result_cols+i for i in range(len(lower_indexes)) if lower_indexes[i]<0] + lower_indexes = [int(lower_percentile/100 \ + * (len(bin_alphas[b])+1))-1 + for lower_percentile + in lower_percentiles] + result[bin_indexes[b], + no_prec_result_cols:no_prec_result_cols \ + + len(lower_indexes)] = cpds[b][:,lower_indexes] + y_min_columns = [no_prec_result_cols+i + for i in range(len(lower_indexes)) + if lower_indexes[i]<0] result[:,y_min_columns] = y_min if y_min > -np.inf: - result[:,no_prec_result_cols:no_prec_result_cols+len(lower_percentiles)][result[:,no_prec_result_cols:no_prec_result_cols+len(lower_percentiles)] 0: if not self.mondrian: - higher_indexes = np.array([int(np.ceil(higher_percentile/100*(len(self.alphas)+1)))-1 for higher_percentile in higher_percentiles], dtype=int) - too_high_indexes = np.array([i for i in range(len(higher_indexes)) if higher_indexes[i] > len(self.alphas)-1], dtype=int) + higher_indexes = np.array( + [int(np.ceil(higher_percentile/100 \ + * (len(self.alphas)+1)))-1 + for higher_percentile in higher_percentiles], + dtype=int) + too_high_indexes = np.array( + [i for i in range(len(higher_indexes)) + if higher_indexes[i] > len(self.alphas)-1], dtype=int) higher_indexes[too_high_indexes] = len(self.alphas)-1 - result[:,no_prec_result_cols:no_prec_result_cols+len(higher_indexes)] = cpds[:,higher_indexes] + result[:,no_prec_result_cols:no_prec_result_cols \ + + len(higher_indexes)] = cpds[:,higher_indexes] result[:,no_prec_result_cols+too_high_indexes] = y_max else: for b in range(len(bin_values)): - higher_indexes = [int(np.ceil(higher_percentile/100*(len(bin_alphas[b])+1)))-1 for higher_percentile in higher_percentiles] - result[bin_indexes[b],no_prec_result_cols:no_prec_result_cols+len(higher_indexes)] = cpds[b][:,higher_indexes] + higher_indexes = [ + int(np.ceil(higher_percentile/100 \ + * (len(bin_alphas[b])+1)))-1 + for higher_percentile in higher_percentiles] + result[bin_indexes[b], + no_prec_result_cols:no_prec_result_cols \ + + len(higher_indexes)] = cpds[b]\ + [:,higher_indexes] if y_max < np.inf: - result[:,no_prec_result_cols:no_prec_result_cols+len(higher_percentiles)][result[:,no_prec_result_cols:no_prec_result_cols+len(higher_percentiles)]>y_max] = y_max + result[:,no_prec_result_cols:no_prec_result_cols\ + + len(higher_percentiles)]\ + [result[:,no_prec_result_cols:no_prec_result_cols \ + + len(higher_percentiles)]>y_max] = y_max toc = time.time() self.time_predict = toc-tic if no_result_columns > 0 and return_cpds: - return result, cpds + if not self.mondrian or cpds_by_bins: + cpds_out = cpds + else: + cpds_out = np.empty(len(y_hat), dtype=object) + for b in range(len(bin_values)): + cpds_out[bin_indexes[b]] = [cpds[b][i] + for i in range(len(cpds[b]))] + return result, cpds_out elif no_result_columns > 0: return result elif return_cpds: - return cpds + if not self.mondrian or cpds_by_bins: + cpds_out = cpds + else: + cpds_out = np.empty(len(y_hat), dtype=object) + for b in range(len(bin_values)): + cpds_out[bin_indexes[b]] = [cpds[b][i] + for i in range(len(cpds[b]))] + return cpds_out + + def evaluate(self, y_hat=None, y=None, sigmas=None, bins=None, + confidence=0.95, y_min=-np.inf, y_max=np.inf, metrics=None): + """ + Evaluate conformal predictive system. + + Parameters + ---------- + y_hat : array-like of shape (n_values,), default=None, + predicted values + y : array-like of shape (n_values,), default=None, + correct target values + sigmas : array-like of shape (n_values,), default=None, + difficulty estimates + bins : array-like of shape (n_values,), default=None, + Mondrian categories + confidence : float in range (0,1), default=0.95 + confidence level + y_min : float or int, default=-np.inf + minimum value to include in prediction intervals + y_max : float or int, default=np.inf + maximum value to include in prediction intervals + metrics : a string or a list of strings, default=list of all + metrics; ["error", "efficiency", "CRPS", "time_fit", + "time_evaluate"] + + Returns + ------- + results : dictionary with a key for each selected metric + estimated performance using the metrics + """ - def evaluate(self, y_hat=None, y=None, sigmas=None, bins=None, confidence=0.95, y_min=-np.inf, y_max=np.inf, metrics=None): tic = time.time() if metrics is None: metrics = ["error","efficiency","CRPS","time_fit","time_evaluate"] @@ -344,32 +522,48 @@ def evaluate(self, y_hat=None, y=None, sigmas=None, bins=None, confidence=0.95, higher_percentile = (confidence+(1-confidence)/2)*100 test_results = {} if "CRPS" in metrics: - results, cpds = self.predict(y_hat, sigmas=sigmas, bins=bins, y=y, lower_percentiles=lower_percentile, - higher_percentiles=higher_percentile, y_min=y_min, y_max=y_max, return_cpds=True) + results, cpds = self.predict(y_hat, sigmas=sigmas, bins=bins, y=y, + lower_percentiles=lower_percentile, + higher_percentiles=higher_percentile, + y_min=y_min, y_max=y_max, + return_cpds=True, cpds_by_bins=True) intervals = results[:,[1,2]] if not self.mondrian: if self.normalized: - crps = calculate_crps(cpds, self.alphas, y_hat, sigmas, y) + crps = calculate_crps(cpds, self.alphas, sigmas, y) else: - crps = calculate_crps(cpds, self.alphas, y_hat, np.ones(len(y_hat)), y) + crps = calculate_crps(cpds, self.alphas, + np.ones(len(y_hat)), y) else: bin_values, bin_alphas = self.alphas - bin_indexes = [np.argwhere(bins == b).T[0] for b in bin_values] + bin_indexes = [np.argwhere(bins == b).T[0] + for b in bin_values] if self.normalized: - crps = np.sum([calculate_crps(cpds[b], bin_alphas[b], y_hat[bin_indexes[b]], sigmas[bin_indexes[b]], y[bin_indexes[b]])*len(bin_indexes[b]) + crps = np.sum([calculate_crps(cpds[b], + bin_alphas[b], + sigmas[bin_indexes[b]], + y[bin_indexes[b]]) \ + * len(bin_indexes[b]) for b in range(len(bin_values))])/len(y) else: - crps = np.sum([calculate_crps(cpds[b], bin_alphas[b], y_hat[bin_indexes[b]], np.ones(len(bin_indexes[b])), y[bin_indexes[b]])*len(bin_indexes[b]) + crps = np.sum([calculate_crps(cpds[b], + bin_alphas[b], + np.ones(len(bin_indexes[b])), + y[bin_indexes[b]]) \ + * len(bin_indexes[b]) for b in range(len(bin_values))])/len(y) - else: - intervals = self.predict(y_hat, sigmas=sigmas, bins=bins, lower_percentiles=lower_percentile, - higher_percentiles=higher_percentile, y_min=y_min, y_max=y_max, return_CRPS=False) + intervals = self.predict(y_hat, sigmas=sigmas, bins=bins, + lower_percentiles=lower_percentile, + higher_percentiles=higher_percentile, + y_min=y_min, y_max=y_max, + return_CRPS=False) if "error" in metrics: - test_results["error"] = 1-np.mean(np.logical_and(intervals[:,0]<=y,y<=intervals[:,1])) + test_results["error"] = 1-np.mean(np.logical_and( + intervals[:,0]<=y,y<=intervals[:,1])) if "efficiency" in metrics: test_results["efficiency"] = np.mean(intervals[:,1]-intervals[:,0]) - if "CRPS" in metrics: + if "CRPS" in metrics: test_results["CRPS"] = crps if "time_fit" in metrics: test_results["time_fit"] = self.time_fit @@ -379,23 +573,73 @@ def evaluate(self, y_hat=None, y=None, sigmas=None, bins=None, confidence=0.95, test_results["time_evaluate"] = self.time_evaluate return test_results -def calculate_crps(cpds, alphas, predictions, sigmas, y): +def calculate_crps(cpds, alphas, sigmas, y): + """ + Calculate mean continuous-ranked probability score (crps) + for a set of conformal predictive distributions. + + Parameters + ---------- + cpds : array-like of shape (n_values, c_values) + conformal predictive distributions + alphas : array-like of shape (c_values,) + sorted (normalized) residuals of the calibration examples + sigmas : array-like of shape (n_values,), + difficulty estimates + y : array-like of shape (n_values,) + correct target values + + Returns + ------- + crps : float + mean continuous-ranked probability score for the conformal + predictive distributions + """ widths = np.array([alphas[i+1]-alphas[i] for i in range(len(alphas)-1)]) cum_probs = np.cumsum([1/len(alphas) for i in range(len(alphas)-1)]) lower_errors = cum_probs**2 - upper_errors = (1-cum_probs)**2 + higher_errors = (1-cum_probs)**2 cpd_indexes = [np.argwhere(cpds[i]