diff --git a/.gitignore b/.gitignore
index 4cb5a7d0..bdd7c9c7 100644
--- a/.gitignore
+++ b/.gitignore
@@ -155,3 +155,4 @@ notebooks/*-requirements.txt
 test.ipynb
 test2.ipynb
 test3.ipynb
+.vscode/launch.json
diff --git a/ehrapy/preprocessing/_normalization.py b/ehrapy/preprocessing/_normalization.py
index 50769329..f40c7cc9 100644
--- a/ehrapy/preprocessing/_normalization.py
+++ b/ehrapy/preprocessing/_normalization.py
@@ -7,9 +7,15 @@ import sklearn.preprocessing as sklearn_pp

 from ehrdata.core.constants import NUMERIC_TAG

-from ehrapy._compat import DaskArray, _raise_array_type_not_implemented, function_2D_only, use_ehrdata
+from ehrapy._compat import (
+    DaskArray,
+    _apply_over_time_axis,
+    _raise_array_type_not_implemented,
+    use_ehrdata,
+)
 from ehrapy.anndata.anndata_ext import (
     _assert_numeric_vars,
+    _get_var_indices,
     _get_var_indices_for_type,
 )
@@ -30,7 +36,10 @@ def _scale_func_group(
     copy: bool,
     norm_name: str,
 ) -> EHRData | AnnData | None:
-    """Apply scaling function to selected columns of edata, either globally or per group."""
+    """Apply scaling function to selected columns of edata, either globally or per group.
+
+    Supports both 2D and 3D data with unified layer handling.
+    """
     if group_key is not None and group_key not in edata.obs_keys():
         raise KeyError(f"group key '{group_key}' not found in edata.obs.")
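Reviewer note: the next hunk rewrites `_scale_func_group` to drop the old `edata[:, vars]` views in favor of integer indexing into the full matrix, with group-wise writes going through `np.ix_`. A minimal standalone sketch of that indexing pattern (toy data; all names here are illustrative only, not from the PR):

    import numpy as np

    X = np.arange(12, dtype=np.float32).reshape(4, 3)  # 4 obs x 3 vars
    group_mask = np.array([0, 2])    # row indices of one group
    var_indices = np.array([0, 2])   # columns of the selected vars

    # np.ix_ builds an open mesh, so this selects the rows-x-cols sub-block;
    # assigning through the same np.ix_ writes the scaled block back in place.
    block = X[np.ix_(group_mask, var_indices)]
    X[np.ix_(group_mask, var_indices)] = (block - block.mean(axis=0)) / block.std(axis=0)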
@@ -43,53 +52,57 @@

     edata = _prep_edata_norm(edata, copy)

-    if layer is None:
-        var_values = edata[:, vars].X.copy()
-    else:
-        var_values = edata[:, vars].layers[layer].copy()
+    var_indices = _get_var_indices(edata, vars)
+    X = edata.X if layer is None else edata.layers[layer]
+
+    if np.issubdtype(X.dtype, np.integer):
+        X = X.astype(np.float32)

     if group_key is None:
-        var_values = scale_func(var_values)
+        X[:, var_indices] = scale_func(X[:, var_indices])
     else:
+        # Group-wise normalization is not supported for Dask arrays
+        if isinstance(X, DaskArray):
+            raise NotImplementedError(
+                f"Group-wise normalization with group_key='{group_key}' does not support array type {type(X)}. "
+                "Please convert to numpy array first or use normalization without group_key."
+            )
+
         for group in edata.obs[group_key].unique():
-            group_idx = edata.obs[group_key] == group
-            var_values[group_idx] = scale_func(var_values[group_idx])
+            group_mask = np.where(edata.obs[group_key] == group)[0]
+            X[np.ix_(group_mask, var_indices)] = scale_func(X[np.ix_(group_mask, var_indices)])

     if layer is None:
-        edata.X = edata.X.astype(var_values.dtype)
-        edata[:, vars].X = var_values
+        edata.X = X
     else:
-        edata.layers[layer] = edata.layers[layer].astype(var_values.dtype)
-        edata[:, vars].layers[layer] = var_values
+        edata.layers[layer] = X

     _record_norm(edata, vars, norm_name)

-    if copy:
-        return edata
-    else:
-        return None
+    return edata if copy else None


 @singledispatch
-def _scale_norm_function(arr):
+def _scale_norm_function(arr, **kwargs):
     _raise_array_type_not_implemented(_scale_norm_function, type(arr))


-@_scale_norm_function.register
+@_scale_norm_function.register(np.ndarray)
+@_apply_over_time_axis
 def _(arr: np.ndarray, **kwargs):
-    return sklearn_pp.StandardScaler(**kwargs).fit_transform
+    return sklearn_pp.StandardScaler(**kwargs).fit_transform(arr)


-@_scale_norm_function.register
+@_scale_norm_function.register(DaskArray)
+@_apply_over_time_axis
 def _(arr: DaskArray, **kwargs):
     import dask_ml.preprocessing as daskml_pp

-    return daskml_pp.StandardScaler(**kwargs).fit_transform
+    return daskml_pp.StandardScaler(**kwargs).fit_transform(arr)
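Reviewer note: `_apply_over_time_axis` is imported from `ehrapy._compat` but its implementation is not part of this diff. Judging from the docstrings below ("per-variable normalization across samples and timestamps"), it presumably folds the time axis of a 3D array into the observation axis before the wrapped 2D scaler runs, then restores the shape, while 2D input passes through unchanged. A hypothetical sketch of those assumed semantics, not the actual `ehrapy._compat` code:

    import functools
    import numpy as np

    def _apply_over_time_axis_sketch(func):
        """Hypothetical stand-in for ehrapy._compat._apply_over_time_axis."""
        @functools.wraps(func)
        def wrapper(arr, **kwargs):
            if arr.ndim == 2:
                return func(arr, **kwargs)
            n_obs, n_var, n_t = arr.shape
            # (obs, var, time) -> (obs, time, var) -> (obs*time, var)
            flat = np.moveaxis(arr, 1, 2).reshape(n_obs * n_t, n_var)
            out = func(flat, **kwargs)
            return np.moveaxis(out.reshape(n_obs, n_t, n_var), 2, 1)
        return wrapper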


 @use_ehrdata(deprecated_after="1.0.0")
-@function_2D_only()
 def scale_norm(
     edata: EHRData | AnnData,
     vars: str | Sequence[str] | None = None,
@@ -103,6 +116,11 @@
     Functionality is provided by :class:`~sklearn.preprocessing.StandardScaler`, see https://scikit-learn.org/stable/modules/generated/sklearn.preprocessing.StandardScaler.html for details.
     If `edata.X` is a Dask Array, functionality is provided by :class:`~dask_ml.preprocessing.StandardScaler`, see https://ml.dask.org/modules/generated/dask_ml.preprocessing.StandardScaler.html for details.

+    Supports both 2D and 3D data:
+
+    - 2D data: Standard normalization across observations
+    - 3D data: Per-variable normalization across samples and timestamps
+
     Args:
         edata: Central data object. Must already be encoded using :func:`~ehrapy.preprocessing.encode`.
         vars: List of the names of the numeric variables to normalize.
@@ -113,15 +131,21 @@
         **kwargs: Additional arguments passed to the StandardScaler.

     Returns:
-        `None` if `copy=False` and modifies the passed edata, else returns an updated AnnData object. Also stores a record of applied normalizations as a dictionary in edata.uns["normalization"].
+        `None` if `copy=False` and modifies the passed edata, else returns an updated object. Also stores a record of applied normalizations as a dictionary in edata.uns["normalization"].

     Examples:
         >>> import ehrdata as ed
         >>> import ehrapy as ep
-        >>> edata = ed.dt.mimic_2()
-        >>> edata_norm = ep.pp.scale_norm(edata, copy=True)
+        >>> import numpy as np
+        >>> edata = ed.dt.physionet2012(layer="tem_data")
+        >>> np.nanmean(edata.layers["tem_data"])
+        74.213570
+        >>> ep.pp.scale_norm(edata, layer="tem_data")
+        >>> np.nanmean(edata.layers["tem_data"])
+        0.0
+
     """
-    scale_func = _scale_norm_function(edata.X if layer is None else edata.layers[layer], **kwargs)
+    scale_func = lambda arr: _scale_norm_function(arr, **kwargs)

     return _scale_func_group(
         edata=edata,
@@ -135,24 +159,25 @@


 @singledispatch
-def _minmax_norm_function(arr):
+def _minmax_norm_function(arr, **kwargs):
     _raise_array_type_not_implemented(_minmax_norm_function, type(arr))


-@_minmax_norm_function.register
+@_minmax_norm_function.register(np.ndarray)
+@_apply_over_time_axis
 def _(arr: np.ndarray, **kwargs):
-    return sklearn_pp.MinMaxScaler(**kwargs).fit_transform
+    return sklearn_pp.MinMaxScaler(**kwargs).fit_transform(arr)


-@_minmax_norm_function.register
+@_minmax_norm_function.register(DaskArray)
+@_apply_over_time_axis
 def _(arr: DaskArray, **kwargs):
     import dask_ml.preprocessing as daskml_pp

-    return daskml_pp.MinMaxScaler(**kwargs).fit_transform
+    return daskml_pp.MinMaxScaler(**kwargs).fit_transform(arr)


 @use_ehrdata(deprecated_after="1.0.0")
-@function_2D_only()
 def minmax_norm(
     edata: EHRData | AnnData,
     vars: str | Sequence[str] | None = None,
@@ -166,6 +191,11 @@
     Functionality is provided by :class:`~sklearn.preprocessing.MinMaxScaler`, see https://scikit-learn.org/stable/modules/generated/sklearn.preprocessing.MinMaxScaler.html for details.
     If `edata.X` is a Dask Array, functionality is provided by :class:`~dask_ml.preprocessing.MinMaxScaler`, see https://ml.dask.org/modules/generated/dask_ml.preprocessing.MinMaxScaler.html for details.

+    Supports both 2D and 3D data:
+
+    - 2D data: Standard normalization across observations
+    - 3D data: Per-variable normalization across samples and timestamps
+
     Args:
         edata: Central data object. Must already be encoded using :func:`~ehrapy.preprocessing.encode`.
@@ -177,15 +207,20 @@
         **kwargs: Additional arguments passed to the MinMaxScaler.

     Returns:
-        `None` if `copy=False` and modifies the passed edata, else returns an updated AnnData object. Also stores a record of applied normalizations as a dictionary in edata.uns["normalization"].
+        `None` if `copy=False` and modifies the passed edata, else returns an updated object. Also stores a record of applied normalizations as a dictionary in edata.uns["normalization"].

     Examples:
         >>> import ehrdata as ed
         >>> import ehrapy as ep
-        >>> edata = ed.dt.mimic_2()
-        >>> edata_norm = ep.pp.minmax_norm(edata, copy=True)
+        >>> import numpy as np
+        >>> edata = ed.dt.physionet2012(layer="tem_data")
+        >>> np.nanmin(edata.layers["tem_data"]), np.nanmax(edata.layers["tem_data"])
+        (-17.8, 36400.0)
+        >>> ep.pp.minmax_norm(edata, layer="tem_data")
+        >>> np.nanmin(edata.layers["tem_data"]), np.nanmax(edata.layers["tem_data"])
+        (0.0, 1.0)
     """
-    scale_func = _minmax_norm_function(edata.X if layer is None else edata.layers[layer], **kwargs)
+    scale_func = lambda arr: _minmax_norm_function(arr, **kwargs)

     return _scale_func_group(
         edata=edata,
@@ -200,15 +235,15 @@


 @singledispatch
 def _maxabs_norm_function(arr):
-    _raise_array_type_not_implemented(_scale_norm_function, type(arr))
+    _raise_array_type_not_implemented(_maxabs_norm_function, type(arr))


-@_maxabs_norm_function.register
+@_maxabs_norm_function.register(np.ndarray)
+@_apply_over_time_axis
 def _(arr: np.ndarray):
-    return sklearn_pp.MaxAbsScaler().fit_transform
+    return sklearn_pp.MaxAbsScaler().fit_transform(arr)


-@function_2D_only()
 @use_ehrdata(deprecated_after="1.0.0")
 def maxabs_norm(
     edata: EHRData | AnnData,
@@ -220,6 +255,12 @@
     """Apply max-abs normalization.

     Functionality is provided by :class:`~sklearn.preprocessing.MaxAbsScaler`, see https://scikit-learn.org/stable/modules/generated/sklearn.preprocessing.MaxAbsScaler.html for details.
+    Note: Dask arrays are not supported for this function. Please convert to numpy array first.
+
+    Supports both 2D and 3D data:
+
+    - 2D data: Standard normalization across observations
+    - 3D data: Per-variable normalization across samples and timestamps

     Args:
         edata: Central data object.
@@ -231,15 +272,23 @@
         copy: Whether to return a copy or act in place.

     Returns:
-        `None` if `copy=False` and modifies the passed edata, else returns an updated AnnData object. Also stores a record of applied normalizations as a dictionary in edata.uns["normalization"].
+        `None` if `copy=False` and modifies the passed edata, else returns an updated object. Also stores a record of applied normalizations as a dictionary in edata.uns["normalization"].

     Examples:
         >>> import ehrdata as ed
         >>> import ehrapy as ep
-        >>> edata = ed.dt.mimic_2()
-        >>> edata_norm = ep.pp.maxabs_norm(edata, copy=True)
+        >>> import numpy as np
+        >>> edata = ed.dt.physionet2012(layer="tem_data")
+        >>> np.nanmax(np.abs(edata.layers["tem_data"]))
+        36400.0
+        >>> ep.pp.maxabs_norm(edata, layer="tem_data")
+        >>> np.nanmax(np.abs(edata.layers["tem_data"]))
+        1.0
     """
-    scale_func = _maxabs_norm_function(edata.X if layer is None else edata.layers[layer])
+    X = edata.X if layer is None else edata.layers[layer]
+    if isinstance(X, DaskArray):
+        _raise_array_type_not_implemented(_maxabs_norm_function, type(X))
+    scale_func = _maxabs_norm_function

     return _scale_func_group(
         edata=edata,
@@ -257,20 +306,21 @@
     _raise_array_type_not_implemented(_robust_scale_norm_function, type(arr))


-@_robust_scale_norm_function.register
+@_robust_scale_norm_function.register(np.ndarray)
+@_apply_over_time_axis
 def _(arr: np.ndarray, **kwargs):
-    return sklearn_pp.RobustScaler(**kwargs).fit_transform
+    return sklearn_pp.RobustScaler(**kwargs).fit_transform(arr)


-@_robust_scale_norm_function.register
+@_robust_scale_norm_function.register(DaskArray)
+@_apply_over_time_axis
 def _(arr: DaskArray, **kwargs):
     import dask_ml.preprocessing as daskml_pp

-    return daskml_pp.RobustScaler(**kwargs).fit_transform
+    return daskml_pp.RobustScaler(**kwargs).fit_transform(arr)


 @use_ehrdata(deprecated_after="1.0.0")
-@function_2D_only()
 def robust_scale_norm(
     edata: EHRData | AnnData,
     vars: str | Sequence[str] | None = None,
@@ -285,6 +335,11 @@
     see https://scikit-learn.org/stable/modules/generated/sklearn.preprocessing.RobustScaler.html for details.
     If `edata.X` is a Dask Array, functionality is provided by :class:`~dask_ml.preprocessing.RobustScaler`,
     see https://ml.dask.org/modules/generated/dask_ml.preprocessing.RobustScaler.html for details.

+    Supports both 2D and 3D data:
+
+    - 2D data: Standard normalization across observations
+    - 3D data: Per-variable normalization across samples and timestamps
+
     Args:
         edata: Central data object. Must already be encoded using :func:`~ehrapy.preprocessing.encode`.
@@ -296,15 +351,20 @@
         **kwargs: Additional arguments passed to the RobustScaler.

     Returns:
-        `None` if `copy=False` and modifies the passed edata, else returns an updated AnnData object. Also stores a record of applied normalizations as a dictionary in edata.uns["normalization"].
+        `None` if `copy=False` and modifies the passed edata, else returns an updated object. Also stores a record of applied normalizations as a dictionary in edata.uns["normalization"].

     Examples:
         >>> import ehrdata as ed
         >>> import ehrapy as ep
-        >>> edata = ed.dt.mimic_2()
-        >>> edata_norm = ep.pp.robust_scale_norm(edata, copy=True)
+        >>> import numpy as np
+        >>> edata = ed.dt.physionet2012(layer="tem_data")
+        >>> np.nanmedian(edata.layers["tem_data"])
+        69.0
+        >>> ep.pp.robust_scale_norm(edata, layer="tem_data")
+        >>> np.nanmedian(edata.layers["tem_data"])
+        0.0
     """
-    scale_func = _robust_scale_norm_function(edata.X if layer is None else edata.layers[layer], **kwargs)
+    scale_func = lambda arr: _robust_scale_norm_function(arr, **kwargs)

     return _scale_func_group(
         edata=edata,
@@ -318,24 +378,25 @@


 @singledispatch
-def _quantile_norm_function(arr):
+def _quantile_norm_function(arr, **kwargs):
     _raise_array_type_not_implemented(_quantile_norm_function, type(arr))


-@_quantile_norm_function.register
+@_quantile_norm_function.register(np.ndarray)
+@_apply_over_time_axis
 def _(arr: np.ndarray, **kwargs):
-    return sklearn_pp.QuantileTransformer(**kwargs).fit_transform
+    return sklearn_pp.QuantileTransformer(**kwargs).fit_transform(arr)


-@_quantile_norm_function.register
+@_quantile_norm_function.register(DaskArray)
+@_apply_over_time_axis
 def _(arr: DaskArray, **kwargs):
     import dask_ml.preprocessing as daskml_pp

-    return daskml_pp.QuantileTransformer(**kwargs).fit_transform
+    return daskml_pp.QuantileTransformer(**kwargs).fit_transform(arr)


 @use_ehrdata(deprecated_after="1.0.0")
-@function_2D_only()
 def quantile_norm(
     edata: EHRData | AnnData,
     vars: str | Sequence[str] | None = None,
@@ -350,6 +411,11 @@
     see https://scikit-learn.org/stable/modules/generated/sklearn.preprocessing.QuantileTransformer.html for details.
     If `edata.X` is a Dask Array, functionality is provided by :class:`~dask_ml.preprocessing.QuantileTransformer`,
     see https://ml.dask.org/modules/generated/dask_ml.preprocessing.QuantileTransformer.html for details.

+    Supports both 2D and 3D data:
+
+    - 2D data: Standard normalization across observations
+    - 3D data: Per-variable normalization across samples and timestamps
+
     Args:
         edata: Central data object. Must already be encoded using :func:`~ehrapy.preprocessing.encode`.
         vars: List of the names of the numeric variables to normalize.
@@ -360,15 +426,20 @@
         **kwargs: Additional arguments passed to the QuantileTransformer.

     Returns:
-        `None` if `copy=False` and modifies the passed edata, else returns an updated data object. Also stores a record of applied normalizations as a dictionary in edata.uns["normalization"].
+        `None` if `copy=False` and modifies the passed edata, else returns an updated object. Also stores a record of applied normalizations as a dictionary in edata.uns["normalization"].

     Examples:
         >>> import ehrdata as ed
         >>> import ehrapy as ep
-        >>> edata = ed.dt.mimic_2()
-        >>> edata_norm = ep.pp.quantile_norm(edata, copy=True)
+        >>> import numpy as np
+        >>> edata = ed.dt.physionet2012(layer="tem_data")
+        >>> np.nanmin(edata.layers["tem_data"]), np.nanmax(edata.layers["tem_data"])
+        (-17.8, 36400.0)
+        >>> ep.pp.quantile_norm(edata, layer="tem_data")
+        >>> np.nanmin(edata.layers["tem_data"]), np.nanmax(edata.layers["tem_data"])
+        (0.0, 1.0)
     """
-    scale_func = _quantile_norm_function(edata.X if layer is None else edata.layers[layer], **kwargs)
+    scale_func = lambda arr: _quantile_norm_function(arr, **kwargs)

     return _scale_func_group(
         edata=edata,
@@ -386,13 +457,13 @@
     _raise_array_type_not_implemented(_power_norm_function, type(arr))


-@_power_norm_function.register
+@_power_norm_function.register(np.ndarray)
+@_apply_over_time_axis
 def _(arr: np.ndarray, **kwargs):
-    return sklearn_pp.PowerTransformer(**kwargs).fit_transform
+    return sklearn_pp.PowerTransformer(**kwargs).fit_transform(arr)


 @use_ehrdata(deprecated_after="1.0.0")
-@function_2D_only()
 def power_norm(
     edata: EHRData | AnnData,
     vars: str | Sequence[str] | None = None,
@@ -405,6 +476,12 @@
     Functionality is provided by :class:`~sklearn.preprocessing.PowerTransformer`, see https://scikit-learn.org/stable/modules/generated/sklearn.preprocessing.PowerTransformer.html for details.
+    Note: Dask arrays are not supported for this function. Please convert to numpy array first.
+
+    Supports both 2D and 3D data:
+
+    - 2D data: Standard normalization across observations
+    - 3D data: Per-variable normalization across samples and timestamps

     Args:
         edata: Central data object.
@@ -417,15 +494,27 @@
         **kwargs: Additional arguments passed to the PowerTransformer.

     Returns:
-        `None` if `copy=False` and modifies the passed edata, else returns an updated data object. Also stores a record of applied normalizations as a dictionary in edata.uns["normalization"].
+        `None` if `copy=False` and modifies the passed edata, else returns an updated object. Also stores a record of applied normalizations as a dictionary in edata.uns["normalization"].

     Examples:
         >>> import ehrdata as ed
         >>> import ehrapy as ep
-        >>> edata = ed.dt.mimic_2()
-        >>> edata_norm = ep.pp.power_norm(edata, copy=True)
+        >>> import numpy as np
+        >>> from scipy import stats
+        >>> edata = ed.dt.physionet2012(layer="tem_data")
+        >>> ep.pp.offset_negative_values(edata, layer="tem_data")
+        >>> skewed_data = np.power(edata.layers["tem_data"], 2)
+        >>> edata.layers["tem_data"] = skewed_data
+        >>> stats.skew(edata.layers["tem_data"].flatten())
+        504.250727
+        >>> ep.pp.power_norm(edata, layer="tem_data")
+        >>> stats.skew(edata.layers["tem_data"].flatten())
+        0.144324
     """
-    scale_func = _power_norm_function(edata.X if layer is None else edata.layers[layer], **kwargs)
+    X = edata.X if layer is None else edata.layers[layer]
+    if isinstance(X, DaskArray):
+        _raise_array_type_not_implemented(_power_norm_function, type(X))
+
+    scale_func = lambda arr: _power_norm_function(arr, **kwargs)

     return _scale_func_group(
         edata=edata,
@@ -438,8 +527,43 @@


+@singledispatch
+def _log_norm_function(arr, offset: int | float = 1, base: int | float | None = None):
+    _raise_array_type_not_implemented(_log_norm_function, type(arr))
+
+
+@_log_norm_function.register(np.ndarray)
+@_apply_over_time_axis
+def _(arr: np.ndarray, offset: int | float = 1, base: int | float | None = None) -> np.ndarray:
+    if offset == 1:
+        np.log1p(arr, out=arr)
+    else:
+        np.add(arr, offset, out=arr)
+        np.log(arr, out=arr)
+
+    if base is not None:
+        np.divide(arr, np.log(base), out=arr)
+
+    return arr
+
+
+@_log_norm_function.register(DaskArray)
+@_apply_over_time_axis
+def _(arr: DaskArray, offset: int | float = 1, base: int | float | None = None) -> DaskArray:
+    import dask.array as da
+
+    if offset == 1:
+        result = da.log1p(arr)
+    else:
+        result = da.log(arr + offset)
+
+    if base is not None:
+        result = result / np.log(base)
+
+    return result
+
+
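Reviewer note: the numpy branch of `_log_norm_function` above works in place via `out=arr` and converts bases by dividing by ln(base); the identity it relies on is log_b(x + offset) = ln(x + offset) / ln(b). A quick standalone check of that math (illustrative, not part of the PR):

    import numpy as np

    x = np.array([0.0, 1.0, 9.0])
    assert np.allclose(np.log1p(x), np.log(x + 1))                 # offset == 1 fast path
    assert np.allclose(np.log1p(x) / np.log(10), np.log10(x + 1))  # base conversion

One caveat worth double-checking: because `np.log1p(arr, out=arr)` writes into the input buffer, an integer-dtype array would raise a casting error here, and unlike `_scale_func_group`, the `log_norm` path below does not upcast integers itself.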
 @use_ehrdata(deprecated_after="1.0.0")
-@function_2D_only()
 def log_norm(
     edata: EHRData | AnnData,
     vars: str | Sequence[str] | None = None,
@@ -453,6 +577,11 @@
     Computes :math:`x = \\log(x + offset)`, where :math:`log` denotes the natural logarithm
     unless a different base is given and the default :math:`offset` is :math:`1`.

+    Supports both 2D and 3D data:
+
+    - 2D data: Standard normalization across observations
+    - 3D data: Applied to all elements across samples and timestamps
+
     Args:
         edata: Central data object.
         vars: List of the names of the numeric variables to normalize.
@@ -463,13 +592,19 @@
         copy: Whether to return a copy or act in place.

     Returns:
-        `None` if `copy=False` and modifies the passed edata, else returns an updated data object. Also stores a record of applied normalizations as a dictionary in edata.uns["normalization"].
+        `None` if `copy=False` and modifies the passed edata, else returns an updated object. Also stores a record of applied normalizations as a dictionary in edata.uns["normalization"].

     Examples:
         >>> import ehrdata as ed
         >>> import ehrapy as ep
-        >>> edata = ed.dt.mimic_2()
-        >>> edata_norm = ep.pp.log_norm(edata, copy=True)
+        >>> import numpy as np
+        >>> edata = ed.dt.physionet2012(layer="tem_data")
+        >>> ep.pp.offset_negative_values(edata, layer="tem_data")
+        >>> np.nanmax(edata.layers["tem_data"])
+        36400.0
+        >>> ep.pp.log_norm(edata, layer="tem_data")
+        >>> np.nanmax(edata.layers["tem_data"])
+        10.502379
     """
     if isinstance(vars, str):
         vars = [vars]
@@ -480,33 +615,45 @@

     edata = _prep_edata_norm(edata, copy)

-    edata_to_check_for_negatives = edata[:, vars] if vars else edata
-    offset_tmp_applied = edata_to_check_for_negatives.X + offset
+    X = edata.X if layer is None else edata.layers[layer]
+
+    if vars:
+        var_indices = _get_var_indices(edata, vars)
+        check_data = X[:, var_indices] if X.ndim == 2 else X[:, var_indices, :]
+    else:
+        check_data = X
+
+    offset_tmp_applied = check_data + offset
     if np.any(offset_tmp_applied < 0):
+        data_type = f"Layer '{layer}'" if layer else "Matrix X"
         raise ValueError(
-            "Matrix X contains negative values. "
+            f"{data_type} contains negative values. "
             "Undefined behavior for log normalization. "
-            "Please specifiy a higher offset to this function "
+            "Please specify a higher offset to this function "
             "or offset negative values with ep.pp.offset_negative_values()."
         )

-    var_values = edata[:, vars].X.copy()
-
-    if offset == 1:
-        np.log1p(var_values, out=var_values)
+    if vars:
+        var_indices = _get_var_indices(edata, vars)
+        var_values = X[:, var_indices] if X.ndim == 2 else X[:, var_indices, :]
+        transformed_values = _log_norm_function(var_values, offset=offset, base=base)
+        if layer is None:
+            edata.X[:, var_indices] = transformed_values
+        else:
+            if X.ndim == 3:
+                edata.layers[layer][:, var_indices, :] = transformed_values
+            else:
+                edata.layers[layer][:, var_indices] = transformed_values
     else:
-        var_values = var_values + offset
-        np.log(var_values, out=var_values)
-
-        if base is not None:
-            np.divide(var_values, np.log(base), out=var_values)
-
-    edata.X = edata.X.astype(var_values.dtype)
-    edata[:, vars].X = var_values
+        transformed_values = _log_norm_function(X, offset=offset, base=base)
+        if layer is None:
+            edata.X = transformed_values
+        else:
+            edata.layers[layer] = transformed_values

     _record_norm(edata, vars, "log")

-    return edata
+    return edata if copy else None


 def _prep_edata_norm(edata: EHRData | AnnData, copy: bool = False) -> EHRData | AnnData | None:  # pragma: no cover
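Reviewer note: on the `vars` path, `_get_var_indices(edata, vars)` is computed twice in `log_norm` (once for the negativity check, once for the transform). A possible simplification, sketched under the same names (not in the PR):

    var_indices = _get_var_indices(edata, vars) if vars else None
    if var_indices is not None:
        check_data = X[:, var_indices] if X.ndim == 2 else X[:, var_indices, :]
    else:
        check_data = X
    # ...run the negativity check on check_data, then reuse var_indices
    # (and the same slices) for the transform instead of re-deriving them.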
@@ -537,31 +684,41 @@


 @use_ehrdata(deprecated_after="1.0.0")
-@function_2D_only()
 def offset_negative_values(edata: EHRData | AnnData, layer: str = None, copy: bool = False) -> EHRData | AnnData | None:
     """Offsets negative values into positive ones with the lowest negative value becoming 0.

     This is primarily used to enable the usage of functions such as log_norm that
     do not allow negative values for mathematical or technical reasons.

+    Supports both 2D and 3D data:
+
+    - 2D data: Standard offset across observations
+    - 3D data: Applied to all elements across samples and timestamps
+
     Args:
         edata: Central data object.
         layer: The layer to offset.
         copy: Whether to return a modified copy of the data object.

     Returns:
-        `None` if `copy=False` and modifies the passed edata, else returns an updated data object.
+        `None` if `copy=False` and modifies the passed edata, else returns an updated object.
+
+    Examples:
+        >>> import ehrdata as ed
+        >>> import ehrapy as ep
+        >>> import numpy as np
+        >>> edata = ed.dt.physionet2012(layer="tem_data")
+        >>> np.nanmin(edata.layers["tem_data"])
+        -17.8
+        >>> ep.pp.offset_negative_values(edata, layer="tem_data")
+        >>> np.nanmin(edata.layers["tem_data"])
+        0.0
     """
-    if copy:
-        edata = edata.copy()
+    edata = _prep_edata_norm(edata, copy)

-    if layer:
-        minimum = np.min(edata.layers[layer])
-        if minimum < 0:
-            edata.layers[layer] = edata.layers[layer] + np.abs(minimum)
-    else:
-        minimum = np.min(edata.X)
-        if minimum < 0:
-            edata.X = edata.X + np.abs(minimum)
+    X = edata.X if layer is None else edata.layers[layer]
+    minimum = np.nanmin(X)
+    if minimum < 0:
+        np.add(X, np.abs(minimum), out=X)

     return edata if copy else None
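Reviewer note: `offset_negative_values` now shifts the array in place through `np.add(..., out=X)` and ignores NaNs via `np.nanmin`, so a layer is updated without reassignment. A small standalone demonstration of that behavior (toy data, not from the PR):

    import numpy as np

    X = np.array([[-1.0, np.nan], [5.0, -20.0]])
    minimum = np.nanmin(X)                 # -20.0; NaNs are ignored
    if minimum < 0:
        np.add(X, np.abs(minimum), out=X)  # in place: views/layers see the update
    assert np.nanmin(X) == 0.0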
diff --git a/tests/conftest.py b/tests/conftest.py
index a134caf2..6b8a40dc 100644
--- a/tests/conftest.py
+++ b/tests/conftest.py
@@ -336,6 +336,7 @@ def edata_blobs_timeseries_small() -> ed.EHRData:
         seasonality=True,
         time_shifts=True,
         variable_length=False,
+        layer=DEFAULT_TEM_LAYER_NAME,
     )
     edata.layers["layer_2"] = edata.X.copy()
diff --git a/tests/preprocessing/test_normalization.py b/tests/preprocessing/test_normalization.py
index b0681fa7..f07bb2dc 100644
--- a/tests/preprocessing/test_normalization.py
+++ b/tests/preprocessing/test_normalization.py
@@ -5,22 +5,20 @@
 import numpy as np
 import pytest
 from anndata import AnnData
-from ehrdata.core.constants import DEFAULT_TEM_LAYER_NAME
+from ehrdata.core.constants import DEFAULT_TEM_LAYER_NAME, FEATURE_TYPE_KEY, NUMERIC_TAG

 import ehrapy as ep
-from tests.conftest import ARRAY_TYPES_NONNUMERIC
+from tests.conftest import ARRAY_TYPES_NONNUMERIC, ARRAY_TYPES_NUMERIC_3D_ABLE

 CURRENT_DIR = Path(__file__).parent

 from scipy import sparse


 def test_vars_checks(adata_to_norm):
-    """Test for checks that vars argument is valid."""
     with pytest.raises(ValueError, match=r"Some selected vars are not numeric"):
         ep.pp.scale_norm(adata_to_norm, vars=["String1"])


-# TODO: check this for each function, with just default settings?
 @pytest.mark.parametrize(
     "array_type,expected_error",
     [
@@ -36,15 +34,8 @@ def test_norm_scale_array_types(adata_to_norm, array_type, expected_error):
         ep.pp.scale_norm(adata_to_norm)


-def test_norm_scale_3D_edata(edata_blob_small):
-    ep.pp.scale_norm(edata_blob_small, layer="layer_2")
-    with pytest.raises(ValueError, match=r"only supports 2D data"):
-        ep.pp.scale_norm(edata_blob_small, layer=DEFAULT_TEM_LAYER_NAME)
-
-
 @pytest.mark.parametrize("array_type", [np.array, da.array])
 def test_norm_scale(adata_to_norm, array_type):
-    """Test for the scaling normalization method."""
     warnings.filterwarnings("ignore")
     adata_to_norm.X = array_type(adata_to_norm.X)
     ep.pp.scale_norm(adata_to_norm)
@@ -104,6 +95,19 @@ def test_norm_scale_group(array_type, edata_mini_normalization):
     with pytest.raises(KeyError):
         ep.pp.scale_norm(edata_mini_casted, group_key="invalid_key", copy=True)

+    if isinstance(edata_mini_casted.X, da.Array):
+        with pytest.raises(
+            NotImplementedError,
+            match="Group-wise normalization|does not support array type.*dask",
+        ):
+            ep.pp.scale_norm(
+                edata_mini_casted,
+                vars=["sys_bp_entry", "dia_bp_entry"],
+                group_key="disease",
+                copy=True,
+            )
+        return
+
     edata_mini_norm = ep.pp.scale_norm(
         edata_mini_casted,
         vars=["sys_bp_entry", "dia_bp_entry"],
@@ -143,15 +147,8 @@ def test_norm_minmax_array_types(adata_to_norm, array_type, expected_error):
         ep.pp.minmax_norm(adata_to_norm)


-def test_norm_minmax_3D_edata(edata_blob_small):
-    ep.pp.minmax_norm(edata_blob_small, layer="layer_2")
-    with pytest.raises(ValueError, match=r"only supports 2D data"):
-        ep.pp.minmax_norm(edata_blob_small, layer=DEFAULT_TEM_LAYER_NAME)
-
-
 @pytest.mark.parametrize("array_type", ARRAY_TYPES_NONNUMERIC)
 def test_norm_minmax(array_type, adata_to_norm):
-    """Test for the minmax normalization method."""
     adata_to_norm.X = array_type(adata_to_norm.X)
     adata_norm = ep.pp.minmax_norm(adata_to_norm, copy=True)
@@ -194,6 +191,19 @@ def test_norm_minmax_group(array_type, edata_mini_normalization):
     with pytest.raises(KeyError):
         ep.pp.minmax_norm(edata_mini_casted, group_key="invalid_key", copy=True)

+    if isinstance(edata_mini_casted.X, da.Array):
+        with pytest.raises(
+            NotImplementedError,
+            match="Group-wise normalization|does not support array type.*dask",
+        ):
+            ep.pp.minmax_norm(
+                edata_mini_casted,
+                vars=["sys_bp_entry", "dia_bp_entry"],
+                group_key="disease",
+                copy=True,
+            )
+        return
+
     edata_mini_norm = ep.pp.minmax_norm(
         edata_mini_casted,
         vars=["sys_bp_entry", "dia_bp_entry"],
@@ -224,18 +234,11 @@ def test_norm_maxabs_array_types(adata_to_norm, array_type, expected_error):
         ep.pp.maxabs_norm(adata_to_norm)


-def test_norm_maxabs_3D_edata(edata_blob_small):
-    ep.pp.maxabs_norm(edata_blob_small, layer="layer_2")
-    with pytest.raises(ValueError, match=r"only supports 2D data"):
-        ep.pp.maxabs_norm(edata_blob_small, layer=DEFAULT_TEM_LAYER_NAME)
-
-
 @pytest.mark.parametrize("array_type", ARRAY_TYPES_NONNUMERIC)
 def test_norm_maxabs(array_type, adata_to_norm):
-    """Test for the maxabs normalization method."""
     adata_to_norm.X = array_type(adata_to_norm.X)

-    if "dask" in array_type.__name__:
+    if isinstance(adata_to_norm.X, da.Array):
         with pytest.raises(NotImplementedError):
             adata_norm = ep.pp.maxabs_norm(adata_to_norm, copy=True)
@@ -264,9 +267,9 @@ def test_norm_maxabs_group(array_type, edata_mini_normalization):
     edata_mini_casted = edata_mini_normalization.copy()
     edata_mini_casted.X = array_type(edata_mini_casted.X)

-    if "dask" in array_type.__name__:
-        with pytest.raises(NotImplementedError):
-            ep.pp.maxabs_norm(edata_mini_casted, copy=True)
+    if isinstance(edata_mini_casted.X, da.Array):
+        with pytest.raises(NotImplementedError, match="does not support array type.*dask"):
+            ep.pp.maxabs_norm(edata_mini_casted, group_key="disease", copy=True)
     else:
         with pytest.raises(KeyError):
             ep.pp.maxabs_norm(edata_mini_casted, group_key="invalid_key", copy=True)
@@ -310,15 +313,8 @@ def test_norm_robust_scale_array_types(adata_to_norm, array_type, expected_error):
         ep.pp.robust_scale_norm(adata_to_norm)


-def test_norm_robust_scale_3D_edata(edata_blob_small):
-    ep.pp.robust_scale_norm(edata_blob_small, layer="layer_2")
-    with pytest.raises(ValueError, match=r"only supports 2D data"):
-        ep.pp.robust_scale_norm(edata_blob_small, layer=DEFAULT_TEM_LAYER_NAME)
-
-
 @pytest.mark.parametrize("array_type", ARRAY_TYPES_NONNUMERIC)
 def test_norm_robust_scale(array_type, adata_to_norm):
-    """Test for the robust_scale normalization method."""
     adata_to_norm.X = array_type(adata_to_norm.X)
     adata_norm = ep.pp.robust_scale_norm(adata_to_norm, copy=True)
@@ -361,6 +357,19 @@ def test_norm_robust_scale_group(array_type, edata_mini_normalization):
     with pytest.raises(KeyError):
         ep.pp.robust_scale_norm(edata_mini_casted, group_key="invalid_key", copy=True)

+    if isinstance(edata_mini_casted.X, da.Array):
+        with pytest.raises(
+            NotImplementedError,
+            match="Group-wise normalization|does not support array type.*dask",
+        ):
+            ep.pp.robust_scale_norm(
+                edata_mini_casted,
+                vars=["sys_bp_entry", "dia_bp_entry"],
+                group_key="disease",
+                copy=True,
+            )
+        return
+
     edata_mini_norm = ep.pp.robust_scale_norm(
         edata_mini_casted,
         vars=["sys_bp_entry", "dia_bp_entry"],
@@ -392,15 +401,8 @@ def test_norm_quantile_array_types(adata_to_norm, array_type, expected_error):
         ep.pp.quantile_norm(adata_to_norm)


-def test_norm_quantile_3D_edata(edata_blob_small):
-    ep.pp.quantile_norm(edata_blob_small, layer="layer_2")
-    with pytest.raises(ValueError, match=r"only supports 2D data"):
-        ep.pp.quantile_norm(edata_blob_small, layer=DEFAULT_TEM_LAYER_NAME)
-
-
 @pytest.mark.parametrize("array_type", ARRAY_TYPES_NONNUMERIC)
 def test_norm_quantile_uniform(array_type, adata_to_norm):
-    """Test for the quantile normalization method."""
     warnings.filterwarnings("ignore", category=UserWarning)
     adata_to_norm.X = array_type(adata_to_norm.X)
@@ -459,6 +461,19 @@ def test_norm_quantile_uniform_group(array_type, edata_mini_normalization):
     with pytest.raises(KeyError):
         ep.pp.quantile_norm(edata_mini_casted, group_key="invalid_key", copy=True)

+    if isinstance(edata_mini_casted.X, da.Array):
+        with pytest.raises(
+            NotImplementedError,
+            match="Group-wise normalization|does not support array type.*dask",
+        ):
+            ep.pp.quantile_norm(
+                edata_mini_casted,
+                vars=["sys_bp_entry", "dia_bp_entry"],
+                group_key="disease",
+                copy=True,
+            )
+        return
+
     edata_mini_norm = ep.pp.quantile_norm(
         edata_mini_casted,
         vars=["sys_bp_entry", "dia_bp_entry"],
@@ -490,18 +505,11 @@ def test_norm_power_array_types(adata_to_norm, array_type, expected_error):
         ep.pp.power_norm(adata_to_norm)


-def test_norm_power_3D_edata(edata_blob_small):
-    ep.pp.power_norm(edata_blob_small, layer="layer_2")
-    with pytest.raises(ValueError, match=r"only supports 2D data"):
-        ep.pp.power_norm(edata_blob_small, layer=DEFAULT_TEM_LAYER_NAME)
-
-
 @pytest.mark.parametrize("array_type", ARRAY_TYPES_NONNUMERIC)
 def test_norm_power(array_type, adata_to_norm):
-    """Test for the power transformation normalization method."""
     adata_to_norm.X = array_type(adata_to_norm.X)

-    if "dask" in array_type.__name__:
+    if isinstance(adata_to_norm.X, da.Array):
         with pytest.raises(NotImplementedError):
             ep.pp.power_norm(adata_to_norm, copy=True)
     else:
@@ -536,14 +544,14 @@ def test_norm_power_integers(edata_mini_integers_in_X):
             [-0.31234142],
         ]
     )
-    assert np.allclose(adata_norm.X, in_days_norm)
+    assert np.allclose(adata_norm.X, in_days_norm, rtol=1e-4, atol=1e-4)


 @pytest.mark.parametrize("array_type", ARRAY_TYPES_NONNUMERIC)
 def test_norm_power_kwargs(array_type, adata_to_norm):
     adata_to_norm.X = array_type(adata_to_norm.X)

-    if "dask" in array_type.__name__:
+    if isinstance(adata_to_norm.X, da.Array):
         with pytest.raises(NotImplementedError):
             ep.pp.power_norm(adata_to_norm, copy=True)
     else:
@@ -564,9 +572,9 @@ def test_norm_power_group(array_type, edata_mini_normalization):
     edata_mini_casted = edata_mini_normalization.copy()
     edata_mini_casted.X = array_type(edata_mini_casted.X)

-    if "dask" in array_type.__name__:
-        with pytest.raises(NotImplementedError):
-            ep.pp.power_norm(edata_mini_casted, copy=True)
+    if isinstance(edata_mini_casted.X, da.Array):
+        with pytest.raises(NotImplementedError, match="does not support array type.*dask"):
+            ep.pp.power_norm(edata_mini_casted, group_key="disease", copy=True)
     else:
         with pytest.raises(KeyError):
             ep.pp.power_norm(edata_mini_casted, group_key="invalid_key", copy=True)
@@ -627,16 +635,7 @@ def test_norm_log_norm_array_types(adata_to_norm, array_type, expected_error):
         ep.pp.log_norm(adata_to_norm)


-def test_norm_log_3D_edata(edata_blob_small):
-    edata_blob_small.X = np.abs(edata_blob_small.X)
-    edata_blob_small.layers[DEFAULT_TEM_LAYER_NAME] = np.abs(edata_blob_small.layers[DEFAULT_TEM_LAYER_NAME])
-    ep.pp.log_norm(edata_blob_small, layer="layer_2")
-    with pytest.raises(ValueError, match=r"only supports 2D data"):
-        ep.pp.log_norm(edata_blob_small, layer=DEFAULT_TEM_LAYER_NAME)
-
-
 def test_norm_log1p(adata_to_norm):
-    """Test for the log normalization method."""
     # Ensure that some test data is strictly positive
     log_adata = adata_to_norm.copy()
     log_adata.X[0, 4] = 1
@@ -684,7 +683,6 @@
 def test_norm_record(adata_to_norm):
-    """Test for logging of applied normalization methods."""
     adata_norm = ep.pp.minmax_norm(adata_to_norm, copy=True)

     assert adata_norm.uns["normalization"] == {
@@ -701,22 +699,233 @@
 def test_offset_negative_values():
-    """Test for the offset_negative_values method."""
     to_offset_adata = AnnData(X=np.array([[-1, -5, -10], [5, 6, -20]], dtype=np.float32))
     expected_adata = AnnData(X=np.array([[19, 15, 10], [25, 26, 0]], dtype=np.float32))

     assert np.array_equal(expected_adata.X, ep.pp.offset_negative_values(to_offset_adata, copy=True).X)


-def test_offset_negative_values_3D_edata(edata_blob_small):
-    ep.pp.offset_negative_values(edata_blob_small, layer="layer_2")
-    with pytest.raises(ValueError, match=r"only supports 2D data"):
-        ep.pp.offset_negative_values(edata_blob_small, layer=DEFAULT_TEM_LAYER_NAME)
-
-
 def test_norm_numerical_only():
-    """Test for the log_norm method."""
     to_normalize_adata = AnnData(X=np.array([[1, 0, 0], [0, 0, 1]], dtype=np.float32))
     expected_adata = AnnData(X=np.array([[0.6931472, 0, 0], [0, 0, 0.6931472]], dtype=np.float32))

     assert np.array_equal(expected_adata.X, ep.pp.log_norm(to_normalize_adata, copy=True).X)
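Reviewer note: the new 3D tests that follow all validate per-variable statistics by flattening each (obs, time) slab of the (obs, var, time) layer. The shared pattern, extracted as an illustrative helper (not in the PR):

    import numpy as np

    def per_variable_stats(arr3d):
        """Yield (nanmean, nanstd) per variable of an (obs, var, time) array."""
        for var_idx in range(arr3d.shape[1]):
            flat = arr3d[:, var_idx, :].reshape(-1)
            if not np.all(np.isnan(flat)):
                yield np.nanmean(flat), np.nanstd(flat)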
+
+
+@pytest.mark.parametrize("array_type", ARRAY_TYPES_NUMERIC_3D_ABLE)
+@pytest.mark.parametrize(
+    "norm_func",
+    [
+        ep.pp.scale_norm,
+        ep.pp.minmax_norm,
+        ep.pp.maxabs_norm,
+        ep.pp.robust_scale_norm,
+        ep.pp.quantile_norm,
+        ep.pp.power_norm,
+    ],
+)
+def test_norm_3D(edata_blobs_timeseries_small, array_type, norm_func):
+    edata = edata_blobs_timeseries_small
+    edata.layers[DEFAULT_TEM_LAYER_NAME] = array_type(edata.layers[DEFAULT_TEM_LAYER_NAME])
+
+    if isinstance(edata.layers[DEFAULT_TEM_LAYER_NAME], da.Array) and norm_func in (
+        ep.pp.maxabs_norm,
+        ep.pp.power_norm,
+    ):
+        with pytest.raises(NotImplementedError, match="does not support array type.*dask"):
+            norm_func(edata, layer=DEFAULT_TEM_LAYER_NAME)
+        return
+
+    orig_shape = edata.layers[DEFAULT_TEM_LAYER_NAME].shape
+
+    if norm_func == ep.pp.power_norm:
+        ep.pp.offset_negative_values(edata, layer=DEFAULT_TEM_LAYER_NAME)
+
+    norm_func(edata, layer=DEFAULT_TEM_LAYER_NAME)
+
+    assert edata.layers[DEFAULT_TEM_LAYER_NAME].shape == orig_shape
+    assert "normalization" in edata.uns
+    assert len(edata.uns["normalization"]) > 0
+
+
+def test_scale_norm_3D(edata_blobs_timeseries_small):
+    edata = edata_blobs_timeseries_small.copy()
+    ep.pp.scale_norm(edata, layer=DEFAULT_TEM_LAYER_NAME)
+
+    n_obs, n_var, n_timestamps = edata.layers[DEFAULT_TEM_LAYER_NAME].shape
+    for var_idx in range(n_var):
+        flat = edata.layers[DEFAULT_TEM_LAYER_NAME][:, var_idx, :].reshape(-1)
+        if not np.all(np.isnan(flat)):
+            assert np.allclose(np.nanmean(flat), 0, atol=1e-6), f"Mean check failed for variable {var_idx}"
+            assert np.allclose(np.nanstd(flat), 1, atol=1e-6), f"Std check failed for variable {var_idx}"
+
+
+def test_minmax_norm_3D(edata_blobs_timeseries_small):
+    edata = edata_blobs_timeseries_small.copy()
+    ep.pp.minmax_norm(edata, layer=DEFAULT_TEM_LAYER_NAME)
+
+    n_obs, n_var, n_timestamps = edata.layers[DEFAULT_TEM_LAYER_NAME].shape
+    for var_idx in range(n_var):
+        flat = edata.layers[DEFAULT_TEM_LAYER_NAME][:, var_idx, :].reshape(-1)
+        if not np.all(np.isnan(flat)):
+            assert np.allclose(np.nanmin(flat), 0, atol=1e-6), f"Min check failed for variable {var_idx}"
+            assert np.allclose(np.nanmax(flat), 1, atol=1e-6), f"Max check failed for variable {var_idx}"
+
+
+def test_maxabs_norm_3D(edata_blobs_timeseries_small):
+    edata = edata_blobs_timeseries_small.copy()
+    ep.pp.maxabs_norm(edata, layer=DEFAULT_TEM_LAYER_NAME)
+
+    n_obs, n_var, n_timestamps = edata.layers[DEFAULT_TEM_LAYER_NAME].shape
+    for var_idx in range(n_var):
+        flat = edata.layers[DEFAULT_TEM_LAYER_NAME][:, var_idx, :].reshape(-1)
+        if not np.all(np.isnan(flat)):
+            assert np.allclose(np.nanmax(np.abs(flat)), 1, atol=1e-6), f"Max-abs check failed for variable {var_idx}"
+
+
+def test_robust_scale_norm_3D(edata_blobs_timeseries_small):
+    edata = edata_blobs_timeseries_small.copy()
+    ep.pp.robust_scale_norm(edata, layer=DEFAULT_TEM_LAYER_NAME)
+
+    n_obs, n_var, n_timestamps = edata.layers[DEFAULT_TEM_LAYER_NAME].shape
+    for var_idx in range(n_var):
+        flat = edata.layers[DEFAULT_TEM_LAYER_NAME][:, var_idx, :].reshape(-1)
+        if not np.all(np.isnan(flat)):
+            assert np.allclose(np.nanmedian(flat), 0, atol=1e-6), f"Median check failed for variable {var_idx}"
+            assert np.allclose(np.nanpercentile(flat, 75) - np.nanpercentile(flat, 25), 1, atol=1e-6), (
+                f"IQR check failed for variable {var_idx}"
+            )
+
+
+def test_quantile_norm_3D(edata_blobs_timeseries_small):
+    edata = edata_blobs_timeseries_small.copy()
+    ep.pp.quantile_norm(edata, layer=DEFAULT_TEM_LAYER_NAME)
+
+    n_obs, n_var, n_timestamps = edata.layers[DEFAULT_TEM_LAYER_NAME].shape
+    for var_idx in range(n_var):
+        flat = edata.layers[DEFAULT_TEM_LAYER_NAME][:, var_idx, :].reshape(-1)
+        if not np.all(np.isnan(flat)):
+            assert np.allclose(np.nanmin(flat), 0, atol=1e-6), f"Min check failed for variable {var_idx}"
+            assert np.allclose(np.nanmax(flat), 1, atol=1e-6), f"Max check failed for variable {var_idx}"
+            assert np.allclose(np.nanpercentile(flat, 25), 0.25, atol=0.05), f"Q25 check failed for variable {var_idx}"
+            assert np.allclose(np.nanpercentile(flat, 50), 0.5, atol=0.05), f"Q50 check failed for variable {var_idx}"
+            assert np.allclose(np.nanpercentile(flat, 75), 0.75, atol=0.05), f"Q75 check failed for variable {var_idx}"
+
+
+def test_power_norm_3D(edata_blobs_timeseries_small):
+    edata = edata_blobs_timeseries_small.copy()
+    ep.pp.offset_negative_values(edata, layer=DEFAULT_TEM_LAYER_NAME)
+    ep.pp.power_norm(edata, layer=DEFAULT_TEM_LAYER_NAME)
+
+    n_obs, n_var, n_timestamps = edata.layers[DEFAULT_TEM_LAYER_NAME].shape
+    for var_idx in range(n_var):
+        flat = edata.layers[DEFAULT_TEM_LAYER_NAME][:, var_idx, :].reshape(-1)
+        if not np.all(np.isnan(flat)):
+            assert np.allclose(np.nanmean(flat), 0, atol=1e-5), f"Mean check failed for variable {var_idx}"
+            assert np.allclose(np.nanstd(flat), 1, atol=1e-5), f"Std check failed for variable {var_idx}"
+
+
+def test_log_norm_3D(edata_blobs_timeseries_small):
+    edata = edata_blobs_timeseries_small.copy()
+    ep.pp.offset_negative_values(edata, layer=DEFAULT_TEM_LAYER_NAME)
+
+    layer_original = edata.layers[DEFAULT_TEM_LAYER_NAME].copy()
+
+    ep.pp.log_norm(edata, layer=DEFAULT_TEM_LAYER_NAME)
+
+    expected = np.log1p(layer_original)
+    assert np.allclose(edata.layers[DEFAULT_TEM_LAYER_NAME], expected, rtol=1e-6, equal_nan=True)
+
+    assert not np.allclose(layer_original, edata.layers[DEFAULT_TEM_LAYER_NAME], equal_nan=True)
+
+
+def test_offset_negative_values_3D(edata_blobs_timeseries_small):
+    edata = edata_blobs_timeseries_small.copy()
+    edata.layers[DEFAULT_TEM_LAYER_NAME] = edata.layers[DEFAULT_TEM_LAYER_NAME] - 2
+    assert np.nanmin(edata.layers[DEFAULT_TEM_LAYER_NAME]) < 0
+
+    ep.pp.offset_negative_values(edata, layer=DEFAULT_TEM_LAYER_NAME)
+
+    assert np.allclose(np.nanmin(edata.layers[DEFAULT_TEM_LAYER_NAME]), 0, atol=1e-10)
+
+    non_nan_values = edata.layers[DEFAULT_TEM_LAYER_NAME][~np.isnan(edata.layers[DEFAULT_TEM_LAYER_NAME])]
+    assert np.all(non_nan_values >= 0)
+
+
+@pytest.mark.parametrize("array_type", ARRAY_TYPES_NUMERIC_3D_ABLE)
+@pytest.mark.parametrize(
+    "norm_func",
+    [
+        ep.pp.scale_norm,
+        ep.pp.minmax_norm,
+        ep.pp.maxabs_norm,
+        ep.pp.robust_scale_norm,
+        ep.pp.quantile_norm,
+        ep.pp.power_norm,
+    ],
+)
+def test_norm_group_3D(edata_blobs_timeseries_small, array_type, norm_func):
+    edata = edata_blobs_timeseries_small
+    layer = DEFAULT_TEM_LAYER_NAME
+    edata.var[FEATURE_TYPE_KEY] = NUMERIC_TAG
+
+    edata.layers[layer] = array_type(edata.layers[layer])
+
+    if norm_func == ep.pp.power_norm:
+        ep.pp.offset_negative_values(edata, layer=layer)
+
+    # create two groups with different distributions
+    n_obs = edata.n_obs
+    group_size = n_obs // 2
+    edata.obs["group"] = ["A"] * group_size + ["B"] * (n_obs - group_size)
+
+    # raise NotImplementedError for all dask arrays
+    if isinstance(edata.layers[layer], da.Array):
+        with pytest.raises(
+            NotImplementedError,
+            match="Group-wise normalization|does not support array type.*dask",
+        ):
+            norm_func(edata, layer=layer, group_key="group")
+        return
+
+    original_shape = edata.layers[layer].shape
+    layer_before = edata.layers[layer].copy()
+
+    norm_func(edata, layer=layer, group_key="group")
+
+    # verify shape and tracking
+    assert edata.layers[layer].shape == original_shape
+    assert "normalization" in edata.uns
+    assert len(edata.uns["normalization"]) > 0
+
+    layer_after = edata.layers[layer]
+
+    # verify data changed
+    assert not np.allclose(layer_before, layer_after, equal_nan=True)
+
+    group_a = layer_after[:group_size].flatten()
+    group_b = layer_after[group_size:].flatten()
+    group_a = group_a[~np.isnan(group_a)]
+    group_b = group_b[~np.isnan(group_b)]
+
+    def near0(x):
+        return abs(x) < 1e-5
+
+    def near1(x):
+        return abs(x - 1.0) < 1e-5
+
+    # validate per-group normalization
+    if norm_func in {ep.pp.scale_norm, ep.pp.power_norm}:
+        assert near0(np.nanmean(group_a)) and near0(np.nanmean(group_b))
+        assert near1(np.nanstd(group_a)) and near1(np.nanstd(group_b))
+
+    elif norm_func in {ep.pp.minmax_norm, ep.pp.quantile_norm}:
+        assert near0(np.nanmin(group_a)) and near0(np.nanmin(group_b))
+        assert near1(np.nanmax(group_a)) and near1(np.nanmax(group_b))
+
+    elif norm_func == ep.pp.maxabs_norm:
+        assert near1(np.nanmax(np.abs(group_a)))
+        assert near1(np.nanmax(np.abs(group_b)))
+
+    elif norm_func == ep.pp.robust_scale_norm:
+        assert near0(np.nanmedian(group_a)) and near0(np.nanmedian(group_b))