feat: add conformal Bayesian prediction
lsorber committed Feb 24, 2024
1 parent a2d6028 commit 8fd0bbe
Showing 4 changed files with 170 additions and 49 deletions.
1 change: 1 addition & 0 deletions README.md
@@ -11,6 +11,7 @@ Neo LS-SVM is a modern [Least-Squares Support Vector Machine](https://en.wikiped
5. 🌀 Learns an affine transformation of the feature matrix to optimally separate the target's bins.
6. 🪞 Can solve the LS-SVM both in the primal and dual space.
7. 🌡️ Isotonically calibrated `predict_proba` based on the leave-one-out predictions.
8. 🎲 Asymmetric conformal Bayesian confidence intervals for classification and regression.

## Using

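A minimal usage sketch of the confidence-interval API this commit adds (the dataset, split, and variable names are illustrative, and it assumes `NeoLSSVM` is exported at the package root):

```python
import numpy as np
from sklearn.datasets import make_regression
from sklearn.model_selection import train_test_split

from neo_ls_svm import NeoLSSVM

# Fit a regressor on a synthetic dataset.
X, y = make_regression(n_samples=500, n_features=8, noise=10.0, random_state=42)
X_train, X_test, y_train, y_test = train_test_split(X, y, random_state=42)
model = NeoLSSVM().fit(X_train, y_train)

# New in this commit: asymmetric conformal confidence intervals around the predictions.
C = model.predict_proba(X_test, confidence_interval=True, confidence_level=0.8)
coverage = np.mean((C[:, 0] <= y_test) & (y_test <= C[:, 1]))
print(f"Empirical coverage: {coverage:.2f}")  # Should be roughly at least 0.8.
```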
12 changes: 6 additions & 6 deletions src/neo_ls_svm/_feature_maps.py
@@ -153,13 +153,13 @@ def fit(
def transform(self, X: FloatMatrix[F]) -> ComplexMatrix[C]:
"""Transform a feature matrix X ∈ Rⁿˣᵈ into φ(X) ∈ Cⁿˣᴰ⁺¹ so that φ(X)ᵢ := [φ(xᵢ)' 1].
Notice that we can choose to solve an LS-SVM in the primal or dual space using the matrix
identity (γI + AB)⁻¹ A = A (γI + BA)⁻¹:
Notice that we can choose to solve an LS-SVM in the primal or dual space using the
push-through identity (γ𝕀 + AB)⁻¹ A = A (γ𝕀 + BA)⁻¹:
argmin ||y - φ(X)β̂||² + γ||β̂||²
= (γI + φ(X)'φ(X))⁻¹ φ(X)'y
= φ(X)' (γI + φ(X)φ(X)')⁻¹y with identity (γI + AB)⁻¹ A = A (γI + BA)⁻¹
= φ(X)'a where a = (γI + φ(X)φ(X)')⁻¹y = (γI + k(xᵢ, xⱼ))⁻¹y
argmin ||φ(X)β̂ - y||² + γ||β̂||²
= (γ𝕀 + φ(X)'φ(X))⁻¹ φ(X)'y
= φ(X)' (γ𝕀 + φ(X)φ(X)')⁻¹y with the identity (γ𝕀 + AB)⁻¹ A = A (γ𝕀 + BA)⁻¹
= φ(X)'α̂ where α̂ := (γ𝕀 + φ(X)φ(X)')⁻¹y = (γ𝕀 + k(xᵢ, xⱼ))⁻¹y
This means that k(x, y) = φ(x)'φ(y) by definition. Now we look for a φ(x) so that k(x, y) =
φ(x)'φ(y) for the Gaussian kernel k(x, y) = exp(- ||y - x||² / 2). If we take h(x) :=
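A quick numerical check of the push-through identity used in the docstring above, as a standalone sketch (not part of this commit):

```python
import numpy as np

# Verify (γ𝕀 + AB)⁻¹ A == A (γ𝕀 + BA)⁻¹ for a random A ∈ ℝⁿˣᵈ and B := A'.
rng = np.random.default_rng(0)
n, d, γ = 6, 3, 0.1
A = rng.standard_normal((n, d))
B = A.T
lhs = np.linalg.solve(γ * np.eye(n) + A @ B, A)  # (γ𝕀ₙ + AB)⁻¹ A, an n×d matrix
rhs = A @ np.linalg.inv(γ * np.eye(d) + B @ A)   # A (γ𝕀_d + BA)⁻¹, also n×d
assert np.allclose(lhs, rhs)
```

This is why the LS-SVM can be solved in whichever of the primal (d×d) or dual (n×n) system is smaller.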
178 changes: 137 additions & 41 deletions src/neo_ls_svm/_neo_ls_svm.py
@@ -4,9 +4,10 @@

import numpy as np
import numpy.typing as npt
from scipy.linalg import eigh, lu_factor, lu_solve
from scipy.linalg import cho_factor, cho_solve, eigh, lu_factor, lu_solve
from sklearn.base import BaseEstimator, clone
from sklearn.isotonic import IsotonicRegression
from sklearn.linear_model import QuantileRegressor
from sklearn.metrics import accuracy_score, r2_score
from sklearn.metrics.pairwise import euclidean_distances, rbf_kernel
from sklearn.utils.validation import check_consistent_length, check_X_y
@@ -21,6 +22,7 @@
ComplexMatrix,
ComplexVector,
FloatMatrix,
FloatTensor,
FloatVector,
GenericVector,
)
@@ -34,34 +36,30 @@ class NeoLSSVM(BaseEstimator):
A neo Least-Squares Support Vector Machine with:
- [x] A next-generation regularisation term that penalises the complexity of the prediction
surface, decision function, and maximises the margin.
- [x] Large-scale support through state-of-the-art random feature maps.
- [x] Optional automatic selection of primal or dual problem.
- [x] Automatic optimal tuning of the regularisation hyperparameter γ that minimises the
leave-one-out error, without having to refit the model.
- [x] Automatic tuning of the kernel parameters σ, without having to refit the model.
- [x] Automatic robust shift and scaling of the feature matrix and labels.
- [x] Leave-one-out residuals and error as a free output after fitting, optimally clipped in
classification.
- [x] Isotonically calibrated class probabilities based on leave-one-out predictions.
- [ ] Automatic robust fit by removing outliers.
1. ⚡ Linear complexity in the number of training examples with Orthogonal Random Features.
2. 🚀 Hyperparameter free: zero-cost optimization of the regularisation parameter γ and
kernel parameter σ.
3. 🏔️ Adds a new tertiary objective that minimizes the complexity of the prediction surface.
4. 🎁 Returns the leave-one-out residuals and error for free after fitting.
5. 🌀 Learns an affine transformation of the feature matrix to optimally separate the
target's bins.
6. 🪞 Can solve the LS-SVM both in the primal and dual space.
7. 🌡️ Isotonically calibrated `predict_proba` based on the leave-one-out predictions.
8. 🎲 Asymmetric conformal Bayesian confidence intervals for classification and regression.
"""

def __init__( # noqa: PLR0913
self,
*,
primal_feature_map: KernelApproximatingFeatureMap | None = None,
dual_feature_map: AffineSeparator | None = None,
dual: bool | None = None,
refit: bool = False,
primal_feature_map: KernelApproximatingFeatureMap | Literal["auto"] = "auto",
dual_feature_map: AffineSeparator | Literal["auto"] = "auto",
dual: bool | Literal["auto"] = "auto",
estimator_type: Literal["auto", "classifier", "regressor"] = "auto",
random_state: int | np.random.RandomState | None = 42,
estimator_type: Literal["classifier", "regressor"] | None = None,
) -> None:
self.primal_feature_map = primal_feature_map
self.dual_feature_map = dual_feature_map
self.dual = dual
self.refit = refit
self.random_state = random_state
self.estimator_type = estimator_type

@@ -156,6 +154,7 @@ def _optimize_β̂_γ(
)
# Store the leave-one-out residuals, leverage, error, and score.
self.loo_residuals_ = loo_residuals[:, optimum]
self.loo_ŷ_ = y + self.loo_residuals_
self.loo_leverage_ = h[:, optimum]
self.loo_error_ = self.loo_errors_γs_[optimum]
if self._estimator_type == "classifier":
@@ -164,12 +163,17 @@ def _optimize_β̂_γ(
self.loo_score_ = r2_score(y, ŷ_loo[:, optimum], sample_weight=s)
β̂, γ = β̂[:, optimum], self.γs_[optimum]
# Resolve the linear system for better accuracy.
if self.refit:
β̂ = np.linalg.solve(γ * C + A, φSTSy)
self.L_ = cho_factor(γ * C + A)
β̂ = cho_solve(self.L_, φSTSy)
self.residuals_ = np.real(φ @ β̂) - y
if self._estimator_type == "classifier":
self.residuals_[(y > 0) & (self.residuals_ > 0)] = 0
self.residuals_[(y < 0) & (self.residuals_ < 0)] = 0
# Compute the leave-one-out nonconformity with the Sherman-Morrison formula.
σ2 = np.real(np.sum(φ * cho_solve(self.L_, φ.conj().T).T, axis=1))
σ2 = np.ascontiguousarray(σ2)
loo_σ2 = σ2 + (s * σ2) ** 2 / (1 - self.loo_leverage_)
self.loo_nonconformity_ = np.sqrt(loo_σ2)
# TODO: Print warning if optimal γ is found at the edge.
return β̂, γ
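The leave-one-out correction above relies on the Sherman–Morrison formula; a standalone numerical check of that identity (not part of this commit):

```python
import numpy as np

# Sherman–Morrison: (A + uv')⁻¹ = A⁻¹ − (A⁻¹ u v' A⁻¹) / (1 + v' A⁻¹ u).
rng = np.random.default_rng(1)
n = 4
A = rng.standard_normal((n, n)) + n * np.eye(n)  # keep A well conditioned
u = rng.standard_normal((n, 1))
v = rng.standard_normal((n, 1))
A_inv = np.linalg.inv(A)
correction = (A_inv @ u) @ (v.T @ A_inv) / (1.0 + (v.T @ A_inv @ u).item())
assert np.allclose(np.linalg.inv(A + u @ v.T), A_inv - correction)
```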

@@ -287,19 +291,24 @@ def _optimize_α̂_γ(
)
# Store the leave-one-out residuals, leverage, error, and score.
self.loo_residuals_ = loo_residuals[:, optimum]
self.loo_ŷ_ = y + self.loo_residuals_
self.loo_error_ = self.loo_errors_γs_[optimum]
if self._estimator_type == "classifier":
self.loo_score_ = accuracy_score(y, np.sign(ŷ_loo[:, optimum]), sample_weight=s)
elif self._estimator_type == "regressor":
self.loo_score_ = r2_score(y, ŷ_loo[:, optimum], sample_weight=s)
α̂, γ = α̂_loo[:, optimum], self.γs_[optimum]
# Resolve the linear system for better accuracy.
if self.refit:
α̂ = np.linalg.solve(γ * ρ * np.diag(sn**-2) + K, y)
self.L_ = cho_factor(γ * ρ * np.diag(sn**-2) + K)
α̂ = cho_solve(self.L_, y)
self.residuals_ = F @ α̂ - y
if self._estimator_type == "classifier":
self.residuals_[(y > 0) & (self.residuals_ > 0)] = 0
self.residuals_[(y < 0) & (self.residuals_ < 0)] = 0
# Compute the nonconformity. TODO: Apply a leave-one-out correction.
K = rbf_kernel(X, X, gamma=0.5)
σ2 = 1.0 - np.sum(K * cho_solve(self.L_, K.T).T, axis=1)
self.loo_nonconformity_ = np.sqrt(σ2)
# TODO: Print warning if optimal γ is found at the edge.
return α̂, γ

@@ -334,7 +343,9 @@ def fit(
or np.issubdtype(y.dtype, np.timedelta64)
):
inferred_estimator_type = "regressor"
self._estimator_type: str | None = self.estimator_type or inferred_estimator_type
self._estimator_type: str | None = (
inferred_estimator_type if self.estimator_type == "auto" else self.estimator_type
)
if self._estimator_type == "classifier":
self.classes_: GenericVector = unique_y
negatives = y == self.classes_[0]
@@ -346,18 +357,24 @@ def fit(
message = "Target type not supported"
raise ValueError(message)
# Determine whether we want to solve this in the primal or dual space.
self.dual_ = X.shape[0] <= 1024 if self.dual is None else self.dual # noqa: PLR2004
self.dual_ = X.shape[0] <= 1024 if self.dual == "auto" else self.dual # noqa: PLR2004
self.primal_ = not self.dual_
# Learn an optimal distance metric for the primal or dual space and apply it to the feature
# matrix X.
if self.primal_:
self.primal_feature_map_ = clone(
self.primal_feature_map or OrthogonalRandomFourierFeatures()
OrthogonalRandomFourierFeatures()
if self.primal_feature_map == "auto"
else self.primal_feature_map
)
self.primal_feature_map_.fit(X, y_, sample_weight_)
φ = self.primal_feature_map_.transform(X)
else:
self.dual_feature_map_ = clone(self.dual_feature_map or AffineSeparator())
nz_weight = sample_weight_ > 0
X, y_, sample_weight_ = X[nz_weight], y_[nz_weight], sample_weight_[nz_weight]
self.dual_feature_map_ = clone(
AffineSeparator() if self.dual_feature_map == "auto" else self.dual_feature_map
)
self.dual_feature_map_.fit(X, y_, sample_weight_)
self.X_ = self.dual_feature_map_.transform(X)
# Solve the primal or dual system. We optimise the following sub-objectives for the weights
Expand All @@ -375,21 +392,94 @@ def fit(
self.predict_proba_calibrator_ = IsotonicRegression(
out_of_bounds="clip", y_min=0, y_max=1, increasing=True
)
ŷ_loo = y_ + self.loo_residuals_
target = np.zeros_like(y_)
target[y_ == np.max(y_)] = 1.0
self.predict_proba_calibrator_.fit(ŷ_loo, target, sample_weight_)
self.predict_proba_calibrator_.fit(self.loo_ŷ_, target, sample_weight_)
# Lazily fit conformal predictors as quantile regression models that predict the lower and
# upper bounds of the (relative) leave-one-out residuals.
self.conformal_regressors_: dict[str, dict[float, QuantileRegressor]] = {
"Δ⁺": {},
"Δ⁻": {},
"Δ⁺/ŷ": {},
"Δ⁻/ŷ": {},
}
return self
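For context on the calibration step in `fit` above, a toy sketch of how an `IsotonicRegression` calibrator maps raw decision values to monotone, clipped probabilities (the values are made up):

```python
import numpy as np
from sklearn.isotonic import IsotonicRegression

decision_values = np.array([-2.0, -1.0, -0.5, 0.2, 0.8, 1.5, 2.5])
targets = np.array([0.0, 0.0, 0.0, 1.0, 0.0, 1.0, 1.0])  # 1 = positive class
calibrator = IsotonicRegression(out_of_bounds="clip", y_min=0, y_max=1, increasing=True)
calibrator.fit(decision_values, targets)
# Probabilities are non-decreasing in the decision value and clipped to [0, 1].
print(calibrator.transform(np.array([-3.0, 0.0, 3.0])))
```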

def nonconformity_measure(self, X: FloatMatrix[F]) -> FloatVector[F]:
"""Compute the nonconformity of a set of examples."""
# Estimate the nonconformity as the variance of this model's Gaussian Process.
σ2: FloatVector[F]
if self.primal_:
# If β̂ := (LL')⁻¹ y* and cov(y*) := LL', then cov(β̂) = cov((LL')⁻¹ y*) = (LL')⁻¹
# assuming 𝔼(β̂) = 0. It follows that cov(ŷ(x)) = cov(φ(x)'β̂) = φ(x)'(LL')⁻¹φ(x).
φH = cast(KernelApproximatingFeatureMap, self.primal_feature_map_).transform(X)
σ2 = np.real(np.sum(φH * cho_solve(self.L_, φH.conj().T).T, axis=1))
σ2 = np.ascontiguousarray(σ2)
else:
# Compute the cov(ŷ(x)) as K(x, x) − K(x, X) (LL')⁻¹ K(X, x). TODO: Document derivation.
X = cast(AffineFeatureMap, self.dual_feature_map_).transform(X)
K = rbf_kernel(X, self.X_, gamma=0.5)
σ2 = 1.0 - np.sum(K * cho_solve(self.L_, K.T).T, axis=1)
# Convert the variance to a standard deviation.
σ = np.sqrt(σ2)
return σ
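The dual branch of `nonconformity_measure` above is the Gaussian-process predictive variance k(x, x) − k(x, X)(LL')⁻¹k(X, x); a standalone sketch with an RBF kernel and an assumed small ridge term added for numerical stability (not part of this commit):

```python
import numpy as np
from scipy.linalg import cho_factor, cho_solve
from sklearn.metrics.pairwise import rbf_kernel

rng = np.random.default_rng(2)
X_train = rng.standard_normal((50, 3))
X_new = rng.standard_normal((5, 3))
ridge = 1e-2  # assumed noise/regularisation term so the Cholesky factorisation is stable
K_train = rbf_kernel(X_train, X_train, gamma=0.5) + ridge * np.eye(len(X_train))
L = cho_factor(K_train)
K_new = rbf_kernel(X_new, X_train, gamma=0.5)
σ2 = 1.0 - np.sum(K_new * cho_solve(L, K_new.T).T, axis=1)  # k(x, x) = 1 for the RBF kernel
σ = np.sqrt(np.maximum(σ2, 0.0))
print(σ)
```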

def predict_confidence_interval(
self, X: FloatMatrix[F], *, confidence_level: float = 0.95
) -> FloatMatrix[F] | FloatTensor[F]:
# Compute the nonconformity measure for the given examples.
X_nonconformity = self.nonconformity_measure(X)[:, np.newaxis]
# Determine the quantiles at the edge of the confidence interval.
quantile = 1 - (1 - confidence_level) / 2
# Lazily fit any missing conformal regressors.
# TODO: Perhaps exclude samples that were used in the feature map.
for target_type in ("Δ⁺", "Δ⁻", "Δ⁺/ŷ", "Δ⁻/ŷ"):
quantile_regressors = self.conformal_regressors_[target_type]
if quantile not in quantile_regressors:
sgn = (self.loo_residuals_ > 0) if "⁺" in target_type else (self.loo_residuals_ < 0)
eps = np.finfo(self.loo_ŷ_.dtype).eps
quantile_regressors[quantile] = QuantileRegressor(
quantile=quantile, alpha=np.sqrt(eps), solver="highs"
).fit(
self.loo_nonconformity_[sgn, np.newaxis],
np.abs(self.loo_residuals_[sgn]) / np.maximum(np.abs(self.loo_ŷ_)[sgn], eps)
if "/ŷ" in target_type
else np.abs(self.loo_residuals_[sgn]),
)
# Predict the confidence interval for the nonconformity measure.
ŷ = self.decision_function(X)
Δ_lower = np.minimum(
self.conformal_regressors_["Δ⁻"][quantile].predict(X_nonconformity),
np.abs(ŷ) * self.conformal_regressors_["Δ⁻/ŷ"][quantile].predict(X_nonconformity),
)
Δ_upper = np.minimum(
self.conformal_regressors_["Δ⁺"][quantile].predict(X_nonconformity),
np.abs(ŷ) * self.conformal_regressors_["Δ⁺/ŷ"][quantile].predict(X_nonconformity),
)
# Assemble the confidence interval.
C = np.hstack(((ŷ - Δ_lower)[:, np.newaxis], (ŷ + Δ_upper)[:, np.newaxis]))
# In case of classification, convert the decision function values to probabilities.
if self._estimator_type == "classifier":
C = np.hstack(
[
self.predict_proba_calibrator_.transform(C[:, 0])[:, np.newaxis],
self.predict_proba_calibrator_.transform(C[:, 1])[:, np.newaxis],
]
)
C = np.dstack([1 - C[:, ::-1], C])
return C
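To make the lazily fitted conformal regressors concrete: a high quantile of the absolute leave-one-out residuals is regressed on the nonconformity measure, and the point prediction is widened by the predicted amount. A simplified, symmetric sketch with synthetic data (the commit itself fits separate Δ⁺/Δ⁻ models plus relative variants and takes the elementwise minimum):

```python
import numpy as np
from sklearn.linear_model import QuantileRegressor

rng = np.random.default_rng(3)
n = 200
nonconformity = rng.uniform(0.1, 1.0, size=n)  # e.g. a predictive standard deviation
residuals = rng.normal(scale=nonconformity)    # residuals are wider when less certain
quantile = 0.9
qr = QuantileRegressor(quantile=quantile, alpha=0.0, solver="highs")
qr.fit(nonconformity[:, np.newaxis], np.abs(residuals))
Δ = qr.predict(np.array([[0.2], [0.9]]))  # interval half-width per example
print(Δ)  # a symmetric interval would then be ŷ ± Δ
```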

def decision_function(self, X: FloatMatrix[F]) -> FloatVector[F]:
"""Evaluate this predictor's decision function."""
"""Evaluate this predictor's prediction function."""
# Compute the point predictions ŷ(X).
ŷ: FloatVector[F]
if self.primal_:
# Apply the feature map φ and predict as ŷ(x) := φ(x)'β̂.
φ = cast(KernelApproximatingFeatureMap, self.primal_feature_map_).transform(X)
ŷ = np.real(φ @ self.β̂_)
ŷ = np.ascontiguousarray(ŷ)
else:
# Apply an affine transformation to X, then predict as ŷ(x) := k(x, X) + 1'.
# Apply an affine transformation to X, then predict as ŷ(x) := k(x, X) α̂ + 1'α̂.
X = cast(AffineFeatureMap, self.dual_feature_map_).transform(X)
K = rbf_kernel(X, self.X_, gamma=0.5)
b = np.sum(self.α̂_)
@@ -398,7 +488,7 @@ def decision_function(self, X: FloatMatrix[F]) -> FloatVector[F]:

def predict(self, X: FloatMatrix[F]) -> GenericVector:
"""Predict the output on a given dataset."""
# Evaluate ŷ given the feature matrix X.
# Compute the point predictions ŷ(X).
ŷ_df = self.decision_function(X)
if self._estimator_type == "classifier":
# For binary classification, round to the nearest class label. When the decision
@@ -415,23 +505,29 @@ def predict(self, X: FloatMatrix[F]) -> GenericVector:
ŷ = ŷ.astype(self.y_dtype_)
return ŷ

def predict_proba(self, X: FloatMatrix[F]) -> FloatMatrix[F]:
"""Predict the output probability (classification) or confidence interval (regression)."""
def predict_proba(
self,
X: FloatMatrix[F],
*,
confidence_interval: bool = False,
confidence_level: float = 0.95,
) -> FloatVector[F] | FloatMatrix[F] | FloatTensor[F]:
"""Predict the class probability or confidence interval."""
if confidence_interval:
# Return the confidence interval for classification or regression.
C = self.predict_confidence_interval(X, confidence_level=confidence_level)
return C
if self._estimator_type == "classifier":
# Return the class probabilities for classification.
ŷ_classification = self.decision_function(X)
p = self.predict_proba_calibrator_.transform(ŷ_classification)
P = np.hstack([1 - p[:, np.newaxis], p[:, np.newaxis]])
else:
# TODO: Replace point predictions with confidence interval.
# Return the point predictions for regression.
ŷ_regression = self.predict(X)
P = np.hstack((ŷ_regression[:, np.newaxis], ŷ_regression[:, np.newaxis]))
P = ŷ_regression
return P
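A note on the return shapes of the new `predict_proba`, inferred from `predict_confidence_interval` and the tests below (so treat the exact shapes as an assumption): with `confidence_interval=True`, a regressor returns an (n, 2) array of lower and upper bounds, while a binary classifier returns an (n, 2, 2) array whose middle axis holds the lower/upper bounds and whose last axis indexes the classes. A hypothetical check:

```python
from sklearn.datasets import make_classification, make_regression

from neo_ls_svm import NeoLSSVM  # assumes the class is exported at the package root

Xc, yc = make_classification(n_samples=300, n_features=6, random_state=0)
Xr, yr = make_regression(n_samples=300, n_features=6, noise=5.0, random_state=0)
clf = NeoLSSVM().fit(Xc[:250], yc[:250])
reg = NeoLSSVM().fit(Xr[:250], yr[:250])
C_clf = clf.predict_proba(Xc[250:], confidence_interval=True)
C_reg = reg.predict_proba(Xr[250:], confidence_interval=True)
assert C_clf.shape == (50, 2, 2)  # (examples, lower/upper bound, class)
assert C_reg.shape == (50, 2)     # (examples, lower/upper bound)
```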

@property
def loo_score(self) -> float:
"""Compute the leave-one-out score of this classifier or regressor."""
return cast(float, self.loo_score_)

def score(
self, X: FloatMatrix[F], y: GenericVector, sample_weight: FloatVector[F] | None = None
) -> float:
28 changes: 26 additions & 2 deletions tests/test_neo_ls_svm.py
@@ -17,10 +17,12 @@ def test_compare_neo_ls_svm_with_svm(dataset: Dataset, table_vectorizer: TableVe
X_train, X_test, y_train, y_test = dataset
# Create the pipelines.
num_unique = len(y_train.unique())
if num_unique == 2: # noqa: PLR2004
binary = num_unique == 2 # noqa: PLR2004
multiclass = 2 < num_unique <= np.ceil(np.sqrt(len(y_train))) # noqa: PLR2004
if binary:
neo_ls_svm_pipeline = make_pipeline(table_vectorizer, NeoLSSVM())
svm_pipeline = make_pipeline(table_vectorizer, SVC())
elif num_unique <= np.ceil(np.sqrt(len(y_train))):
elif multiclass:
neo_ls_svm_pipeline = make_pipeline(table_vectorizer, OneVsRestClassifier(NeoLSSVM()))
svm_pipeline = make_pipeline(table_vectorizer, OneVsRestClassifier(SVC()))
else:
@@ -33,6 +35,28 @@
neo_ls_svm_score = neo_ls_svm_pipeline.score(X_test, y_test)
svm_score = svm_pipeline.score(X_test, y_test)
assert neo_ls_svm_score > svm_score
# Verify the coverage of the confidence interval.
if multiclass:
return
confidence_level = 0.8
X_conf = neo_ls_svm_pipeline.predict_proba(
X_test, confidence_interval=True, confidence_level=confidence_level
)
if binary:
assert np.all(X_conf >= 0)
assert np.all(X_conf <= 1)
assert np.all(X_conf[:, 0, 0] <= X_conf[:, 1, 0])
assert np.all(X_conf[:, 0, 1] <= X_conf[:, 1, 1])
is_neg = y_test == neo_ls_svm_pipeline.steps[-1][1].classes_[0]
is_pos = ~is_neg
neg_covered = np.any(X_conf[:, :, 0] > 0.5, axis=1) & is_neg # noqa: PLR2004
pos_covered = np.any(X_conf[:, :, 1] > 0.5, axis=1) & is_pos # noqa: PLR2004
covered = neg_covered | pos_covered
elif not multiclass:
assert np.all(X_conf[:, 0] <= X_conf[:, 1])
covered = (X_conf[:, 0] <= y_test) & (y_test <= X_conf[:, 1])
coverage = np.mean(covered)
assert coverage >= confidence_level


def test_sklearn_check_estimator() -> None:
