diff --git a/src/modacor/dataclasses/basedata.py b/src/modacor/dataclasses/basedata.py index 93378d8..285aad5 100644 --- a/src/modacor/dataclasses/basedata.py +++ b/src/modacor/dataclasses/basedata.py @@ -35,7 +35,7 @@ # make a varianceDict that quacks like a dict, but is secretly a view on the uncertainties dict -class _VarianceDict(MutableMapping, dict): +class _VarianceDict(MutableMapping): def __init__(self, parent: BaseData): self._parent = parent @@ -50,15 +50,6 @@ def __iter__(self): def __len__(self): return len(self._parent.uncertainties) - def keys(self): - # Return keys of the underlying uncertainties dict - return self._parent.uncertainties.keys() - - def items(self): - # Return items as (key, variance) pairs - tmp = {key: self[key] for key in self._parent.uncertainties} - return tmp.items() - def __contains__(self, x) -> bool: return x in self._parent.uncertainties @@ -134,6 +125,28 @@ def _copy_uncertainties(unc_dict: Dict[str, np.ndarray]) -> Dict[str, np.ndarray return {k: np.array(v, copy=True) for k, v in unc_dict.items()} +def _inherit_metadata(source: BaseData, result: BaseData) -> BaseData: + """ + Copy metadata-like attributes from `source` to `result` (axes, rank_of_data, weights) + without touching numerical content (signal, units, uncertainties). + """ + # Shallow-copy axes list to avoid aliasing of the list object itself + result.axes = list(source.axes) + + # Keep the same rank_of_data; attrs validation ensures it is still valid + result.rank_of_data = source.rank_of_data + + # Try to propagate weights; if shapes are incompatible, keep defaults + try: + arr = np.asarray(source.weights) + validate_broadcast(result.signal, arr, "weights") + result.weights = np.broadcast_to(arr, result.signal.shape).copy() + except ValueError: + logger.debug("Could not broadcast source weights to result shape; leaving default weights on result BaseData.") + + return result + + def _binary_basedata_op( left: BaseData, right: BaseData, @@ -217,7 +230,8 @@ def _binary_basedata_op( result.uncertainties[key] = sigma - return result + # Inherit metadata (axes, rank_of_data, weights) from the left operand + return _inherit_metadata(left, result) def _unary_basedata_op( @@ -258,7 +272,8 @@ def _unary_basedata_op( sigma_y[valid] = deriv[valid] * np.abs(err_b[valid]) result.uncertainties[key] = sigma_y - return result + # Preserve metadata from the original element + return _inherit_metadata(element, result) class UncertaintyOpsMixin: @@ -357,11 +372,7 @@ def __rtruediv__(self, other: Any) -> BaseData: # ---- unary dunder + convenience methods ---- def __neg__(self) -> BaseData: - return BaseData( - signal=-self.signal, - units=self.units, - uncertainties=_copy_uncertainties(self.uncertainties), - ) + return negate_basedata_element(self) def __pow__(self, exponent: float, modulo=None) -> BaseData: if modulo is not None: @@ -452,14 +463,14 @@ class BaseData(UncertaintyOpsMixin): """ # required: - # Core data array stored as an xarray DataArray + # Core data array stored as a numpy ndarray signal: np.ndarray = field( converter=signal_converter, validator=v.instance_of(np.ndarray), on_setattr=setters.validate ) # Unit of signal*scaling+offset - required input 'dimensionless' for dimensionless data units: pint.Unit = field(validator=v.instance_of(pint.Unit), converter=ureg.Unit, on_setattr=setters.validate) # type: ignore # optional: - # Dict of variances represented as xarray DataArray objects; defaulting to an empty dict + # Dict of variances represented as numpy ndarray objects; defaulting to 
an empty dict uncertainties: Dict[str, np.ndarray] = field( factory=dict, converter=dict_signal_converter, validator=v.instance_of(dict), on_setattr=setters.validate ) @@ -491,6 +502,14 @@ def __attrs_post_init__(self): # Validate weights validate_broadcast(self.signal, self.weights, "weights") + # Warn if axes length does not match signal.ndim + if self.axes and len(self.axes) != self.signal.ndim: + logger.debug( + "BaseData.axes length (%d) does not match signal.ndim (%d).", + len(self.axes), + self.signal.ndim, + ) + @property def variances(self) -> _VarianceDict: """ @@ -565,27 +584,102 @@ def to_units(self, new_units: pint.Unit, multiplicative_conversion=True) -> None logger.debug("No unit conversion needed, units are the same.") return + if not multiplicative_conversion: + # This path is subtle for offset units (e.g. degC <-> K) and we + # don't want to silently get uncertainties wrong. + raise NotImplementedError( + "Non-multiplicative unit conversions are not yet implemented for BaseData.\n" + "If you need this, we should design explicit rules (e.g. using delta units)." + ) + logger.debug(f"Converting from {self.units} to {new_units}.") - if multiplicative_conversion: - # simple unit conversion, can be done to scalar - # Convert signal - cfact = new_units.m_from(self.units) - self.signal *= cfact - self.units = new_units - # Convert uncertainty - for key in self.uncertainties: # fastest as far as my limited testing goes against iterating over items(): - self.uncertainties[key] *= cfact + # simple unit conversion, can be done to scalar + # Convert signal + cfact = new_units.m_from(self.units) + self.signal *= cfact + self.units = new_units + # Convert uncertainty + for key in self.uncertainties: # fastest as far as my limited testing goes against iterating over items(): + self.uncertainties[key] *= cfact + + def indexed(self, indexer: Any, *, rank_of_data: int | None = None) -> "BaseData": + """ + Return a new BaseData corresponding to ``self`` indexed by ``indexer``. + + Parameters + ---------- + indexer : + Any valid NumPy indexer (int, slice, tuple of slices, boolean mask, ...), + applied consistently to ``signal`` and all uncertainty / weight arrays. + rank_of_data : + Optional explicit rank_of_data for the returned BaseData. If omitted, + it will default to ``min(self.rank_of_data, result.signal.ndim)``. + + Notes + ----- + - Units are preserved. + - Uncertainties and weights are sliced with the same indexer where possible. + - Axes handling is conservative: existing axes are kept unchanged. If you + want axes to track slicing semantics more strictly, a higher-level + helper can be added later. 
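+
+        Examples
+        --------
+        A minimal sketch (variable names assumed) for a 2-D BaseData ``bd``::
+
+            row = bd.indexed(np.s_[0, :])               # first detector row
+            pixel = bd.indexed((0, 0), rank_of_data=0)  # single pixel, scalar rank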
+ """ + sig = np.asarray(self.signal)[indexer] + # Slice uncertainties with the same indexer + new_uncs: Dict[str, np.ndarray] = {} + for k, u in self.uncertainties.items(): + u_arr = np.asarray(u, dtype=float) + # broadcast to signal shape, then apply the same indexer + u_full = np.broadcast_to(u_arr, self.signal.shape) + new_uncs[k] = u_full[indexer].copy() + + # Try to slice weights; if shapes don't line up, fall back to scalar 1.0 + try: + w_arr = np.asarray(self.weights, dtype=float) + new_weights = w_arr[indexer].copy() + except Exception: + new_weights = np.array(1.0, dtype=float) + + # Decide rank_of_data for the result + if rank_of_data is None: + new_rank = min(self.rank_of_data, np.ndim(sig)) else: - new_signal = ureg.Quantity(self.signal, self.units).to(new_units).magnitude - # Convert uncertainties - for key in self.uncertainties: - # I am not sure but I think this would be the right way for non-multiplicative conversions - self.uncertainties[key] *= new_signal / self.signal + new_rank = int(rank_of_data) + + result = BaseData( + signal=np.asarray(sig, dtype=float), + units=self.units, + uncertainties=new_uncs, + weights=new_weights, + # For now we keep axes as-is; more sophisticated axis handling can be + # added once the usage patterns are clear. + axes=list(self.axes), + rank_of_data=new_rank, + ) + return result + + def copy(self, with_axes: bool = True) -> "BaseData": + """ + Return a new BaseData with copied signal/uncertainties/weights. + Axes are shallow-copied (list copy) by default, so axis objects + themselves are still shared. + """ + new = BaseData( + signal=np.array(self.signal, copy=True), + units=self.units, + uncertainties=_copy_uncertainties(self.uncertainties), + weights=np.array(self.weights, copy=True), + axes=list(self.axes) if with_axes else [], + rank_of_data=self.rank_of_data, + ) + return new def __repr__(self): - return f"BaseData(signal={self.signal}, uncertainties={self.uncertainties}, units={self.units})" + return ( + f"BaseData(shape={self.signal.shape}, dtype={self.signal.dtype}, units={self.units}, " + f"n_uncertainties={len(self.uncertainties)}, rank_of_data={self.rank_of_data})" + ) def __str__(self): return f'{self.signal} {self.units} ± {[f"{u} ({k})" for k, u in self.uncertainties.items()]}' @@ -598,11 +692,12 @@ def __str__(self): def negate_basedata_element(element: BaseData) -> BaseData: """Negate a BaseData element with uncertainty and units propagation.""" - return BaseData( + result = BaseData( signal=-element.signal, units=element.units, uncertainties=_copy_uncertainties(element.uncertainties), ) + return _inherit_metadata(element, result) def sqrt_basedata_element(element: BaseData) -> BaseData: diff --git a/src/modacor/dataclasses/messagehandler.py b/src/modacor/dataclasses/messagehandler.py index 54b55ed..32768fe 100644 --- a/src/modacor/dataclasses/messagehandler.py +++ b/src/modacor/dataclasses/messagehandler.py @@ -14,7 +14,18 @@ import logging -# logger = logging.getLogger(__name__) +_default_handler: MessageHandler | None = None + + +def get_default_handler(level: int = logging.INFO) -> MessageHandler: + """ + MoDaCor-wide default message handler. Useful for overarching logging like in the pipeline runner. + For specific modules or classes, it's better to create dedicated named MessageHandler instances. 
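+
+    Example (sketch)::
+
+        handler = get_default_handler()
+        handler.info("pipeline started")
+        assert get_default_handler() is handler  # cached: same instance on repeat calls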
+ """ + global _default_handler + if _default_handler is None: + _default_handler = MessageHandler(level=level, name="MoDaCor") + return _default_handler class MessageHandler: @@ -24,6 +35,7 @@ class MessageHandler: Args: level (int): The logging level to use. Defaults to logging.INFO. + name (str): Logger name (typically __name__). """ def __init__(self, level: int = logging.INFO, name: str = "MoDaCor", **kwargs): @@ -33,20 +45,18 @@ def __init__(self, level: int = logging.INFO, name: str = "MoDaCor", **kwargs): self.logger = logging.getLogger(name) self.logger.setLevel(level) - self.consoleLogHandler = logging.StreamHandler() - self.consoleLogHandler.setLevel(level) + # Avoid adding multiple console handlers if this handler is created multiple times + if not any(isinstance(h, logging.StreamHandler) for h in self.logger.handlers): + console_handler = logging.StreamHandler() + console_handler.setLevel(level) - formatter = logging.Formatter("%(asctime)s - %(name)s - %(levelname)s - %(message)s") - self.consoleLogHandler.setFormatter(formatter) - self.logger.addHandler(self.consoleLogHandler) + formatter = logging.Formatter("%(asctime)s - %(name)s - %(levelname)s - %(message)s") + console_handler.setFormatter(formatter) + self.logger.addHandler(console_handler) - def log(self, message: str, level: int = None) -> None: # , name: str = None): + def log(self, message: str, level: int = None) -> None: if level is None: level = self.level - - # if name is None: - # name = self.name - # does not take a name: # self.logger = logging.getLogger(name) self.logger.log(msg=message, level=level) def info(self, message: str): diff --git a/src/modacor/dataclasses/process_step.py b/src/modacor/dataclasses/process_step.py index 27342e4..eab87ba 100644 --- a/src/modacor/dataclasses/process_step.py +++ b/src/modacor/dataclasses/process_step.py @@ -72,6 +72,8 @@ class ProcessStep: # if the process produces intermediate arrays, they are stored here, optionally cached produced_outputs: dict[str, Any] = field(factory=dict) + # intermediate prepared data for the process step + _prepared_data: dict[str, Any] = field(factory=dict) # a message handler, supporting logging, warnings, errors, etc. emitted by the process # during execution @@ -129,6 +131,7 @@ def reset(self): self.__prepared = False self.executed = False self.produced_outputs = {} + self._prepared_data = {} def modify_config_by_dict(self, by_dict: dict = {}) -> None: """Modify the configuration of the process step by a dictionary""" diff --git a/src/modacor/modules/__init__.py b/src/modacor/modules/__init__.py index e69de29..f66d1b4 100644 --- a/src/modacor/modules/__init__.py +++ b/src/modacor/modules/__init__.py @@ -0,0 +1,31 @@ +# SPDX-License-Identifier: BSD-3-Clause +# /usr/bin/env python3 +# -*- coding: utf-8 -*- + +from __future__ import annotations + +__coding__ = "utf-8" +__authors__ = ["Brian R. 
Pauw"] # add names to the list as appropriate +__copyright__ = "Copyright 2025, The MoDaCor team" +__date__ = "25/11/2025" +__status__ = "Development" # "Development", "Production" +# end of header and standard imports + +# official steps are imported here for ease +from modacor.modules.base_modules.divide import Divide +from modacor.modules.base_modules.multiply import Multiply +from modacor.modules.base_modules.poisson_uncertainties import PoissonUncertainties +from modacor.modules.base_modules.reduce_dimensionality import ReduceDimensionality +from modacor.modules.base_modules.subtract import Subtract +from modacor.modules.base_modules.subtract_databundles import SubtractDatabundles +from modacor.modules.technique_modules.xs_geometry import XSGeometry + +__all__ = [ + "Divide", + "Multiply", + "PoissonUncertainties", + "ReduceDimensionality", + "SubtractDatabundles", + "Subtract", + "XSGeometry", +] diff --git a/src/modacor/modules/base_modules/reduce_dimensionality.py b/src/modacor/modules/base_modules/reduce_dimensionality.py index d28f69f..0dc18f4 100644 --- a/src/modacor/modules/base_modules/reduce_dimensionality.py +++ b/src/modacor/modules/base_modules/reduce_dimensionality.py @@ -20,9 +20,13 @@ from modacor.dataclasses.basedata import BaseData from modacor.dataclasses.databundle import DataBundle +from modacor.dataclasses.messagehandler import MessageHandler from modacor.dataclasses.process_step import ProcessStep from modacor.dataclasses.process_step_describer import ProcessStepDescriber +# Facility-pluggable logger; by default this uses std logging +logger = MessageHandler(name=__name__) + class ReduceDimensionality(ProcessStep): """ @@ -81,11 +85,15 @@ def _normalize_axes(axes: Any) -> int | tuple[int, ...] | None: - list/tuple[int] → tuple of axes """ if axes is None: + logger.debug("ReduceDimensionality: axes=None → reducing over all axes.") return None if isinstance(axes, int): + logger.debug(f"ReduceDimensionality: single axis requested: axes={axes}.") return axes # list/tuple of ints - return tuple(int(a) for a in axes) + normalized = tuple(int(a) for a in axes) + logger.debug(f"ReduceDimensionality: multiple axes requested: axes={normalized}.") + return normalized @staticmethod def _weighted_mean_with_uncertainty( @@ -159,13 +167,49 @@ def _weighted_mean_with_uncertainty( uncertainties_out[key] = sigma - return BaseData( + # --- build result BaseData (numeric content) --- + result = BaseData( signal=signal_out, units=bd.units, uncertainties=uncertainties_out, weights=np.array(1.0) if reduction == "mean" else w_sum, ) + # --- metadata: axes + rank_of_data --- + + # New dimensionality after reduction + new_ndim = result.signal.ndim + + # Determine which axes were reduced, in normalized (non-negative) form + if axis is None: + # reducing over all axes + reduced_axes_tuple: tuple[int, ...] = tuple(range(x.ndim)) + elif isinstance(axis, tuple): + reduced_axes_tuple = axis + else: + reduced_axes_tuple = (axis,) + + reduced_axes_norm: set[int] = set() + for a in reduced_axes_tuple: + a_norm = a if a >= 0 else x.ndim + a + reduced_axes_norm.add(a_norm) + + # Reduce axes metadata if we have a full set (one entry per dimension). 
+ old_axes = bd.axes + if len(old_axes) == x.ndim: + # Keep only axes that were NOT reduced + new_axes = [ax for i, ax in enumerate(old_axes) if i not in reduced_axes_norm] + else: + # If metadata length does not match ndim, fall back to empty list + new_axes = [] + + result.axes = new_axes + + # Rank of data: cannot exceed new ndim, and should not exceed original rank + result.rank_of_data = min(bd.rank_of_data, new_ndim) + + return result + # ---------------------------- main API --------------------------------- def calculate(self) -> dict[str, DataBundle]: @@ -191,4 +235,6 @@ def calculate(self) -> dict[str, DataBundle]: databundle["signal"] = averaged output[key] = databundle + logger.info(f"ReduceDimensionality: calculation finished for {len(output)} keys.") + return output diff --git a/src/modacor/modules/technique_modules/xs_geometry.py b/src/modacor/modules/technique_modules/xs_geometry.py new file mode 100644 index 0000000..ad9c4e3 --- /dev/null +++ b/src/modacor/modules/technique_modules/xs_geometry.py @@ -0,0 +1,504 @@ +# SPDX-License-Identifier: BSD-3-Clause +# /usr/bin/env python3 +# -*- coding: utf-8 -*- + +from __future__ import annotations + +__coding__ = "utf-8" +__authors__ = ["Brian R. Pauw"] # add names to the list as appropriate +__copyright__ = "Copyright 2025, The MoDaCor team" +__date__ = "22/11/2025" +__status__ = "Development" # "Development", "Production" +# end of header and standard imports + +__version__ = "20251122.1" +__all__ = ["XSGeometry"] + +from pathlib import Path +from typing import Dict, Tuple + +import numpy as np + +from modacor import ureg +from modacor.dataclasses.basedata import BaseData +from modacor.dataclasses.helpers import basedata_from_sources +from modacor.dataclasses.messagehandler import MessageHandler +from modacor.dataclasses.process_step import ProcessStep +from modacor.dataclasses.process_step_describer import ProcessStepDescriber + +# Module-level handler; facilities can swap MessageHandler implementation as needed +logger = MessageHandler(name=__name__) + + +class XSGeometry(ProcessStep): + """ + Calculates the geometric information Q, Q0, Q1, Q2, Psi, TwoTheta, and Omega (solid angle) + for X-ray scattering data and adds them to the databundle. + + Geometry model + -------------- + * The last `rank_of_data` dimensions of `signal` are the detector dimensions, + ordered as (..., y, x) for 2D and (..., y) for 1D. + * `beam_center.signal` is given in [y, x] pixel coordinates for 2D, + and [y] for 1D. + * `pixel_size.signal` is [pixel_size_y, pixel_size_x] in length units / pixel. + * `pixel_size` is a BaseData vector of length 2 or 3 (length units): + - first component = pixel size along the "Q0" axis, + - second component = pixel size along the "Q1" axis, + - third component (if present) is currently unused. + * `detector_distance` and `wavelength` are BaseData scalars with length units. + + All computed outputs (Q, Q0, Q1, Q2, Psi, TwoTheta, Omega) are BaseData objects. 
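+
+    For reference, the helpers below compute the scattering-vector magnitude as
+    Q = (4π / λ) · sin(θ), with 2θ = arctan(r_perp / detector_distance).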
+ """ + + documentation = ProcessStepDescriber( + calling_name="Add Q, Psi, TwoTheta, Omega", # Omega is Solid Angle + calling_id="XSGeometry", + calling_module_path=Path(__file__), + calling_version=__version__, + required_data_keys=["signal"], # list of databundle keys required by the process + required_arguments=[ + "detector_distance_source", + "detector_distance_units_source", + "detector_distance_uncertainties_sources", + "pixel_size_source", + "pixel_size_units_source", + "pixel_size_uncertainties_sources", + "beam_center_source", + "beam_center_units_source", + "beam_center_uncertainties_sources", + "wavelength_source", + "wavelength_units_source", + "wavelength_uncertainties_sources", + ], # list of argument key-val combos required by the process + calling_arguments={ + "detector_distance_source": None, + "detector_distance_units_source": None, + "detector_distance_uncertainties_sources": {}, + "pixel_size_source": None, + "pixel_size_units_source": None, + "pixel_size_uncertainties_sources": {}, + "beam_center_source": None, + "beam_center_units_source": None, + "beam_center_uncertainties_sources": {}, + "wavelength_source": None, + "wavelength_units_source": None, + "wavelength_uncertainties_sources": {}, + }, + works_on={ + "Q": ["signal", "uncertainties"], + "Q0": ["signal", "uncertainties"], + "Q1": ["signal", "uncertainties"], + "Q2": ["signal", "uncertainties"], + "Psi": ["signal", "uncertainties"], + "TwoTheta": ["signal", "uncertainties"], + "Omega": ["signal", "uncertainties"], + }, + step_keywords=[ + "geometry", + "Q", + "Psi", + "TwoTheta", + "Solid Angle", + "Omega", + "X-ray scattering", + ], + step_doc="Add geometric information Q, Psi, TwoTheta, and Solid Angle to the data", + step_reference="DOI 10.1088/0953-8984/25/38/383201", + step_note="This calculates geometric factors relevant for X-ray scattering data", + ) + + # ------------------------------------------------------------------ + # Small helpers: geometry loading & shape utilities + # ------------------------------------------------------------------ + + def _load_geometry(self) -> Dict[str, BaseData]: + """ + Load all required geometry parameters as BaseData objects. + + Expected configuration keys: + - detector_distance + - pixel_size + - beam_center + - wavelength + for each their *_source / *_units_source / *_uncertainties_sources. + """ + geom: Dict[str, BaseData] = {} + required_keys = ["detector_distance", "pixel_size", "beam_center", "wavelength"] + + logger.debug( + f"XSGeometry: loading geometry for keys {required_keys} " + f"from configuration for processing keys={self.configuration.get('with_processing_keys')}" + ) + + for key in required_keys: + for subkey in [f"{key}_source", f"{key}_units_source", f"{key}_uncertainties_sources"]: + if subkey not in self.configuration: + raise ValueError(f"Missing required configuration parameter: {subkey}") + geom[key] = basedata_from_sources( + io_sources=self.io_sources, + signal_source=self.configuration.get(f"{key}_source"), + units_source=self.configuration.get(f"{key}_units_source", None), + uncertainty_sources=self.configuration.get(f"{key}_uncertainties_sources", {}), + ) + + logger.debug( + "XSGeometry: loaded geometry BaseData objects: " + + ", ".join(f"{k}: shape={v.signal.shape}, units={v.units}" for k, v in geom.items()) + ) + + return geom + + def _validate_geometry( + self, + geom: Dict[str, BaseData], + RoD: int, + spatial_shape: tuple[int, ...], + ) -> None: + """ + Validate that geometry inputs are consistent with the detector rank. 
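+
+        Raises NotImplementedError for rank_of_data outside (0, 1, 2), and
+        ValueError when beam_center, pixel_size, or spatial_shape are
+        inconsistent with the requested rank.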
+ """ + beam_center_bd = geom["beam_center"] + pixel_size_bd = geom["pixel_size"] + + if RoD not in (0, 1, 2): + raise NotImplementedError(f"XSGeometry supports RoD 0, 1, or 2; got RoD={RoD}.") # noqa: E702 + + # Beam center: for RoD>0, we expect a vector of length RoD. + if RoD > 0: + if beam_center_bd.signal.size != RoD: + raise ValueError( + f"Beam center must have {RoD} components for RoD={RoD}, got size={beam_center_bd.signal.size}." + ) + + # Pixel size: vector of 2 or 3 components. + if pixel_size_bd.shape not in ((2,), (3,)): + raise ValueError(f"Pixel size should be a 2D or 3D vector, got shape={pixel_size_bd.shape}.") + + # Sanity check on spatial_shape vs RoD + if RoD == 1 and len(spatial_shape) != 1: + raise ValueError(f"RoD=1 expects 1D spatial shape, got {spatial_shape}.") + if RoD == 2 and len(spatial_shape) != 2: + raise ValueError(f"RoD=2 expects 2D spatial shape, got {spatial_shape}.") + + logger.debug( + f"XSGeometry: validated geometry for RoD={RoD}, spatial_shape={spatial_shape}, " + f"beam_center.size={beam_center_bd.signal.size}, pixel_size.shape={pixel_size_bd.shape}" + ) + + def _make_index_basedata( + self, + shape: tuple[int, ...], + axis: int, + uncertainty_key: str = "pixel_index", + ) -> BaseData: + """ + Create a BaseData representing pixel indices along a given axis. + + Each index gets an uncertainty of ±0.5 pixel to reflect the + pixel-center assumption. + """ + if len(shape) == 0: + signal = np.array(0.0, dtype=float) + else: + grids = np.meshgrid( + *[np.arange(n, dtype=float) for n in shape], + indexing="ij", + ) + signal = grids[axis] + + # always add half-pixel uncertainty estimate to pixel indices + uncertainties: Dict[str, np.ndarray] = {uncertainty_key: np.full_like(signal, 0.5, dtype=float)} + + return BaseData( + signal=signal, + units=ureg.pixel, + uncertainties=uncertainties, + ) + + # ------------------------------------------------------------------ + # Coordinate calculation per dimensionality + # ------------------------------------------------------------------ + + def _compute_coordinates( + self, + RoD: int, + spatial_shape: tuple[int, ...], + beam_center_bd: BaseData, + px0_bd: BaseData, + px1_bd: BaseData, + detector_distance_bd: BaseData, + ) -> Tuple[BaseData, BaseData, BaseData, BaseData]: + """ + Compute detector-plane coordinates (x0, x1), in-plane radius r_perp, + and distance R from sample to pixel center, all as BaseData. + + Returns + ------- + x0_bd, x1_bd, r_perp_bd, R_bd + """ + if RoD == 0: + # 0D: no spatial axes, use the detector distance directly. + x0_bd = BaseData(signal=np.array(0.0), units=px0_bd.units) + x1_bd = BaseData(signal=np.array(0.0), units=px1_bd.units) + r_perp_bd = BaseData(signal=np.array(0.0), units=px0_bd.units) + R_bd = detector_distance_bd + logger.debug("XSGeometry: RoD=0, using detector distance directly for R.") + return x0_bd, x1_bd, r_perp_bd, R_bd + + if RoD == 1: + (n0,) = spatial_shape + idx0_bd = self._make_index_basedata(shape=(n0,), axis=0) + + rel_idx0_bd = idx0_bd - beam_center_bd.indexed(0, rank_of_data=0) + x0_bd = rel_idx0_bd * px0_bd + x1_bd = BaseData( + signal=np.zeros_like(x0_bd.signal), + units=x0_bd.units, + ) + logger.debug( + f"XSGeometry: computed 1D coordinates for shape={spatial_shape}, x0.units={x0_bd.units}, x1 is zero." 
+            )
+
+        else:  # RoD == 2
+            # image dimensions
+            n0, n1 = spatial_shape
+            # Axis 0 (rows) → x0, Axis 1 (columns) → x1
+            idx0_bd = self._make_index_basedata(shape=(n0, n1), axis=0)
+            idx1_bd = self._make_index_basedata(shape=(n0, n1), axis=1)
+
+            rel_idx0_bd = idx0_bd - beam_center_bd.indexed(0, rank_of_data=0)
+            rel_idx1_bd = idx1_bd - beam_center_bd.indexed(1, rank_of_data=0)
+
+            x0_bd = rel_idx0_bd * px0_bd
+            x1_bd = rel_idx1_bd * px1_bd
+
+            logger.debug(
+                f"XSGeometry: computed 2D coordinates for spatial_shape={spatial_shape}, "
+                f"x0.shape={x0_bd.signal.shape}, x1.shape={x1_bd.signal.shape}"
+            )
+
+        # Common for RoD = 1, 2
+        r_perp_bd = ((x0_bd**2) + (x1_bd**2)).sqrt()
+        R_bd = ((r_perp_bd**2) + (detector_distance_bd**2)).sqrt()
+
+        logger.debug(
+            f"XSGeometry: computed r_perp and R; r_perp.shape={r_perp_bd.signal.shape}, R.shape={R_bd.signal.shape}"  # noqa: E702
+        )
+
+        return x0_bd, x1_bd, r_perp_bd, R_bd
+
+    # ------------------------------------------------------------------
+    # Derived quantities: angles, Q, Psi, solid angle
+    # ------------------------------------------------------------------
+
+    def _compute_angles(
+        self,
+        r_perp_bd: BaseData,
+        detector_distance_bd: BaseData,
+    ) -> Tuple[BaseData, BaseData, BaseData]:
+        """
+        Compute 2θ, θ, and sin(θ) as BaseData.
+        """
+        ratio_bd = r_perp_bd / detector_distance_bd  # dimensionless
+        two_theta_bd = ratio_bd.arctan()  # radians
+        theta_bd = 0.5 * two_theta_bd  # radians
+        sin_theta_bd = theta_bd.sin()  # dimensionless
+
+        logger.debug(
+            f"XSGeometry: computed angles; two_theta.units={two_theta_bd.units}, theta.units={theta_bd.units}"  # noqa: E702
+        )
+
+        return two_theta_bd, theta_bd, sin_theta_bd
+
+    def _compute_Q_and_components(
+        self,
+        sin_theta_bd: BaseData,
+        wavelength_bd: BaseData,
+        x0_bd: BaseData,
+        x1_bd: BaseData,
+        r_perp_bd: BaseData,
+    ) -> Tuple[BaseData, BaseData, BaseData, BaseData]:
+        """
+        Compute Q magnitude and components Q0, Q1, Q2.
+        """
+        four_pi = 4.0 * np.pi
+        Q_bd = (four_pi * sin_theta_bd) / wavelength_bd
+
+        safe_signal = np.where(r_perp_bd.signal == 0.0, 1.0, r_perp_bd.signal)
+        r_perp_safe_bd = BaseData(signal=safe_signal, units=r_perp_bd.units)
+
+        dir0_bd = x0_bd / r_perp_safe_bd
+        dir1_bd = x1_bd / r_perp_safe_bd
+
+        Q0_bd = Q_bd * dir0_bd
+        Q1_bd = Q_bd * dir1_bd
+        Q2_bd = BaseData(signal=np.zeros_like(Q_bd.signal), units=Q_bd.units)
+
+        logger.debug(
+            f"XSGeometry: computed Q components; Q.shape={Q_bd.signal.shape}, Q.units={Q_bd.units}"  # noqa: E702
+        )
+        return Q_bd, Q0_bd, Q1_bd, Q2_bd
+
+    def _compute_psi(
+        self,
+        x0_bd: BaseData,
+        x1_bd: BaseData,
+    ) -> BaseData:
+        """
+        Compute azimuthal angle Psi from nominal coordinates only (no propagated uncertainty).
+
+        Psi = atan2(x1, x0)
+        """
+        psi_signal = np.arctan2(x1_bd.signal, x0_bd.signal)
+        psi_bd = BaseData(
+            signal=psi_signal,
+            units=ureg.radian,
+        )
+        logger.debug(f"XSGeometry: computed Psi; shape={psi_bd.signal.shape}, units={psi_bd.units}")  # noqa: E702
+        return psi_bd
+
+    def _compute_solid_angle(
+        self,
+        R_bd: BaseData,
+        px0_bd: BaseData,
+        px1_bd: BaseData,
+        detector_distance_bd: BaseData,
+    ) -> BaseData:
+        """
+        Compute solid angle per pixel (Omega) as BaseData.
+
+        Approximation:
+            dΩ ≈ A * D / R³
+        with A = pixel area (px0 * px1), D = detector distance, R = ray length.
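+
+        This follows from dΩ = (A / R²) · cos(α) for a flat detector, where the
+        obliquity angle α between pixel normal and ray satisfies cos(α) = D / R,
+        giving dΩ = A · D / R³.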
+        """
+        area_bd = px0_bd * px1_bd
+        R3_bd = R_bd**3
+        Omega_bd = (area_bd * detector_distance_bd) / R3_bd  # dimensionless (sr)
+
+        logger.debug(
+            f"XSGeometry: computed solid angle; Omega.shape={Omega_bd.signal.shape}, Omega.units={Omega_bd.units}"  # noqa: E702
+        )
+
+        return Omega_bd
+
+    # ------------------------------------------------------------------
+    # Main execution methods
+    # ------------------------------------------------------------------
+
+    def prepare_execution(self):
+        """
+        Precalculate Q, Q0, Q1, Q2, Psi, TwoTheta, and Omega as BaseData objects and
+        store them in self._prepared_data.
+        """
+        super().prepare_execution()
+
+        # 1. Determine detector rank and spatial shape from the input signal
+        pkey = self.configuration.get("with_processing_keys")
+        signal_bd: BaseData = self.processing_data[pkey[0]]["signal"]
+        RoD = signal_bd.rank_of_data
+        spatial_shape: tuple[int, ...] = signal_bd.shape[-RoD:] if RoD > 0 else ()
+
+        logger.info(f"XSGeometry: preparing execution for keys={pkey}, RoD={RoD}, spatial_shape={spatial_shape}")
+
+        # 2. Load and validate geometry
+        geom = self._load_geometry()
+        self._validate_geometry(geom, RoD, spatial_shape)
+
+        detector_distance_bd = geom["detector_distance"]
+        pixel_size_bd = geom["pixel_size"]
+        beam_center_bd = geom["beam_center"]
+        wavelength_bd = geom["wavelength"]
+
+        # 3. Extract pixel pitches along Q0/Q1
+        px0_bd = pixel_size_bd.indexed(0, rank_of_data=0)
+        px1_bd = pixel_size_bd.indexed(1, rank_of_data=0)
+
+        # 4. Coordinates (x0, x1, r_perp, R)
+        x0_bd, x1_bd, r_perp_bd, R_bd = self._compute_coordinates(
+            RoD=RoD,
+            spatial_shape=spatial_shape,
+            beam_center_bd=beam_center_bd,
+            px0_bd=px0_bd,
+            px1_bd=px1_bd,
+            detector_distance_bd=detector_distance_bd,
+        )
+
+        # 5. Angles: 2θ, θ, sin θ
+        two_theta_bd, theta_bd, sin_theta_bd = self._compute_angles(
+            r_perp_bd=r_perp_bd,
+            detector_distance_bd=detector_distance_bd,
+        )
+
+        # 6. Q magnitude and 7. components
+        Q_bd, Q0_bd, Q1_bd, Q2_bd = self._compute_Q_and_components(
+            sin_theta_bd=sin_theta_bd,
+            wavelength_bd=wavelength_bd,
+            x0_bd=x0_bd,
+            x1_bd=x1_bd,
+            r_perp_bd=r_perp_bd,
+        )
+
+        # 8. Psi
+        Psi_bd = self._compute_psi(
+            x0_bd=x0_bd,
+            x1_bd=x1_bd,
+        )
+
+        # 9. Solid angle (Omega)
+        Omega_bd = self._compute_solid_angle(
+            R_bd=R_bd,
+            px0_bd=px0_bd,
+            px1_bd=px1_bd,
+            detector_distance_bd=detector_distance_bd,
+        )
+
+        # 10. Set rank_of_data on outputs and stash in prepared_data
+        for bd in (Q_bd, Q0_bd, Q1_bd, Q2_bd, Psi_bd, two_theta_bd, Omega_bd):
+            bd.rank_of_data = RoD
+
+        self._prepared_data = {
+            "Q": Q_bd,
+            "Q0": Q0_bd,
+            "Q1": Q1_bd,
+            "Q2": Q2_bd,
+            "Psi": Psi_bd,
+            "TwoTheta": two_theta_bd,
+            "Omega": Omega_bd,
+        }
+
+        logger.info(f"XSGeometry: prepared geometry outputs for keys={pkey}: Q, Q0, Q1, Q2, Psi, TwoTheta, Omega.")
+
+    def calculate(self):
+        """
+        Add Q, Q0, Q1, Q2, Psi, TwoTheta, and Omega (solid angle) as BaseData objects
+        to the databundles specified in 'with_processing_keys'.
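+
+        Returns a dict mapping each processed key to its updated DataBundle;
+        keys missing from processing_data are skipped with a warning.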
+        """
+        data = self.processing_data
+        output: Dict[str, object] = {}
+
+        with_keys = self.configuration.get("with_processing_keys", [])
+        if not with_keys:
+            logger.warning("XSGeometry: no with_processing_keys specified; nothing to calculate.")
+            return output
+
+        logger.info(f"XSGeometry: adding geometry outputs to keys={with_keys}")
+
+        for key in with_keys:
+            databundle = data.get(key)
+            if databundle is None:
+                logger.warning(f"XSGeometry: processing_data has no entry for key={key!r}; skipping.")  # noqa: E702
+                continue
+
+            databundle["Q"] = self._prepared_data["Q"]
+            databundle["Q0"] = self._prepared_data["Q0"]
+            databundle["Q1"] = self._prepared_data["Q1"]
+            databundle["Q2"] = self._prepared_data["Q2"]
+            databundle["Psi"] = self._prepared_data["Psi"]
+            databundle["TwoTheta"] = self._prepared_data["TwoTheta"]
+            databundle["Omega"] = self._prepared_data["Omega"]
+
+            output[key] = databundle
+
+        logger.info(f"XSGeometry: geometry outputs attached for {len(output)} keys.")
+
+        return output
diff --git a/src/modacor/runner/process_step_registry.py b/src/modacor/runner/process_step_registry.py
index 9fd6393..2e101da 100644
--- a/src/modacor/runner/process_step_registry.py
+++ b/src/modacor/runner/process_step_registry.py
@@ -7,44 +7,136 @@
 __coding__ = "utf-8"
 __authors__ = ["Brian R. Pauw"]  # add names to the list as appropriate
 __copyright__ = "Copyright 2025, The MoDaCor team"
-__date__ = "16/11/2025"
+__date__ = "25/11/2025"
 __status__ = "Development"  # "Development", "Production"
 # end of header and standard imports

 import importlib
 import re
+from pathlib import Path
 from typing import Dict, Type

 from ..dataclasses.process_step import ProcessStep

+# ---------------------------------------------------------------------------
+# Name / path helpers
+# ---------------------------------------------------------------------------
+

 def _pascal_to_snake(name: str) -> str:
-    """Convert from PascalCase to snake_case module name."""
-    return re.sub(r"(?<!^)(?=[A-Z])", "_", name).lower()
+    """
+    Convert a PascalCase class name to a snake_case module name.
+
+    Examples
+    --------
+    XSGeometry -> xs_geometry
+    Divide -> divide
+    Q2Mapper -> q2_mapper
+    APIClient -> api_client
+    """
+    return re.sub(r"(?<=[a-z0-9])(?=[A-Z])|(?<=[A-Z])(?=[A-Z][a-z])", "_", name).lower()
+
+
+def _path_to_module_name(py_file: Path, package_root: Path) -> str:
+    """
+    Convert a file path under the given package_root into a dotted module name.
+
+    Example
+    -------
+    py_file = /.../modacor/modules/technique_modules/xs_geometry.py
+    package_root = /.../modacor
+
+    -> "modacor.modules.technique_modules.xs_geometry"
+    """
+    rel = py_file.with_suffix("").relative_to(package_root)
+    return ".".join((package_root.name, *rel.parts))
+
+
+def find_module(modules_root: Path, module_name: str) -> str:
+    """
+    Find the fully-qualified module path for a snake_case module name.
+
+    Searches under: modules/**/<module_name>.py
+
+    Parameters
+    ----------
+    modules_root :
+        Path to the 'modules' directory, e.g. .../modacor/modules
+    module_name :
+        Snake_case module name, e.g. "xs_geometry", "divide".
+
+    Returns
+    -------
+    str
+        Fully-qualified module path, e.g.
+        "modacor.modules.technique_modules.xs_geometry".
+
+    Raises
+    ------
+    ModuleNotFoundError
+        If no matching file is found.
+    RuntimeError
+        If multiple matching files are found (ambiguous).
+    """
+    package_root = modules_root.parent  # e.g. .../modacor
+
+    candidates = [p for p in modules_root.rglob(f"{module_name}.py") if p.is_file()]
+
+    if not candidates:
+        raise ModuleNotFoundError(f"No module file '{module_name}.py' found under {modules_root}.")
+
+    if len(candidates) > 1:
+        details = ", ".join(str(c) for c in candidates)
+        raise RuntimeError(f"Ambiguous module name '{module_name}': found multiple candidates: {details}")
+
+    return _path_to_module_name(candidates[0], package_root)
+
+
+# Default locations
+_DEFAULT_MODULES_ROOT = (Path(__file__).resolve().parents[1] / "modules").resolve()
+_DEFAULT_CURATED_MODULE_NAME = "modacor.modules"


 class ProcessStepRegistry:
     """
     Registry for ProcessStep subclasses.

-    Responsibilities:
-    - Map a logical name (usually the class name used in YAML, e.g. "Divide")
-      to a ProcessStep subclass.
-    - Optionally lazy-load modules from a base package if the class is not
-      yet registered.
+    Resolution strategy
+    -------------------
+    1. Check internal registry cache.
+    2. Try to resolve the class from the curated root module
+       (by default: ``modacor.modules``).
+    3. If still not found, search the filesystem under
+       ``modules_root/**/<module_name>.py``, import it, and grab the class.
     """

-    def __init__(self, base_package: str | None = None) -> None:
+    def __init__(
+        self,
+        modules_root: Path | None = None,
+        curated_module: str | None = _DEFAULT_CURATED_MODULE_NAME,
+    ) -> None:
         """
         Parameters
         ----------
-        base_package:
-            Optional base package for lazy imports, e.g.
-            "modacor.modules.base_modules". If provided, `get()` will try to
-            import `<base_package>.<snake_case_name>` on cache miss.
+        modules_root :
+            Root directory for filesystem discovery. Defaults to
+            the 'modules' directory alongside this file, i.e. ".../modacor/modules".
+        curated_module :
+            Dotted module path for the curated/explicit process steps module.
+            Defaults to "modacor.modules". Set to None to disable the curated
+            lookup step.
         """
         self._registry: Dict[str, Type[ProcessStep]] = {}
-        self._base_package = base_package
+        self._modules_root = (modules_root or _DEFAULT_MODULES_ROOT).resolve()
+
+        self._curated_module_name: str | None = curated_module
+        self._curated_module = None
+        if curated_module is not None:
+            try:
+                self._curated_module = importlib.import_module(curated_module)
+            except ModuleNotFoundError:
+                # If it doesn't exist, we silently disable the curated step.
+                self._curated_module = None

     # ------------------------------------------------------------------ #
     # Registration / lookup API
@@ -56,9 +148,9 @@ def register(self, cls: Type[ProcessStep], name: str | None = None) -> None:
         Parameters
         ----------
-        cls:
+        cls :
             The ProcessStep subclass to register.
-        name:
+        name :
             Optional explicit name. If omitted, `cls.__name__` is used.
         """
         if not issubclass(cls, ProcessStep):
@@ -70,23 +162,47 @@ def get(self, name: str) -> Type[ProcessStep]:
         """
         Retrieve a ProcessStep subclass by name.

-        On cache miss, if `base_package` is set, it will attempt a lazy import
-        of `<base_package>.<snake_case_name>` and then look for `name` in
-        that module.
+        Resolution order
+        ----------------
+        1. If `name` has been explicitly registered, return it.
+        2. If the curated module (e.g. modacor.modules) exposes an attribute `name`,
+           validate it as a ProcessStep subclass, cache and return it.
+        3. Else, convert `name` from PascalCase to snake_case and search for a
+           module named `<snake_case_name>.py` under `modules_root/**`.
+           Import that module, fetch `name`, validate it, cache and return.
         """
+        # 1. Already registered?
         if name in self._registry:
            return self._registry[name]

-        if self._base_package is None:
+        # 2.
Curated module lookup (modacor.modules by default) + if self._curated_module is not None: + try: + cls = getattr(self._curated_module, name) + except AttributeError: + cls = None + else: + if not issubclass(cls, ProcessStep): + raise TypeError( + f"Object {name!r} in curated module " + f"{self._curated_module_name!r} is not a ProcessStep subclass." + ) + self._registry[name] = cls + return cls + + # 3. Filesystem-based discovery under modules_root/**/.py + module_name = _pascal_to_snake(name) + + try: + module_path = find_module(self._modules_root, module_name) + except (ModuleNotFoundError, RuntimeError) as exc: raise KeyError( - f"ProcessStep {name!r} not found in registry and no base_package configured for lazy loading." # noqa: E713 - ) + f"ProcessStep {name!r} not found in registry, not exported from " # noqa: E713 + f"{self._curated_module_name!r}, and no module file '{module_name}.py' " + f"could be resolved under {self._modules_root}." + ) from exc - # Lazy-load from base package - module_name = _pascal_to_snake(name) - module_path = f"{self._base_package}.{module_name}" module = importlib.import_module(module_path) - try: cls = getattr(module, name) except AttributeError as exc: @@ -95,13 +211,14 @@ def get(self, name: str) -> Type[ProcessStep]: if not issubclass(cls, ProcessStep): raise TypeError(f"Object {name!r} in module {module_path!r} is not a ProcessStep subclass.") - # Cache and return self._registry[name] = cls return cls - def __contains__(self, name: str) -> bool: # convenience + def __contains__(self, name: str) -> bool: return name in self._registry -# Default registry used by the Pipeline -DEFAULT_PROCESS_STEP_REGISTRY = ProcessStepRegistry(base_package="modacor.modules.base_modules") +# Default registry used by the Pipeline: +# - curated lookups via modacor.modules +# - filesystem discovery under src/modacor/modules/**/snake_case.py +DEFAULT_PROCESS_STEP_REGISTRY = ProcessStepRegistry() diff --git a/src/modacor/tests/modules/base_modules/test_reduce_dimensionality.py b/src/modacor/tests/modules/base_modules/test_reduce_dimensionality.py index 0387f44..4060dab 100644 --- a/src/modacor/tests/modules/base_modules/test_reduce_dimensionality.py +++ b/src/modacor/tests/modules/base_modules/test_reduce_dimensionality.py @@ -11,6 +11,8 @@ __status__ = "Development" # "Development", "Production" # end of header and standard imports +# import pytest +import logging import unittest import numpy as np @@ -261,3 +263,96 @@ def test_weighted_average_execution_via_call(self): # No unexpected NaNs for this simple case self.assertFalse(np.isnan(result_bd.signal).any()) self.assertFalse(np.isnan(result_bd.uncertainties["u"]).any()) + + +def test_reduce_dimensionality_rank_and_axes_reduce_as_expected(): + # 2D signal with matching axes metadata + sig = np.arange(12.0).reshape(3, 4) + bd = BaseData(signal=sig, units=ureg.dimensionless) + + # original rank and axes + bd.rank_of_data = 2 + axis0 = BaseData(signal=np.arange(3.0), units=ureg.dimensionless) + axis1 = BaseData(signal=np.arange(4.0), units=ureg.dimensionless) + bd.axes = [axis0, axis1] + + # --- reduce over axis=1 -> shape (3,) --- + out_axis1 = ReduceDimensionality._weighted_mean_with_uncertainty( + bd=bd, + axis=1, + use_weights=False, + nan_policy="omit", + reduction="mean", + ) + + # shape reduced correctly + assert out_axis1.signal.shape == (3,) + # rank reduced: min(old_rank=2, new_ndim=1) -> 1 + assert out_axis1.rank_of_data == 1 + # axes: axis 1 removed, axis 0 preserved + assert len(out_axis1.axes) == 1 + assert 
out_axis1.axes[0] is axis0 + + # --- reduce over all axes -> scalar --- + out_all = ReduceDimensionality._weighted_mean_with_uncertainty( + bd=bd, + axis=None, + use_weights=False, + nan_policy="omit", + reduction="mean", + ) + + # scalar output + assert out_all.signal.shape == () + # rank cannot exceed new_ndim (0) + assert out_all.rank_of_data == 0 + # axes should be empty for scalar + assert out_all.axes == [] + + +def test_reduce_dimensionality_emits_info_and_debug_logs(caplog): + """ + Ensure that ReduceDimensionality.calculate() emits at least one INFO and + one DEBUG log record via the MessageHandler-backed logger. + """ + signal = np.array([[1.0, 2.0, 3.0], [4.0, 5.0, 6.0]], dtype=float) + unc = 0.1 * np.ones_like(signal) + weights = np.array([[1.0], [2.0]], dtype=float) + + processing_data = ProcessingData() + bd = BaseData( + signal=signal, + units=ureg.Unit("count"), + uncertainties={"u": unc}, + weights=weights, + ) + bundle = DataBundle(signal=bd) + processing_data["bundle"] = bundle + + step = ReduceDimensionality(io_sources=TEST_IO_SOURCES) + step.modify_config_by_kwargs( + with_processing_keys=["bundle"], + axes=0, + use_weights=True, + nan_policy="omit", + reduction="mean", + ) + step.processing_data = processing_data + + logger_name = "modacor.modules.base_modules.reduce_dimensionality" + + with caplog.at_level(logging.DEBUG, logger=logger_name): + step.calculate() + + # Sanity: output exists + assert "bundle" in processing_data + out_bd: BaseData = processing_data["bundle"]["signal"] + assert isinstance(out_bd, BaseData) + + # Collect log records from expected logger + records = [rec for rec in caplog.records if rec.name == logger_name] + assert records, "Expected at least one log record from ReduceDimensionality logger." + + levels = {rec.levelno for rec in records} + assert logging.INFO in levels, "Expected at least one INFO log from ReduceDimensionality." + assert logging.DEBUG in levels, "Expected at least one DEBUG log from ReduceDimensionality." diff --git a/src/modacor/tests/modules/technique_modules/test_xsgeometry.py b/src/modacor/tests/modules/technique_modules/test_xsgeometry.py new file mode 100644 index 0000000..ae6b0ca --- /dev/null +++ b/src/modacor/tests/modules/technique_modules/test_xsgeometry.py @@ -0,0 +1,538 @@ +# SPDX-License-Identifier: BSD-3-Clause +# /usr/bin/env python3 +# -*- coding: utf-8 -*- + +from __future__ import annotations + +from modacor.dataclasses.databundle import DataBundle + +__coding__ = "utf-8" +__authors__ = ["Brian R. Pauw"] # add names to the list as appropriate +__copyright__ = "Copyright 2025, The MoDaCor team" +__date__ = "22/11/2025" +__status__ = "Development" # "Development", "Production" +# end of header and standard imports + +""" +Tests for the XSGeometry processing step. + +We test: +- Low-level geometry helpers (_compute_coordinates, _compute_angles, _compute_Q, ...) 
+- Basic symmetry properties for a simple 2D detector +- Simple checks for 1D and 0D cases +- A thin integration test for prepare_execution() + calculate() +""" + +import numpy as np +import pytest +from numpy.testing import assert_allclose + +from modacor import ureg +from modacor.dataclasses.basedata import BaseData +from modacor.dataclasses.processing_data import ProcessingData +from modacor.io.io_sources import IoSources +from modacor.modules.technique_modules.xs_geometry import XSGeometry + +# --------------------------------------------------------------------------- +# Small helpers for building geometry BaseData +# --------------------------------------------------------------------------- + + +def _bd_scalar(value: float, unit) -> BaseData: + return BaseData(signal=np.array(float(value), dtype=float), units=unit) + + +def _bd_vector(values, unit) -> BaseData: + return BaseData(signal=np.asarray(values, dtype=float), units=unit) + + +def make_geom_2d(n0: int, n1: int): + """ + Convenience helper: simple 2D geometry + + - Detector: n0 x n1 + - Detector distance: 1.0 m with 1 mm uncertainty + - Pixel size: 1e-3 m/pixel in both directions (with small uncertainty) + - Beam center: exact centre pixel (with 0.25 pixel uncertainty) + - Wavelength: 1 Å (1e-10 m) with 2% uncertainty + """ + # --- detector distance: 1.0 m ± 1 mm --- + D_value = 1.0 + D_unc = 1e-3 # 1 mm + D_bd = BaseData( + signal=np.array(D_value, dtype=float), + units=ureg.meter, + uncertainties={"propagate_to_all": np.array(D_unc, dtype=float)}, + ) + + # --- pixel size: 1e-3 m/pixel ± 1e-6 m/pixel --- + pixel_signal = np.asarray([1e-3, 1e-3], dtype=float) + pixel_unc = np.full_like(pixel_signal, 1e-6, dtype=float) + pixel_size_bd = BaseData( + signal=pixel_signal, + units=ureg.meter / ureg.pixel, + uncertainties={"propagate_to_all": pixel_unc}, + ) + + # --- beam centre: at the centre pixel, ±0.25 pixel --- + center_row = (n0 - 1) / 2.0 + center_col = (n1 - 1) / 2.0 + beam_signal = np.asarray([center_row, center_col], dtype=float) + beam_unc = np.full_like(beam_signal, 0.25, dtype=float) + beam_center_bd = BaseData( + signal=beam_signal, + units=ureg.pixel, + uncertainties={"pixel_index": beam_unc}, + ) + + # --- wavelength: 1 Å ± 2% --- + lambda_value = 1.0e-10 # 1 Å in meters + lambda_unc = 0.02 * lambda_value + wavelength_bd = BaseData( + signal=np.array(lambda_value, dtype=float), + units=ureg.meter, + uncertainties={"propagate_to_all": np.array(lambda_unc, dtype=float)}, + ) + + return D_bd, pixel_size_bd, beam_center_bd, wavelength_bd + + +def make_geom_1d(n: int): + """ + Simple 1D geometry: n pixels in a line. + + Units follow the same conventions as make_geom_2d. 
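+
+    Returns (D_bd, pixel_size_bd, beam_center_bd, wavelength_bd), mirroring
+    the return order of make_geom_2d.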
+ """ + # --- detector distance: 1.0 m ± 1 mm --- + D_value = 1.0 + D_unc = 1e-3 + D_bd = BaseData( + signal=np.array(D_value, dtype=float), + units=ureg.meter, + uncertainties={"propagate_to_all": np.array(D_unc, dtype=float)}, + ) + + # --- pixel size: 1e-3 m/pixel ± 1e-6 m/pixel --- + pixel_signal = np.asarray([1e-3, 1e-3], dtype=float) + pixel_unc = np.full_like(pixel_signal, 1e-6, dtype=float) + pixel_size_bd = BaseData( + signal=pixel_signal, + units=ureg.meter / ureg.pixel, + uncertainties={"propagate_to_all": pixel_unc}, + ) + + # --- beam centre: central pixel ±0.25 pixel --- + center = (n - 1) / 2.0 + beam_signal = np.asarray([center], dtype=float) + beam_unc = np.full_like(beam_signal, 0.25, dtype=float) + beam_center_bd = BaseData( + signal=beam_signal, + units=ureg.pixel, + uncertainties={"pixel_index": beam_unc}, + ) + + # --- wavelength: 1 Å ± 2% --- + lambda_value = 1.0e-10 + lambda_unc = 0.02 * lambda_value + wavelength_bd = BaseData( + signal=np.array(lambda_value, dtype=float), + units=ureg.meter, + uncertainties={"propagate_to_all": np.array(lambda_unc, dtype=float)}, + ) + + return D_bd, pixel_size_bd, beam_center_bd, wavelength_bd + + +# --------------------------------------------------------------------------- +# Tests for helper methods (math / symmetry) +# --------------------------------------------------------------------------- + + +def test_xsgeometry_2d_center_q_zero_and_symmetry(): + """ + For a symmetric 2D detector with the beam at the center: + - Q at the center pixel should be ≈ 0. + - With the current convention (x0 along rows, x1 along columns): + * Q1 is antisymmetric left-right, Q0 symmetric left-right. + * Q0 is antisymmetric up-down, Q1 symmetric up-down. + - Psi should behave consistently with Psi = atan2(x1, x0): + right of center ~ +π/2 + left of center ~ -π/2 + above center ~ π + below center ~ 0 + - Omega (solid angle) should be largest at the beam center and smaller at corners. 
+ """ + step = XSGeometry(io_sources=IoSources()) + + n0, n1 = 5, 5 + spatial_shape = (n0, n1) + D_bd, pixel_size_bd, beam_center_bd, wavelength_bd = make_geom_2d(n0, n1) + px0_bd, px1_bd = pixel_size_bd.indexed(0, rank_of_data=0), pixel_size_bd.indexed(1, rank_of_data=0) + + # coordinates + x0_bd, x1_bd, r_perp_bd, R_bd = step._compute_coordinates( + RoD=2, + spatial_shape=spatial_shape, + beam_center_bd=beam_center_bd, + px0_bd=px0_bd, + px1_bd=px1_bd, + detector_distance_bd=D_bd, + ) + + # angles, Q magnitude, components, Psi, Omega + _, theta_bd, sin_theta_bd = step._compute_angles( + r_perp_bd=r_perp_bd, + detector_distance_bd=D_bd, + ) + Q_bd, Q0_bd, Q1_bd, Q2_bd = step._compute_Q_and_components( + sin_theta_bd=sin_theta_bd, + wavelength_bd=wavelength_bd, + x0_bd=x0_bd, + x1_bd=x1_bd, + r_perp_bd=r_perp_bd, + ) + Psi_bd = step._compute_psi(x0_bd=x0_bd, x1_bd=x1_bd) + Omega_bd = step._compute_solid_angle( + R_bd=R_bd, + px0_bd=px0_bd, + px1_bd=px1_bd, + detector_distance_bd=D_bd, + ) + + center = (n0 // 2, n1 // 2) + # index order: (row, col) == (y, x) + row_c, col_c = center + + # Q at center ≈ 0 + assert_allclose(Q_bd.signal[center], 0.0, atol=1e-12) + assert_allclose(Q0_bd.signal[center], 0.0, atol=1e-12) + assert_allclose(Q1_bd.signal[center], 0.0, atol=1e-12) + assert_allclose(Q2_bd.signal[center], 0.0, atol=1e-12) + + # left-right symmetry (vary columns at fixed central row) + col_left = col_c - 1 + col_right = col_c + 1 + q0_left = Q0_bd.signal[row_c, col_left] + q0_right = Q0_bd.signal[row_c, col_right] + q1_left = Q1_bd.signal[row_c, col_left] + q1_right = Q1_bd.signal[row_c, col_right] + + # Q1 antisymmetric, Q0 symmetric with current convention + assert_allclose(abs(q1_left), abs(q1_right), rtol=1e-6, atol=1e-12) + assert q1_left == pytest.approx(-q1_right, rel=1e-6, abs=1e-12) + assert_allclose(q0_left, q0_right, rtol=1e-6, atol=1e-12) + + # up-down symmetry (vary rows at fixed central column) + row_up = row_c - 1 + row_down = row_c + 1 + q0_up = Q0_bd.signal[row_up, col_c] + q0_down = Q0_bd.signal[row_down, col_c] + q1_up = Q1_bd.signal[row_up, col_c] + q1_down = Q1_bd.signal[row_down, col_c] + + # Q0 antisymmetric, Q1 symmetric + assert_allclose(abs(q0_up), abs(q0_down), rtol=1e-6, atol=1e-12) + assert q0_up == pytest.approx(-q0_down, rel=1e-6, abs=1e-12) + assert_allclose(q1_up, q1_down, rtol=1e-6, atol=1e-12) + + # Psi behaviour (x0 vertical / rows, x1 horizontal / columns) + psi_right = Psi_bd.signal[row_c, col_right] + psi_left = Psi_bd.signal[row_c, col_left] + psi_up = Psi_bd.signal[row_up, col_c] + psi_down = Psi_bd.signal[row_down, col_c] + + # right of center -> x1 > 0, x0 ~ 0 → +π/2 + assert psi_right == pytest.approx(+np.pi / 2.0, abs=1e-4) + # left of center -> x1 < 0, x0 ~ 0 → -π/2 + assert psi_left == pytest.approx(-np.pi / 2.0, abs=1e-4) + # above centre -> x0 < 0, x1 ~ 0 → π + assert psi_up == pytest.approx(np.pi, abs=1e-4) + # below centre -> x0 > 0, x1 ~ 0 → 0 + assert psi_down == pytest.approx(0.0, abs=1e-4) + + # Omega largest at center, smaller at corner + omega_center = Omega_bd.signal[center] + omega_corner = Omega_bd.signal[0, 0] + assert omega_corner < omega_center + assert np.all(Omega_bd.signal > 0.0) + + # sanity: theta increases with radius + theta_row = theta_bd.signal[row_c, :] + assert theta_row[col_c] == pytest.approx(0.0, abs=1e-12) + assert theta_row[col_left] > theta_row[col_c] + assert theta_row[col_right] > theta_row[col_c] + + Q_mag_from_components = (Q0_bd**2 + Q1_bd**2 + Q2_bd**2).sqrt() + assert_allclose( + 
Q_mag_from_components.signal, + Q_bd.signal, + rtol=1e-10, + atol=1e-12, + ) + + +def test_xsgeometry_1d_center_q_zero_and_monotonic(): + """ + For a symmetric 1D detector: + - Q at the center pixel should be ≈ 0. + - |Q| should increase as we move away from the center. + - Q1 and Q2 should be zero. + """ + step = XSGeometry(io_sources=IoSources()) + + n = 7 + spatial_shape = (n,) + D_bd, pixel_size_bd, beam_center_bd, wavelength_bd = make_geom_1d(n) + px0_bd, px1_bd = pixel_size_bd.indexed(0, rank_of_data=0), pixel_size_bd.indexed(1, rank_of_data=0) + + x0_bd, x1_bd, r_perp_bd, R_bd = step._compute_coordinates( + RoD=1, + spatial_shape=spatial_shape, + beam_center_bd=beam_center_bd, + px0_bd=px0_bd, + px1_bd=px1_bd, + detector_distance_bd=D_bd, + ) + + _, theta_bd, sin_theta_bd = step._compute_angles( + r_perp_bd=r_perp_bd, + detector_distance_bd=D_bd, + ) + + Q_bd, Q0_bd, Q1_bd, Q2_bd = step._compute_Q_and_components( + sin_theta_bd=sin_theta_bd, + wavelength_bd=wavelength_bd, + x0_bd=x0_bd, + x1_bd=x1_bd, + r_perp_bd=r_perp_bd, + ) + + center = n // 2 + + # center ≈ 0 + assert_allclose(Q_bd.signal[center], 0.0, atol=1e-12) + assert_allclose(Q0_bd.signal[center], 0.0, atol=1e-12) + # Q1, Q2 should be identically zero + assert_allclose(Q1_bd.signal, 0.0, atol=1e-12) + assert_allclose(Q2_bd.signal, 0.0, atol=1e-12) + + # |Q| grows away from center + abs_Q = np.abs(Q_bd.signal) + assert abs_Q[center + 1] > abs_Q[center] + assert abs_Q[center + 2] > abs_Q[center + 1] + + +def test_xsgeometry_0d_shapes_and_units(): + """ + For a 0D detector (scalar signal): + - All geometry outputs should be scalars. + - Q and its components should be zero. + - Omega should be positive scalar. + """ + step = XSGeometry(io_sources=IoSources()) + + # 0D: no spatial shape + spatial_shape: tuple[int, ...] 
= () + D_bd = _bd_scalar(1.0, ureg.meter) + # pixel size / beam center technically irrelevant for RoD=0, but we supply valid shapes + pixel_size_bd = _bd_vector([1e-3, 1e-3], ureg.meter / ureg.pixel) + beam_center_bd = _bd_vector([0.0], ureg.pixel) + wavelength_bd = _bd_scalar(1.0, ureg.meter) + + px0_bd, px1_bd = pixel_size_bd.indexed(0, rank_of_data=0), pixel_size_bd.indexed(1, rank_of_data=0) + x0_bd, x1_bd, r_perp_bd, R_bd = step._compute_coordinates( + RoD=0, + spatial_shape=spatial_shape, + beam_center_bd=beam_center_bd, + px0_bd=px0_bd, + px1_bd=px1_bd, + detector_distance_bd=D_bd, + ) + + _, theta_bd, sin_theta_bd = step._compute_angles( + r_perp_bd=r_perp_bd, + detector_distance_bd=D_bd, + ) + Q_bd, Q0_bd, Q1_bd, Q2_bd = step._compute_Q_and_components( + sin_theta_bd=sin_theta_bd, + wavelength_bd=wavelength_bd, + x0_bd=x0_bd, + x1_bd=x1_bd, + r_perp_bd=r_perp_bd, + ) + Omega_bd = step._compute_solid_angle( + R_bd=R_bd, + px0_bd=px0_bd, + px1_bd=px1_bd, + detector_distance_bd=D_bd, + ) + + # all scalars + assert Q_bd.signal.shape == () + assert Q0_bd.signal.shape == () + assert Q1_bd.signal.shape == () + assert Q2_bd.signal.shape == () + assert theta_bd.signal.shape == () + assert Omega_bd.signal.shape == () + + # Q and components should be zero + assert_allclose(Q_bd.signal, 0.0, atol=1e-12) + assert_allclose(Q0_bd.signal, 0.0, atol=1e-12) + assert_allclose(Q1_bd.signal, 0.0, atol=1e-12) + assert_allclose(Q2_bd.signal, 0.0, atol=1e-12) + + # solid angle > 0 + assert Omega_bd.signal > 0.0 + + +@pytest.mark.filterwarnings("ignore:divide by zero encountered in divide:RuntimeWarning") +def test_xsgeometry_pixel_index_uncertainty_propagates_to_coordinates(): + """ + Check that the 'pixel_index' uncertainty defined on beam_center and the index grid + shows up on the detector-plane coordinates x0 and r_perp. + """ + step = XSGeometry(io_sources=IoSources()) + + n0, n1 = 5, 5 + spatial_shape = (n0, n1) + D_bd, pixel_size_bd, beam_center_bd, wavelength_bd = make_geom_2d(n0, n1) + px0_bd, px1_bd = pixel_size_bd.indexed(0, rank_of_data=0), pixel_size_bd.indexed(1, rank_of_data=0) + + x0_bd, x1_bd, r_perp_bd, R_bd = step._compute_coordinates( + RoD=2, + spatial_shape=spatial_shape, + beam_center_bd=beam_center_bd, + px0_bd=px0_bd, + px1_bd=px1_bd, + detector_distance_bd=D_bd, + ) + + # We expect 'pixel_index' uncertainties to be present on x0 and r_perp + assert "pixel_index" in x0_bd.uncertainties + assert "pixel_index" in r_perp_bd.uncertainties + + unc_x0 = x0_bd.uncertainties["pixel_index"] + unc_r = r_perp_bd.uncertainties["pixel_index"] + + # Off-centre pixels should have finite, non-zero uncertainties. + # Choose a pixel where x0 != 0 to avoid the relative-error degeneracy at x0 == 0. + row_c, col_c = n0 // 2, n1 // 2 + row_up = row_c - 1 + + assert np.isfinite(unc_x0[row_up, col_c]) + assert unc_x0[row_up, col_c] > 0.0 + + assert np.isfinite(unc_r[row_up, col_c]) + assert unc_r[row_up, col_c] > 0.0 + + +@pytest.mark.filterwarnings("ignore:divide by zero encountered in divide:RuntimeWarning") +def test_xsgeometry_Q_has_nonzero_uncertainty_off_center(): + """ + Check that the 'pixel_index' uncertainties propagate all the way to Q + and are non-zero away from the beam centre. 
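+
+    At the exact beam-centre pixel Q ≈ 0 and the derivative is singular, so the
+    propagated uncertainty there may be inf/NaN; only off-centre pixels are checked.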
+ """ + step = XSGeometry(io_sources=IoSources()) + + n0, n1 = 5, 5 + spatial_shape = (n0, n1) + D_bd, pixel_size_bd, beam_center_bd, wavelength_bd = make_geom_2d(n0, n1) + px0_bd, px1_bd = pixel_size_bd.indexed(0, rank_of_data=0), pixel_size_bd.indexed(1, rank_of_data=0) + + x0_bd, x1_bd, r_perp_bd, R_bd = step._compute_coordinates( + RoD=2, + spatial_shape=spatial_shape, + beam_center_bd=beam_center_bd, + px0_bd=px0_bd, + px1_bd=px1_bd, + detector_distance_bd=D_bd, + ) + + _, theta_bd, sin_theta_bd = step._compute_angles( + r_perp_bd=r_perp_bd, + detector_distance_bd=D_bd, + ) + Q_bd, Q0_bd, Q1_bd, Q2_bd = step._compute_Q_and_components( + sin_theta_bd=sin_theta_bd, + wavelength_bd=wavelength_bd, + x0_bd=x0_bd, + x1_bd=x1_bd, + r_perp_bd=r_perp_bd, + ) + + # We expect the 'pixel_index' uncertainty key to exist on Q as well + assert "pixel_index" in Q_bd.uncertainties + + unc_Q = Q_bd.uncertainties["pixel_index"] + + row_c, col_c = n0 // 2, n1 // 2 + col_right = col_c + 1 + + # At the exact beam pixel, Q ≈ 0 and derivative is singular; uncertainty may be inf/NaN. + # Off-centre, we expect finite, non-zero uncertainty. + assert np.isfinite(unc_Q[row_c, col_right]) + assert unc_Q[row_c, col_right] > 0.0 + + +# --------------------------------------------------------------------------- +# Thin integration test using prepare_execution + calculate +# --------------------------------------------------------------------------- + + +def test_xsgeometry_prepare_and_calculate_integration(): + """ + Integration-style test: + - Build a minimal processing_data and configuration. + - Override _load_geometry to return synthetic BaseData. + - Run prepare_execution() and calculate(). + - Check that the expected geometry keys are present in the output databundle. + """ + step = XSGeometry(io_sources=IoSources()) + + # Build a simple 2D signal databundle + n0, n1 = 5, 5 + signal_bd = BaseData( + signal=np.ones((n0, n1), dtype=float), + units=ureg.dimensionless, + rank_of_data=2, + ) + + processing_data = ProcessingData() + # match what XSGeometry expects: processing_data["signal"]["signal"] is a BaseData + processing_data["signal"] = DataBundle({"signal": signal_bd}) + + # Fake geometry via helper; we inject this directly into _load_geometry. + D_bd, pixel_size_bd, beam_center_bd, wavelength_bd = make_geom_2d(n0, n1) + fake_geom = { + "detector_distance": D_bd, + "pixel_size": pixel_size_bd, + "beam_center": beam_center_bd, + "wavelength": wavelength_bd, + } + + # Minimal configuration + step.configuration = { + "with_processing_keys": ["signal"], + "output_processing_key": None, + } + + # Attach processing_data and bypass I/O in _load_geometry + step.processing_data = processing_data + step._prepared_data = {} + step._load_geometry = lambda: fake_geom # type: ignore[assignment] + + # Execute prepare + calculate directly (no ProcessStep.execute merge) + step.prepare_execution() + out = step.calculate() + + assert "signal" in out + databundle = out["signal"] + + for key in ["Q", "Q0", "Q1", "Q2", "Psi", "TwoTheta", "Omega"]: + assert key in databundle, f"Missing geometry key '{key}' in databundle." + assert isinstance(databundle[key], BaseData), f"{key} is not a BaseData." 
+
+    # simple sanity check on Q field
+    Q_bd = databundle["Q"]
+    center = (n0 // 2, n1 // 2)
+    assert_allclose(Q_bd.signal[center], 0.0, atol=1e-12)
diff --git a/src/modacor/tests/runner/test_pipeline.py b/src/modacor/tests/runner/test_pipeline.py
index a9ebc77..cacdd0b 100644
--- a/src/modacor/tests/runner/test_pipeline.py
+++ b/src/modacor/tests/runner/test_pipeline.py
@@ -200,7 +200,7 @@ def test_pipeline_from_yaml_unknown_module_raises():
 
     # Use a registry with no base_package and no registrations,
     # so any lookup will fail with KeyError.
-    registry = ProcessStepRegistry(base_package=None)
+    registry = ProcessStepRegistry()
 
     yaml_str = """
 name: bad_pipeline
@@ -359,7 +359,7 @@ def calculate(self):
             return {}
 
     # Registry that only knows about DummyStep (no lazy imports)
-    registry = ProcessStepRegistry(base_package=None)
+    registry = ProcessStepRegistry()
 
     # 1. Original YAML: simple 2-step linear pipeline
diff --git a/src/modacor/tests/runner/test_process_step_registry.py b/src/modacor/tests/runner/test_process_step_registry.py
index 6d15e3c..f8e14b0 100644
--- a/src/modacor/tests/runner/test_process_step_registry.py
+++ b/src/modacor/tests/runner/test_process_step_registry.py
@@ -59,36 +59,7 @@ class NotAStep:
 
 
 def test_get_unknown_without_base_package_raises():
-    registry = ProcessStepRegistry(base_package=None)
+    registry = ProcessStepRegistry()
 
     with pytest.raises(KeyError):
         registry.get("DoesNotExistStep")
-
-
-def test_lazy_import_success(monkeypatch):
-    """
-    Simulate a lazily-imported ProcessStep class in a fake module.
-    """
-    # This must match the base_package + snake_case(class_name) convention
-    module_name = "modacor.tests.fake_steps.dummy_lazy_step"
-
-    # Create a fake module
-    mod = types.ModuleType(module_name)
-
-    class DummyLazyStep(ProcessStep):
-        def execute(self, **kwargs):
-            pass
-
-    setattr(mod, "DummyLazyStep", DummyLazyStep)
-
-    # Inject into sys.modules so importlib can find it
-    monkeypatch.setitem(sys.modules, module_name, mod)
-
-    registry = ProcessStepRegistry(base_package="modacor.tests.fake_steps")
-
-    cls = registry.get("DummyLazyStep")
-    assert cls is DummyLazyStep
-
-    # Should be cached now
-    cls_again = registry.get("DummyLazyStep")
-    assert cls_again is DummyLazyStep
diff --git a/src/modacor/tests/test_basedata.py b/src/modacor/tests/test_basedata.py
index e5147c3..e3cf0f1 100644
--- a/src/modacor/tests/test_basedata.py
+++ b/src/modacor/tests/test_basedata.py
@@ -4,6 +4,8 @@
 
 from __future__ import annotations
 
+import logging
+
 __coding__ = "utf-8"
 __authors__ = ["Brian R. Pauw"]  # add names to the list as appropriate
 __copyright__ = "Copyright 2025, The MoDaCor team"
@@ -17,7 +19,7 @@
 from modacor import ureg
 
 # import tiled.client # not sure what the class of tiled.client is...
-from ..dataclasses.basedata import (  # adjust the import path as needed
+from modacor.dataclasses.basedata import (  # adjust the import path as needed
     BaseData,
     signal_converter,
     validate_broadcast,
@@ -35,6 +37,11 @@ def simple_basedata():
     return BaseData(signal=sig, uncertainties=uncs, units=ureg.dimensionless)
 
 
+# ---------------------------------------------------------------------------
+# Basic helpers & broadcast tests
+# ---------------------------------------------------------------------------
+
+
 def test_signal_converter_converts_scalars_and_preserves_arrays():
     # Scalar int → array
     arr1 = signal_converter(5)
@@ -90,6 +97,11 @@ def test_validate_broadcast_raises_on_incompatible_shapes():
         validate_broadcast(signal, np.ones((3, 4, 4)), "bad2")
 
 
+# ---------------------------------------------------------------------------
+# Variances / uncertainties
+# ---------------------------------------------------------------------------
+
+
 def test_initial_variances_and_uncertainties(simple_basedata):
     bd = simple_basedata
     # variances property squares each uncertainty
@@ -102,6 +114,61 @@ def test_initial_variances_and_uncertainties(simple_basedata):
     assert bd.uncertainties["sem"] == pytest.approx(0.2)
 
 
+def test_variances_setter_updates_uncertainties_and_validates_shape(simple_basedata):
+    bd = simple_basedata
+    # valid new variances (scalar and full array)
+    new_vars = {
+        "poisson": np.full((2, 3), 0.25),
+        "sem": 0.04,
+    }
+    bd.variances = new_vars
+    # uncertainties become sqrt(var)
+    assert np.allclose(bd.uncertainties["poisson"], 0.25**0.5)
+    assert bd.uncertainties["sem"] == pytest.approx(0.04**0.5)
+
+    # incompatible shape (cannot broadcast to the signal shape)
+    with pytest.raises(ValueError):
+        bd.variances = {"poisson": np.ones((3, 2))}
+
+
+def test_variances_setitem_updates_underlying_uncertainties(simple_basedata):
+    bd = simple_basedata
+
+    new_var = np.array([[4.0, 9.0, 16.0], [25.0, 36.0, 49.0]], dtype=float)
+
+    bd.variances["poisson"] = new_var
+
+    # Underlying uncertainties should now be sqrt(new_var)
+    np.testing.assert_allclose(bd.uncertainties["poisson"], np.sqrt(new_var))
+
+    # Reading variances again returns the original variance values
+    np.testing.assert_allclose(bd.variances["poisson"], new_var)
+
+
+def test_variances_setitem_rejects_incompatible_shape(simple_basedata):
+    """
+    Assigning a variance array that cannot broadcast to signal.shape should raise.
+ """ + bd = simple_basedata + + bad_var = np.ones((2,), dtype=float) # signal has shape (2,3) + + with pytest.raises(ValueError): + bd.variances["bad"] = bad_var + + +def test_variances_setter_rejects_non_dict(simple_basedata): + bd = simple_basedata + + with pytest.raises(TypeError): + bd.variances = ["not", "a", "dict"] # type: ignore[arg-type] + + +# --------------------------------------------------------------------------- +# Rank-of-data validation +# --------------------------------------------------------------------------- + + def test_validate_rank_of_data_bounds_and_ndim(): class Dummy: def __init__(self, signal, rank): @@ -126,21 +193,24 @@ def __init__(self, signal, rank): validate_rank_of_data(dummy2, type("A", (), {"name": "rank_of_data"}), 2) -def test_variances_setter_updates_uncertainties_and_validates_shape(simple_basedata): +def test_rank_of_data_validation_errors(simple_basedata): bd = simple_basedata - # valid new variances (scalar and full array) - new_vars = { - "poisson": np.full((2, 3), 0.25), - "sem": 0.04, - } - bd.variances = new_vars - # uncertainties become sqrt(var) - assert np.allclose(bd.uncertainties["poisson"], 0.25**0.5) - assert bd.uncertainties["sem"] == pytest.approx(0.04**0.5) + # valid rank + bd.rank_of_data = 1 # <= ndim + assert bd.rank_of_data == 1 - # invalid shape (wrong shape) + # invalid rank, 3 > ndim with pytest.raises(ValueError): - bd.variances = {"poisson": np.ones((3, 2))} + bd.rank_of_data = 3 + + # invalid rank > 3 + with pytest.raises(ValueError): + bd.rank_of_data = 5 + + +# --------------------------------------------------------------------------- +# Weights broadcast validation and axes tests +# --------------------------------------------------------------------------- def test_weighting_broadcast_validation(simple_basedata): @@ -155,19 +225,33 @@ def test_weighting_broadcast_validation(simple_basedata): bd.__attrs_post_init__() -def test_rank_of_data_validation_errors(simple_basedata): +def test_axes_sanity_check_logs_when_length_mismatched(simple_basedata, caplog): bd = simple_basedata - # valid rank - bd.rank_of_data = 1 # <= ndim - assert bd.rank_of_data == 1 + # signal.ndim == 2, but axes length is 1 → mismatch + bd.axes = [None] - # invalid rank, 3 > ndim - with pytest.raises(ValueError): - bd.rank_of_data = 3 + with caplog.at_level(logging.DEBUG): + bd.__attrs_post_init__() - # invalid rank > 3 - with pytest.raises(ValueError): - bd.rank_of_data = 5 + messages = [rec.getMessage() for rec in caplog.records] + assert any("BaseData.axes length" in msg for msg in messages) + + +def test_axes_sanity_check_no_log_when_length_matches(simple_basedata, caplog): + bd = simple_basedata + # signal.ndim == 2, axes length == 2 → OK + bd.axes = [None, None] + + with caplog.at_level(logging.DEBUG): + bd.__attrs_post_init__() + + messages = [rec.getMessage() for rec in caplog.records] + assert not any("BaseData.axes length" in msg for msg in messages) + + +# --------------------------------------------------------------------------- +# Unit conversion behaviour +# --------------------------------------------------------------------------- def test_to_units_converts_properly(): @@ -178,3 +262,258 @@ def test_to_units_converts_properly(): expected = sig * 100 # m to cm assert bd.units == ureg.centimeter np.testing.assert_allclose(bd.signal, expected) + + +def test_to_units_multiplicative_conversion_scales_signal_and_uncertainties(): + sig = np.array([1.0, 2.0, 3.0], dtype=float) + uncs = {"stat": np.array([0.1, 0.2, 0.3], dtype=float)} + 
+    bd = BaseData(signal=sig.copy(), units=ureg.meter, uncertainties=uncs)
+
+    signal_before = bd.signal.copy()
+    uncs_before = {k: v.copy() for k, v in bd.uncertainties.items()}
+
+    # Use the same pint logic as BaseData.to_units
+    cfact = ureg.millimeter.m_from(ureg.meter)
+
+    bd.to_units(ureg.millimeter, multiplicative_conversion=True)
+
+    # Units updated
+    assert bd.units == ureg.millimeter
+
+    # Signal scaled
+    np.testing.assert_allclose(bd.signal, signal_before * cfact)
+
+    # Each uncertainty scaled by the same factor
+    for key, unc_after in bd.uncertainties.items():
+        np.testing.assert_allclose(unc_after, uncs_before[key] * cfact)
+
+
+def test_to_units_same_units_is_noop(simple_basedata):
+    bd = simple_basedata
+
+    signal_before = bd.signal.copy()
+    uncs_before = {k: v.copy() for k, v in bd.uncertainties.items()}
+
+    bd.to_units(ureg.dimensionless, multiplicative_conversion=True)
+
+    # Nothing should have changed
+    np.testing.assert_allclose(bd.signal, signal_before)
+    for key, unc_after in bd.uncertainties.items():
+        np.testing.assert_allclose(unc_after, uncs_before[key])
+    assert bd.units == ureg.dimensionless
+
+
+def test_to_units_incompatible_units_raises(simple_basedata):
+    bd = simple_basedata
+    # dimensionless vs. time is not compatible
+    with pytest.raises(ValueError):
+        bd.to_units(ureg.second, multiplicative_conversion=True)
+
+
+def test_to_units_non_multiplicative_path_not_implemented(simple_basedata):
+    """
+    Once the non-multiplicative branch in BaseData.to_units is guarded
+    with NotImplementedError, this ensures we don't silently do the wrong thing.
+    """
+    bd = simple_basedata
+    bd.units = ureg.kelvin
+
+    with pytest.raises(NotImplementedError):
+        bd.to_units(ureg.rankine, multiplicative_conversion=False)
+
+
+# ---------------------------------------------------------------------------
+# Metadata preservation in ops
+# ---------------------------------------------------------------------------
+
+
+def test_binary_ops_preserve_rank_axes_and_weights(simple_basedata):
+    bd = simple_basedata
+
+    # Set some non-default metadata
+    bd.rank_of_data = 2
+    bd.axes = [None, None]  # two-dimensional signal
+    bd.weights = np.array([[1.0, 2.0, 3.0], [4.0, 5.0, 6.0]])
+
+    other = BaseData(signal=np.ones_like(bd.signal), units=bd.units)
+
+    # BaseData / BaseData
+    res = bd / other
+    assert res.rank_of_data == bd.rank_of_data
+    assert res.axes == bd.axes
+    assert res.axes is not bd.axes  # new list, no aliasing
+    np.testing.assert_allclose(res.weights, bd.weights)
+
+    # BaseData / scalar
+    res2 = bd / 2.0
+    assert res2.rank_of_data == bd.rank_of_data
+    assert res2.axes == bd.axes
+    assert res2.axes is not bd.axes
+    np.testing.assert_allclose(res2.weights, bd.weights)
+
+
+def test_unary_ops_preserve_rank_axes_and_weights(simple_basedata):
+    bd = simple_basedata
+
+    bd.rank_of_data = 1
+    bd.axes = [None, None]  # arbitrary axes metadata
+    bd.weights = np.array([[1.0, 1.0, 1.0], [2.0, 2.0, 2.0]])
+
+    neg = -bd
+    sqrt_bd = bd.sqrt()
+    log_bd = bd.log()  # defined for positive elements; a 0 in the signal yields -inf, which is fine here
+
+    for out in (neg, sqrt_bd, log_bd):
+        assert out.rank_of_data == bd.rank_of_data
+        assert out.axes == bd.axes
+        assert out.axes is not bd.axes
+        np.testing.assert_allclose(out.weights, bd.weights)
+
+
+# ---------------------------------------------------------------------------
+# indexed() tests
+# ---------------------------------------------------------------------------
+
+
+def test_indexed_scalar_component_preserves_units_and_uncertainties():
+    """
+    Indexing a 1D BaseData with an integer should return a scalar BaseData
+    with sliced signal, uncertainties, and weights, and preserved units.
+    """
+    sig = np.array([10.0, 20.0, 30.0], dtype=float)
+    uncs = {"u": np.array([1.0, 2.0, 3.0], dtype=float)}
+    weights = np.array([0.1, 0.2, 0.3], dtype=float)
+
+    bd = BaseData(
+        signal=sig,
+        units=ureg.meter,
+        uncertainties=uncs,
+        weights=weights,
+        rank_of_data=1,
+    )
+
+    # Take the middle component
+    sub = bd.indexed(1)
+
+    # Signal becomes scalar
+    assert sub.signal.shape == ()
+    assert sub.signal == pytest.approx(20.0)
+
+    # Units preserved
+    assert sub.units == ureg.meter
+
+    # Uncertainties sliced
+    assert "u" in sub.uncertainties
+    assert sub.uncertainties["u"].shape == ()
+    assert sub.uncertainties["u"] == pytest.approx(2.0)
+
+    # Weights sliced
+    assert sub.weights.shape == ()
+    assert sub.weights == pytest.approx(0.2)
+
+    # rank_of_data default: min(original_rank, ndim_of_result) → min(1, 0) = 0
+    assert sub.rank_of_data == 0
+
+
+def test_indexed_slice_reduces_dimension_and_preserves_metadata():
+    """
+    Indexing with a slice should reduce dimensionality and keep metadata
+    (axes, units, rank_of_data) consistent.
+    """
+    sig = np.arange(6, dtype=float).reshape((2, 3))
+    uncs = {"stat": np.ones_like(sig, dtype=float)}
+    weights = np.full_like(sig, 0.5, dtype=float)
+
+    bd = BaseData(
+        signal=sig,
+        units=ureg.dimensionless,
+        uncertainties=uncs,
+        weights=weights,
+        axes=[None, None],
+        rank_of_data=2,
+    )
+
+    # Take the first row
+    sub = bd.indexed(0)
+
+    # Shape reduced (2, 3) -> (3,)
+    assert sub.signal.shape == (3,)
+    np.testing.assert_allclose(sub.signal, sig[0])
+
+    # Uncertainties and weights sliced consistently
+    assert "stat" in sub.uncertainties
+    np.testing.assert_allclose(sub.uncertainties["stat"], uncs["stat"][0])
+    np.testing.assert_allclose(sub.weights, weights[0])
+
+    # Units preserved
+    assert sub.units == bd.units
+
+    # Axes preserved as a *new* list
+    assert sub.axes == bd.axes
+    assert sub.axes is not bd.axes
+
+    # Default rank_of_data: min(2, 1) = 1
+    assert sub.rank_of_data == 1
+
+    # Explicit rank_of_data override works
+    sub0 = bd.indexed(0, rank_of_data=0)
+    assert sub0.rank_of_data == 0
+
+
+# ---------------------------------------------------------------------------
+# Copy tests
+# ---------------------------------------------------------------------------
+
+
+def test_copy_creates_independent_arrays_and_axes_list(simple_basedata):
+    bd = simple_basedata
+
+    # Set some metadata so we can check it is carried over
+    bd.rank_of_data = 2
+    bd.axes = [None, None]
+    bd.weights = np.array([[1.0, 2.0, 3.0], [4.0, 5.0, 6.0]])
+
+    bd_copy = bd.copy()
+
+    # Different object
+    assert bd_copy is not bd
+    assert isinstance(bd_copy, BaseData)
+
+    # Signal copied, not aliased
+    np.testing.assert_allclose(bd_copy.signal, bd.signal)
+    assert bd_copy.signal is not bd.signal
+
+    # Weights copied, not aliased
+    np.testing.assert_allclose(bd_copy.weights, bd.weights)
+    assert bd_copy.weights is not bd.weights
+
+    # Uncertainties copied, not aliased
+    assert set(bd_copy.uncertainties.keys()) == set(bd.uncertainties.keys())
+    for key in bd.uncertainties:
+        np.testing.assert_allclose(bd_copy.uncertainties[key], bd.uncertainties[key])
+        assert bd_copy.uncertainties[key] is not bd.uncertainties[key]
+
+    # Axes list shallow-copied: new list, same elements
+    assert bd_copy.axes == bd.axes
+    assert bd_copy.axes is not bd.axes
+    # Elements themselves are the same objects (shallow copy)
+    for a_orig, a_copy in zip(bd.axes, bd_copy.axes):
+        assert a_orig is a_copy
+
+    # rank_of_data and units preserved
+    assert bd_copy.rank_of_data == bd.rank_of_data
+    assert bd_copy.units == bd.units
+
+
+def test_copy_without_axes_uses_empty_axes(simple_basedata):
+    bd = simple_basedata
+    bd.axes = [None, None]
+
+    bd_copy = bd.copy(with_axes=False)
+
+    # Axes dropped
+    assert bd_copy.axes == []
+    # Other content still copied
+    np.testing.assert_allclose(bd_copy.signal, bd.signal)
+    for key in bd.uncertainties:
+        np.testing.assert_allclose(bd_copy.uncertainties[key], bd.uncertainties[key])
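+
+
+# A minimal additional sketch (hypothetical test, not part of the original
+# change set): the `is not` checks above prove the arrays are distinct objects,
+# but not that they share no memory (a view would also pass). An in-place
+# mutation of the copy makes the independence explicit. Assumes the signal
+# array may be mutated in place.
+def test_copy_mutation_does_not_affect_source(simple_basedata):
+    bd = simple_basedata
+    source_signal = bd.signal.copy()
+
+    bd_copy = bd.copy()
+    # Mutate the copy's signal in place ...
+    bd_copy.signal[...] = -1.0
+
+    # ... and confirm the source data is untouched.
+    np.testing.assert_allclose(bd.signal, source_signal)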