Skip to content

Commit

Permalink
Merge pull request #157 from IFCA/feature-mmd-chunk-pairwise-distance
Browse files Browse the repository at this point in the history
Allow chunk pairwise distance in MMD
  • Loading branch information
jaime-cespedes-sisniega authored Apr 7, 2023
2 parents afbc599 + 88e89c7 commit 3279c10
Show file tree
Hide file tree
Showing 8 changed files with 225 additions and 23 deletions.
4 changes: 2 additions & 2 deletions frouros/detectors/data_drift/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -175,7 +175,7 @@ def X_ref(self, value: Optional[np.ndarray]) -> None: # noqa: N802
self._check_array(X=value)
self._X_ref = value

def fit(self, X: np.ndarray) -> Dict[str, Any]: # noqa: N803
def fit(self, X: np.ndarray, **kwargs) -> Dict[str, Any]: # noqa: N803
"""Fit detector.
:param X: feature data
Expand All @@ -186,7 +186,7 @@ def fit(self, X: np.ndarray) -> Dict[str, Any]: # noqa: N803
self._check_fit_dimensions(X=X)
for callback in self.callbacks: # type: ignore
callback.on_fit_start()
self._fit(X=X)
self._fit(X=X, **kwargs)
for callback in self.callbacks: # type: ignore
callback.on_fit_end()

Expand Down
7 changes: 6 additions & 1 deletion frouros/detectors/data_drift/batch/distance_based/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,7 @@ def _distance_measure(
self,
X_ref: np.ndarray, # noqa: N803
X: np.ndarray, # noqa: N803
**kwargs,
) -> DistanceResult:
pass

Expand Down Expand Up @@ -166,6 +167,7 @@ def _distance_measure(
self,
X_ref: np.ndarray, # noqa: N803
X: np.ndarray, # noqa: N803
**kwargs,
) -> DistanceResult:
distance_bins = self._distance_measure_bins(X_ref=X_ref, X=X)
distance = DistanceResult(distance=distance_bins)
Expand All @@ -186,7 +188,9 @@ def _calculate_bins_values(

@abc.abstractmethod
def _distance_measure_bins(
self, X_ref: np.ndarray, X: np.ndarray # noqa: N803
self,
X_ref: np.ndarray, # noqa: N803
X: np.ndarray, # noqa: N803
) -> float:
pass

Expand Down Expand Up @@ -246,6 +250,7 @@ def _distance_measure(
self,
X_ref: np.ndarray, # noqa: N803
X: np.ndarray, # noqa: N803
**kwargs,
) -> DistanceResult:
pass

Expand Down
1 change: 1 addition & 0 deletions frouros/detectors/data_drift/batch/distance_based/emd.py
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ def _distance_measure(
self,
X_ref: np.ndarray, # noqa: N803
X: np.ndarray, # noqa: N803
**kwargs,
) -> DistanceResult:
emd = self._emd(X=X_ref, Y=X, **self.kwargs)
distance = DistanceResult(distance=emd)
Expand Down
5 changes: 3 additions & 2 deletions frouros/detectors/data_drift/batch/distance_based/js.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ def __init__(
self,
num_bins: int = 10,
callbacks: Optional[Union[Callback, List[Callback]]] = None,
**kwargs
**kwargs,
) -> None:
"""Init method.
Expand All @@ -50,6 +50,7 @@ def _distance_measure(
self,
X_ref: np.ndarray, # noqa: N803
X: np.ndarray, # noqa: N803
**kwargs,
) -> DistanceResult:
js = self._js(X=X_ref, Y=X, num_bins=self.num_bins, **self.kwargs)
distance = DistanceResult(distance=js)
Expand All @@ -61,7 +62,7 @@ def _js(
Y: np.ndarray,
*,
num_bins: int,
**kwargs: Dict[str, Any]
**kwargs: Dict[str, Any],
) -> float:
( # noqa: N806
X_ref_rvs,
Expand Down
1 change: 1 addition & 0 deletions frouros/detectors/data_drift/batch/distance_based/kl.py
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ def _distance_measure(
self,
X_ref: np.ndarray, # noqa: N803
X: np.ndarray, # noqa: N803
**kwargs,
) -> DistanceResult:
kl = self._kl(X=X_ref, Y=X, num_bins=self.num_bins, **self.kwargs)
distance = DistanceResult(distance=kl)
Expand Down
176 changes: 160 additions & 16 deletions frouros/detectors/data_drift/batch/distance_based/mmd.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,12 @@
"""MMD (Maximum Mean Discrepancy) module."""

from typing import Callable, Optional, List, Union
import itertools
import math
from typing import Callable, Iterator, Optional, List, Union

import numpy as np # type: ignore
from scipy.spatial.distance import cdist # type: ignore
import tqdm # type: ignore

from frouros.callbacks import Callback
from frouros.detectors.data_drift.base import MultivariateData
Expand Down Expand Up @@ -43,12 +46,15 @@ class MMD(DistanceBasedBase):
def __init__(
self,
kernel: Callable = rbf_kernel,
chunk_size: Optional[int] = None,
callbacks: Optional[Union[Callback, List[Callback]]] = None,
) -> None:
"""Init method.
:param kernel: kernel function to use
:type kernel: Callable
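:param chunk_size: number of samples per chunk used to compute the kernel sums piecewise (None processes all samples in a single chunk)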
:type chunk_size: Optional[int]
:param callbacks: callbacks
:type callbacks: Optional[Union[Callback, List[Callback]]]
"""
Expand All @@ -61,13 +67,42 @@ def __init__(
callbacks=callbacks,
)
self.kernel = kernel
self.chunk_size = chunk_size
self._chunk_size_x = None
self._expected_k_x = None
self._X_num_samples = None

@property
def chunk_size(self) -> Optional[int]:
    """Chunk size property.

    :return: chunk size to use, or None when no chunk size has been set
    :rtype: Optional[int]
    """
    return self._chunk_size

@chunk_size.setter
def chunk_size(self, value: Optional[int]) -> None:
    """Chunk size method setter.

    :param value: value to be set
    :type value: Optional[int]
    :raises TypeError: if value is neither None nor an int (bool is rejected)
    :raises ValueError: if value is an int lower than or equal to 0
    """
    if value is not None:
        # bool is a subclass of int, so it has to be excluded explicitly;
        # otherwise True would silently be accepted as a chunk size of 1.
        if isinstance(value, bool) or not isinstance(value, int):
            raise TypeError("chunk_size must be of type int or None.")
        if value <= 0:
            raise ValueError("chunk_size must be greater than 0 or None.")
    self._chunk_size = value

@property
def kernel(self) -> Callable:
"""Kernel property.
:return: kernel function to use
:rtype: Kernel
:rtype: Callable
"""
return self._kernel

Expand All @@ -80,38 +115,147 @@ def kernel(self, value: Callable) -> None:
:raises TypeError: Type error exception
"""
if not isinstance(value, Callable): # type: ignore
raise TypeError("value must be of type Callable.")
raise TypeError("kernel must be of type Callable.")
self._kernel = value

def _distance_measure(
self,
X_ref: np.ndarray, # noqa: N803
X: np.ndarray, # noqa: N803
**kwargs,
) -> DistanceResult:
mmd = self._mmd(X=X_ref, Y=X, kernel=self.kernel)
mmd = self._mmd(X=X_ref, Y=X, kernel=self.kernel, **kwargs)
distance_test = DistanceResult(distance=mmd)
return distance_test

def _fit(
    self,
    X: np.ndarray,  # noqa: N803
    **kwargs,
) -> None:
    """Fit the detector and cache the reference kernel term.

    After delegating to the parent ``_fit``, the kernel is evaluated on
    every ordered pair of reference chunks and the accumulated sum,
    divided by ``n * (n - 1)``, is stored in ``self._expected_k_x``.

    :param X: reference data
    :type X: numpy.ndarray
    :param kwargs: optional ``verbose`` flag; when truthy a tqdm progress
        bar is shown while the chunk pairs are processed
    """
    super()._fit(X=X)
    # Add dimension only for the kernel calculation (if dim == 1)
    if X.ndim == 1:
        X = np.expand_dims(X, axis=1)  # noqa: N806

    num_samples = len(X)
    self._X_num_samples = num_samples  # type: ignore # noqa: N806
    # Without an explicit chunk size the whole reference set is one chunk.
    self._chunk_size_x = (  # type: ignore
        num_samples if self.chunk_size is None else self.chunk_size
    )

    chunk_pairs = itertools.product(
        self._get_chunks(
            data=X,
            chunk_size=self._chunk_size_x,  # type: ignore
        ),
        repeat=2,
    )
    if kwargs.get("verbose", False):
        total_pairs = (
            math.ceil(num_samples / self._chunk_size_x) ** 2  # type: ignore
        )
        chunk_pairs = tqdm.tqdm(chunk_pairs, total=total_pairs)

    k_x_sum = np.array([self.kernel(*pair).sum() for pair in chunk_pairs]).sum()
    self._expected_k_x = k_x_sum / (  # type: ignore
        num_samples * (num_samples - 1)
    )

@staticmethod
def _mmd(
def _get_chunks(data: np.ndarray, chunk_size: int) -> Iterator:
chunks = (
data[i : i + chunk_size] # noqa: E203
for i in range(0, len(data), chunk_size)
)
return chunks

def _mmd( # pylint: disable=too-many-locals
self,
X: np.ndarray, # noqa: N803
Y: np.ndarray,
*,
kernel: Callable,
**kwargs,
) -> float: # noqa: N803
X_num_samples = X.shape[0] # noqa: N806
Y_num_samples = Y.shape[0] # noqa: N806
data = np.concatenate([X, Y]) # noqa: N806
# Only check for X dimension (X == Y dim comparison has been already made)
if X.ndim == 1:
data = np.expand_dims(data, axis=1)
X = np.expand_dims(X, axis=1) # noqa: N806
Y = np.expand_dims(Y, axis=1) # noqa: N806

X_chunks = self._get_chunks( # noqa: N806
data=X,
chunk_size=self._chunk_size_x, # type: ignore
)
Y_num_samples = len(Y) # noqa: N806
chunk_size_y = Y_num_samples if self.chunk_size is None else self.chunk_size
Y_chunks, Y_chunks_copy = itertools.tee( # noqa: N806
self._get_chunks(
data=Y,
chunk_size=chunk_size_y, # type: ignore
),
2,
)
Y_chunks_combinations = itertools.product( # noqa: N806
Y_chunks,
repeat=2,
)
XY_chunks_combinations = itertools.product( # noqa: N806
X_chunks,
Y_chunks_copy,
)

if kwargs.get("verbose", False):
num_chunks_y = math.ceil(Y_num_samples / self.chunk_size) # type: ignore
num_chunks_y_combinations = num_chunks_y**2
num_chunks_xy = (
math.ceil(len(X) / self._chunk_size_x) * num_chunks_y # type: ignore
)
sum_y = np.array(
[
kernel(*Y_chunk).sum()
for Y_chunk in tqdm.tqdm( # noqa: N806
Y_chunks_combinations, total=num_chunks_y_combinations
)
]
).sum()
sum_xy = np.array(
[
kernel(*XY_chunk).sum()
for XY_chunk in tqdm.tqdm( # noqa: N806
XY_chunks_combinations, total=num_chunks_xy
)
]
).sum()
else:
sum_y = np.array(
[
kernel(*Y_chunk).sum()
for Y_chunk in Y_chunks_combinations # noqa: N806
]
).sum()
sum_xy = np.array(
[
kernel(*XY_chunk).sum()
for XY_chunk in XY_chunks_combinations # noqa: N806
]
).sum()

k_matrix = kernel(X=data, Y=data)
k_x = k_matrix[:X_num_samples, :X_num_samples]
k_y = k_matrix[Y_num_samples:, Y_num_samples:]
k_xy = k_matrix[:X_num_samples, Y_num_samples:]
mmd = (
k_x.sum() / (X_num_samples * (X_num_samples - 1))
+ k_y.sum() / (Y_num_samples * (Y_num_samples - 1))
- 2 * k_xy.sum() / (X_num_samples * Y_num_samples)
self._expected_k_x
+ sum_y / (Y_num_samples * (Y_num_samples - 1))
- 2 * sum_xy / (self._X_num_samples * Y_num_samples) # type: ignore
)
return mmd
52 changes: 51 additions & 1 deletion frouros/tests/integration/test_data_drift.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
"""Test data drift detectors."""

from typing import Tuple
from typing import Tuple, Union

import pytest # type: ignore
import numpy as np # type: ignore
Expand Down Expand Up @@ -243,6 +243,56 @@ def test_batch_distance_based_multivariate_same_distribution(
assert np.isclose(statistic, expected_distance)


@pytest.mark.parametrize(
    "detector, expected_distance",
    [(MMD(chunk_size=size), 0.12183835) for size in (10, None)],
)
def test_batch_distance_based_chunk_size_valid(
    X_ref_multivariate: np.ndarray,  # noqa: N803
    X_test_multivariate: np.ndarray,  # noqa: N803
    detector: DataDriftBatchBase,
    expected_distance: float,
) -> None:
    """Check that chunked and non-chunked MMD give the expected distance.

    :param X_ref_multivariate: reference multivariate data
    :type X_ref_multivariate: numpy.ndarray
    :param X_test_multivariate: test multivariate data
    :type X_test_multivariate: numpy.ndarray
    :param detector: detector test
    :type detector: DataDriftBatchBase
    :param expected_distance: expected distance value
    :type expected_distance: float
    """
    detector.fit(X=X_ref_multivariate)
    # compare returns a pair; only its first element (the statistic) is checked
    statistic, _ = detector.compare(X=X_test_multivariate)

    assert np.isclose(statistic, expected_distance)


@pytest.mark.parametrize(
    "chunk_size, expected_exception",
    [
        (1.5, TypeError),
        ("10", TypeError),
        (-1, ValueError),
        (0, ValueError),  # boundary: zero is not a valid chunk size either
    ],
)
def test_batch_distance_based_chunk_size_invalid(
    chunk_size: Union[int, float, str],
    expected_exception: type,
) -> None:
    """Test batch distance based chunk size invalid method.

    :param chunk_size: invalid chunk size passed to the MMD constructor
    :type chunk_size: Union[int, float, str]
    :param expected_exception: exception class expected to be raised
    :type expected_exception: type
    """
    with pytest.raises(expected_exception):
        _ = MMD(chunk_size=chunk_size)  # type: ignore


@pytest.mark.parametrize(
"detector, expected_statistic, expected_p_value",
[
Expand Down
Loading

0 comments on commit 3279c10

Please sign in to comment.