diff --git a/Microsoft/Windows/PowerShell/ModuleAnalysisCache b/Microsoft/Windows/PowerShell/ModuleAnalysisCache new file mode 100644 index 0000000..987d010 Binary files /dev/null and b/Microsoft/Windows/PowerShell/ModuleAnalysisCache differ diff --git a/osipy/__init__.py b/osipy/__init__.py index 447f2b0..8921e0a 100644 --- a/osipy/__init__.py +++ b/osipy/__init__.py @@ -82,7 +82,7 @@ from osipy.common.dataset import PerfusionDataset from osipy.common.io import export_bids, load_dicom, load_nifti, load_perfusion from osipy.common.parameter_map import ParameterMap - from osipy.common.types import AIFType, FittingMethod, LabelingType, Modality + from osipy.common.types import AIFType, AnalysisResult, FittingMethod, LabelingType, Modality from osipy.dce import ( ExtendedToftsModel, PatlakModel, @@ -122,6 +122,8 @@ "ASLPipeline", # AIF "ArterialInputFunction", + # Result contract + "AnalysisResult", # Pipelines "DCEPipeline", "DSCPipeline", @@ -217,6 +219,7 @@ "ParameterMap": ("osipy.common.parameter_map", "ParameterMap"), # Types "AIFType": ("osipy.common.types", "AIFType"), + "AnalysisResult": ("osipy.common.types", "AnalysisResult"), "FittingMethod": ("osipy.common.types", "FittingMethod"), "LabelingType": ("osipy.common.types", "LabelingType"), "Modality": ("osipy.common.types", "Modality"), diff --git a/osipy/common/__init__.py b/osipy/common/__init__.py index 106e658..5d6fe58 100644 --- a/osipy/common/__init__.py +++ b/osipy/common/__init__.py @@ -59,6 +59,7 @@ AcquisitionParams, AIFType, ASLAcquisitionParams, + AnalysisResult, DCEAcquisitionParams, DSCAcquisitionParams, FittingMethod, @@ -73,6 +74,8 @@ "ASLAcquisitionParams", # Acquisition parameters "AcquisitionParams", + # Result contract + "AnalysisResult", "DCEAcquisitionParams", "DSCAcquisitionParams", "DataValidationError", diff --git a/osipy/common/types.py b/osipy/common/types.py index b29d7ee..7db6983 100644 --- a/osipy/common/types.py +++ b/osipy/common/types.py @@ -256,3 +256,38 @@ class IVIMAcquisitionParams(AcquisitionParams): | ASLAcquisitionParams | IVIMAcquisitionParams ) + + +@dataclass +class AnalysisResult: + """Standardised pipeline output contract. + + Every call to :func:`osipy.pipeline.run_analysis` returns this object, + regardless of modality. Downstream consumers (CLI tools, batch scripts, + visualisation code) only need to know this single class — they never have + to branch on modality to extract parameter maps or masks. + + Attributes + ---------- + modality : Modality + Source pipeline (DCE, DSC, ASL, or IVIM). + parameter_maps : dict[str, ParameterMap] + All output parameter maps keyed by OSIPI CAPLEX name + (e.g. ``'Ktrans'``, ``'CBF'``, ``'D'``). + **Guaranteed**: always a non-empty dict, never ``None``. + quality_mask : NDArray[np.bool_] + Boolean array marking valid voxels, same spatial shape as + the parameter maps. **Guaranteed**: always present, never ``None``. + provenance : dict[str, Any] + Audit trail for reproducibility. Contains at minimum: + + * ``osipy_version`` – library version string + * ``captured_at`` – ISO-8601 UTC timestamp of the analysis run + * ``modality`` – modality value string + * ``config`` – serialised pipeline configuration dict + """ + + modality: Modality + parameter_maps: "dict[str, ParameterMap]" + quality_mask: "NDArray[np.bool_]" + provenance: "dict[str, Any]" = field(default_factory=dict) diff --git a/osipy/pipeline/__init__.py b/osipy/pipeline/__init__.py index 22671da..975077b 100644 --- a/osipy/pipeline/__init__.py +++ b/osipy/pipeline/__init__.py @@ -29,11 +29,12 @@ from osipy.pipeline.dce_pipeline import DCEPipeline, DCEPipelineConfig from osipy.pipeline.dsc_pipeline import DSCPipeline, DSCPipelineConfig from osipy.pipeline.ivim_pipeline import IVIMPipeline, IVIMPipelineConfig -from osipy.pipeline.runner import PipelineResult, run_analysis +from osipy.pipeline.runner import AnalysisResult, PipelineResult, run_analysis __all__ = [ "ASLPipeline", "ASLPipelineConfig", + "AnalysisResult", "DCEPipeline", "DCEPipelineConfig", "DSCPipeline", diff --git a/osipy/pipeline/runner.py b/osipy/pipeline/runner.py index 159efce..2df4201 100644 --- a/osipy/pipeline/runner.py +++ b/osipy/pipeline/runner.py @@ -4,6 +4,10 @@ pipelines with automatic modality detection. """ +from __future__ import annotations + +import dataclasses +import datetime from dataclasses import dataclass, field from typing import TYPE_CHECKING, Any @@ -11,7 +15,7 @@ from osipy.common.exceptions import DataValidationError from osipy.common.parameter_map import ParameterMap -from osipy.common.types import Modality +from osipy.common.types import AnalysisResult, Modality if TYPE_CHECKING: from numpy.typing import NDArray @@ -19,7 +23,12 @@ @dataclass class PipelineResult: - """Generic pipeline result container. + """Generic pipeline result container (internal). + + This class is kept for backwards compatibility with code that calls + individual ``*Pipeline.run()`` methods directly. The public-facing + unified result type returned by :func:`run_analysis` is + :class:`~osipy.common.types.AnalysisResult`. Attributes ---------- @@ -39,15 +48,82 @@ class PipelineResult: metadata: dict[str, Any] = field(default_factory=dict) +# --------------------------------------------------------------------------- +# Provenance helpers +# --------------------------------------------------------------------------- + +def _serialise_config(config: Any) -> dict[str, Any]: + """Convert a pipeline config dataclass to a JSON-safe dict.""" + if config is None: + return {} + if dataclasses.is_dataclass(config): + raw = dataclasses.asdict(config) + elif hasattr(config, "__dict__"): + raw = dict(vars(config)) + else: + return {"repr": str(config)} + + # Make values JSON-safe (convert non-primitives to str) + def _safe(v: Any) -> Any: + if isinstance(v, (str, int, float, bool, type(None))): + return v + if isinstance(v, dict): + return {k: _safe(vv) for k, vv in v.items()} + if isinstance(v, (list, tuple)): + return [_safe(i) for i in v] + return str(v) + + return {k: _safe(v) for k, v in raw.items()} + + +def _make_provenance( + modality: Modality, + config: Any, + extra_metadata: dict[str, Any] | None = None, +) -> dict[str, Any]: + """Build the provenance dict injected into every AnalysisResult.""" + from osipy._version import __version__ + + prov: dict[str, Any] = { + "osipy_version": __version__, + "captured_at": datetime.datetime.now(datetime.UTC).isoformat( + timespec="seconds" + ), + "modality": modality.value, + "config": _serialise_config(config), + } + if extra_metadata: + prov.update(extra_metadata) + return prov + + +def _pipeline_result_to_analysis_result( + pr: PipelineResult, + provenance: dict[str, Any], +) -> AnalysisResult: + """Wrap a PipelineResult in the public AnalysisResult contract.""" + return AnalysisResult( + modality=pr.modality, + parameter_maps=pr.parameter_maps, + quality_mask=pr.quality_mask, + provenance=provenance, + ) + + +# --------------------------------------------------------------------------- +# Public entry point +# --------------------------------------------------------------------------- + def run_analysis( data: "NDArray[np.floating[Any]]", modality: Modality | str, **kwargs: Any, -) -> PipelineResult: +) -> AnalysisResult: """Run perfusion analysis with specified modality. This is a unified entry point that dispatches to the appropriate - pipeline based on the modality. + pipeline based on the modality and returns a standardised + :class:`~osipy.common.types.AnalysisResult` regardless of modality. Parameters ---------- @@ -61,8 +137,12 @@ def run_analysis( Returns ------- - PipelineResult - Analysis results. + AnalysisResult + Standardised analysis result with: + + * ``parameter_maps`` – dict of named :class:`~osipy.common.parameter_map.ParameterMap` + * ``quality_mask`` – boolean voxel-validity array (guaranteed non-None) + * ``provenance`` – audit trail (version, timestamp, config) Examples -------- @@ -78,14 +158,13 @@ def run_analysis( ... model='extended_tofts', ... ) >>> - >>> # Run IVIM analysis - >>> result = run_analysis( - ... dwi_data, - ... modality='ivim', - ... b_values=b_values, - ... ) + >>> # Uniform interface – works for every modality + >>> for name, pmap in result.parameter_maps.items(): + ... print(name, pmap.values.shape) + >>> result.quality_mask # always a bool ndarray + >>> result.provenance['osipy_version'] # reproducibility audit trail """ - # Normalize modality + # Normalise modality if isinstance(modality, str): modality_map = { "dce": Modality.DCE, @@ -108,13 +187,16 @@ def run_analysis( raise DataValidationError(msg) +# --------------------------------------------------------------------------- +# Private per-modality helpers +# --------------------------------------------------------------------------- + def _run_dce_analysis( data: "NDArray[np.floating[Any]]", **kwargs: Any -) -> PipelineResult: +) -> AnalysisResult: """Run DCE-MRI analysis.""" from osipy.pipeline.dce_pipeline import DCEPipeline, DCEPipelineConfig - # Extract configuration options model = kwargs.pop("model", "extended_tofts") aif_source = kwargs.pop("aif_source", "population") @@ -126,7 +208,7 @@ def _run_dce_analysis( pipeline = DCEPipeline(config) result = pipeline.run(data, **kwargs) - return PipelineResult( + pr = PipelineResult( modality=Modality.DCE, parameter_maps=result.fit_result.parameter_maps, quality_mask=result.fit_result.quality_mask, @@ -135,11 +217,14 @@ def _run_dce_analysis( "fitting_stats": result.fit_result.fitting_stats, }, ) + return _pipeline_result_to_analysis_result( + pr, _make_provenance(Modality.DCE, config) + ) def _run_dsc_analysis( data: "NDArray[np.floating[Any]]", **kwargs: Any -) -> PipelineResult: +) -> AnalysisResult: """Run DSC-MRI analysis.""" from osipy.pipeline.dsc_pipeline import DSCPipeline, DSCPipelineConfig @@ -154,8 +239,8 @@ def _run_dsc_analysis( pipeline = DSCPipeline(config) result = pipeline.run(data, **kwargs) - # Extract parameter maps - param_maps = { + # Extract parameter maps into a uniform dict + param_maps: dict[str, ParameterMap] = { "CBV": result.perfusion_maps.cbv, "CBF": result.perfusion_maps.cbf, "MTT": result.perfusion_maps.mtt, @@ -163,19 +248,30 @@ def _run_dsc_analysis( if result.perfusion_maps.ttp is not None: param_maps["TTP"] = result.perfusion_maps.ttp - return PipelineResult( + # Guarantee quality mask is never None + if result.perfusion_maps.quality_mask is not None: + qm = result.perfusion_maps.quality_mask + quality_mask = qm.values if hasattr(qm, "values") else qm + else: + # DSC had no mask — default to all-valid + first_map = next(iter(param_maps.values())) + arr = first_map.values if hasattr(first_map, "values") else first_map + quality_mask = np.ones(arr.shape, dtype=bool) + + pr = PipelineResult( modality=Modality.DSC, parameter_maps=param_maps, - quality_mask=result.perfusion_maps.quality_mask - if result.perfusion_maps.quality_mask is not None - else np.ones(result.perfusion_maps.cbv.data.shape, dtype=bool), + quality_mask=quality_mask, metadata={}, ) + return _pipeline_result_to_analysis_result( + pr, _make_provenance(Modality.DSC, config) + ) def _run_asl_analysis( data: "NDArray[np.floating[Any]]", **kwargs: Any -) -> PipelineResult: +) -> AnalysisResult: """Run ASL analysis.""" from osipy.asl import LabelingScheme from osipy.pipeline.asl_pipeline import ASLPipeline, ASLPipelineConfig @@ -190,25 +286,30 @@ def _run_asl_analysis( pipeline = ASLPipeline(config) - # Handle different input formats if "control_data" in kwargs: result = pipeline.run(data, **kwargs) else: - # Assume interleaved data m0_data = kwargs.pop("m0_data", 1.0) result = pipeline.run_from_alternating(data, m0_data, **kwargs) - return PipelineResult( + cbf_map = result.cbf_result.cbf_map + qm = result.cbf_result.quality_mask + quality_mask = qm.values if hasattr(qm, "values") else qm + + pr = PipelineResult( modality=Modality.ASL, - parameter_maps={"CBF": result.cbf_result.cbf_map}, - quality_mask=result.cbf_result.quality_mask, + parameter_maps={"CBF": cbf_map}, + quality_mask=quality_mask, metadata={}, ) + return _pipeline_result_to_analysis_result( + pr, _make_provenance(Modality.ASL, config) + ) def _run_ivim_analysis( data: "NDArray[np.floating[Any]]", **kwargs: Any -) -> PipelineResult: +) -> AnalysisResult: """Run IVIM analysis.""" from osipy.pipeline.ivim_pipeline import IVIMPipeline, IVIMPipelineConfig @@ -216,13 +317,20 @@ def _run_ivim_analysis( pipeline = IVIMPipeline(config) result = pipeline.run(data, **kwargs) - return PipelineResult( + fr = result.fit_result + qm = fr.quality_mask + quality_mask = qm.values if hasattr(qm, "values") else qm + + pr = PipelineResult( modality=Modality.IVIM, parameter_maps={ - "D": result.fit_result.d_map, - "D*": result.fit_result.d_star_map, - "f": result.fit_result.f_map, + "D": fr.d_map, + "D*": fr.d_star_map, + "f": fr.f_map, }, - quality_mask=result.fit_result.quality_mask, - metadata=result.fit_result.fitting_stats, + quality_mask=quality_mask, + metadata=fr.fitting_stats, + ) + return _pipeline_result_to_analysis_result( + pr, _make_provenance(Modality.IVIM, config) ) diff --git a/osipy/scripts/capture_current_state.py b/osipy/scripts/capture_current_state.py new file mode 100644 index 0000000..8800dec --- /dev/null +++ b/osipy/scripts/capture_current_state.py @@ -0,0 +1,402 @@ +"""capture_current_state.py +=========================== +**Before-the-refactor snapshot tool for osipy pipelines.** + +Run this script *before* the AnalysisResult refactor to document +(and optionally save) the exact shapes and access paths of every +result class today. After the refactor, running the script again +lets you compare old vs. new side-by-side. + +Usage +----- +From the repo root (activate your venv first!): + + python -m osipy.scripts.capture_current_state + # or, to save the snapshot as JSON: + python -m osipy.scripts.capture_current_state --save + +The script uses tiny synthetic data (4x4x2 voxels, few time-points) +so it runs in seconds without real MRI data. +""" + +from __future__ import annotations + +import argparse +import datetime +import json +import sys +import textwrap +from pathlib import Path +from typing import Any + +import numpy as np + +# ────────────────────────────────────────────────────────────────────────────── +# Helpers +# ────────────────────────────────────────────────────────────────────────────── + +SEP = "=" * 72 +INDENT = " " + + +def _banner(title: str) -> None: + print(f"\n{SEP}") + print(f" {title}") + print(SEP) + + +def _field(label: str, value: Any, depth: int = 1) -> None: + pad = INDENT * depth + print(f"{pad}{label}: {value}") + + +def _describe_array(arr: np.ndarray | None, label: str = "array") -> str: + if arr is None: + return "None" + return f"ndarray shape={arr.shape} dtype={arr.dtype}" + + +def _describe_dict(d: dict, label: str = "") -> None: + if not d: + print(f"{INDENT * 2}(empty)") + return + for k, v in d.items(): + if hasattr(v, "values"): # ParameterMap + _field( + f'["{k}"]', + f"ParameterMap name={v.name!r} shape={v.values.shape}" + f" units={v.units!r}", + depth=2, + ) + elif isinstance(v, np.ndarray): + _field(f'["{k}"]', _describe_array(v), depth=2) + else: + _field(f'["{k}"]', repr(v), depth=2) + + +# ────────────────────────────────────────────────────────────────────────────── +# Synthetic data factories +# ────────────────────────────────────────────────────────────────────────────── + +SHAPE_3D = (4, 4, 2) # spatial voxels +N_TIMEPOINTS = 12 +N_B_VALUES = 6 + +rng = np.random.default_rng(42) + + +def _make_dce_data() -> tuple[np.ndarray, np.ndarray]: + """Returns (signal [4,4,2,T], time [T]).""" + signal = rng.uniform(0.8, 1.2, (*SHAPE_3D, N_TIMEPOINTS)).astype(np.float64) + time = np.linspace(0, 60, N_TIMEPOINTS) + return signal, time + + +def _make_dsc_data() -> tuple[np.ndarray, np.ndarray]: + signal = rng.uniform(800.0, 1200.0, (*SHAPE_3D, N_TIMEPOINTS)).astype(np.float64) + time = np.linspace(0, 60, N_TIMEPOINTS) + return signal, time + + +def _make_asl_data() -> tuple[np.ndarray, np.ndarray, np.ndarray]: + label = rng.uniform(400.0, 600.0, (*SHAPE_3D, 4)).astype(np.float64) + control = label + rng.uniform(10.0, 30.0, label.shape) + m0 = np.full(SHAPE_3D, 1000.0) + return label, control, m0 + + +def _make_ivim_data() -> tuple[np.ndarray, np.ndarray]: + b_values = np.array([0, 10, 20, 50, 100, 200], dtype=np.float64) + signal = np.zeros((*SHAPE_3D, N_B_VALUES)) + for i, b in enumerate(b_values): + signal[..., i] = np.exp(-0.001 * b) * 1000.0 + signal += rng.normal(0, 5.0, signal.shape) + return signal, b_values + + +# ────────────────────────────────────────────────────────────────────────────── +# Per-pipeline snapshots +# ────────────────────────────────────────────────────────────────────────────── + +SnapshotDict = dict[str, Any] + + +def _snapshot_dce() -> SnapshotDict: + """Run DCEPipeline and inspect DCEPipelineResult.""" + _banner("DCE Pipeline -> DCEPipelineResult") + + from osipy.pipeline.dce_pipeline import DCEPipeline, DCEPipelineConfig + + signal, time = _make_dce_data() + config = DCEPipelineConfig(model="tofts", aif_source="population") + pipeline = DCEPipeline(config) + result = pipeline.run(signal, time) + + print(f"\n Result class : {type(result).__name__}") + print(f" Module : {type(result).__module__}") + + print(f"\n +-- result.fit_result ({type(result.fit_result).__name__})") + print(f" | .parameter_maps (dict):") + _describe_dict(result.fit_result.parameter_maps) + print(f" | .quality_mask : {_describe_array(result.fit_result.quality_mask)}") + print(f" | .model_name : {result.fit_result.model_name!r}") + + print(f"\n +-- result.t1_map : {result.t1_map}") + print(f" +-- result.aif : {type(result.aif).__name__}") + print(f" +-- result.config.model : {result.config.model!r}") + + print("\n ! Access path for Ktrans: result.fit_result.parameter_maps['Ktrans']") + print(" ! Access path for mask: result.fit_result.quality_mask") + + return { + "class": type(result).__name__, + "parameter_maps_keys": list(result.fit_result.parameter_maps.keys()), + "quality_mask_shape": list(result.fit_result.quality_mask.shape), + "model": result.fit_result.model_name, + "access_path_params": "result.fit_result.parameter_maps[name]", + "access_path_mask": "result.fit_result.quality_mask", + "mask_guaranteed": True, + } + + +def _snapshot_dsc() -> SnapshotDict: + """Run DSCPipeline and inspect DSCPipelineResult.""" + _banner("DSC Pipeline -> DSCPipelineResult") + + from osipy.pipeline.dsc_pipeline import DSCPipeline, DSCPipelineConfig + + signal, time = _make_dsc_data() + config = DSCPipelineConfig(te=30.0, apply_leakage_correction=False) + pipeline = DSCPipeline(config) + result = pipeline.run(signal, time) + + print(f"\n Result class : {type(result).__name__}") + + pm = result.perfusion_maps + print(f"\n +-- result.perfusion_maps ({type(pm).__name__})") + print(f" | .cbv : {_describe_array(pm.cbv.values if hasattr(pm.cbv,'values') else pm.cbv)}") + print(f" | .cbf : {_describe_array(pm.cbf.values if hasattr(pm.cbf,'values') else pm.cbf)}") + print(f" | .mtt : {_describe_array(pm.mtt.values if hasattr(pm.mtt,'values') else pm.mtt)}") + + qm = pm.quality_mask + qm_val = qm.values if hasattr(qm, "values") else qm + print(f" | .quality_mask : {_describe_array(qm_val)}") + + is_none = (qm is None) + print(f" +-- quality_mask is None? {is_none}") + print("\n ! Access path for CBF: result.perfusion_maps.cbf") + print(" ! Access path for mask: result.perfusion_maps.quality_mask (CAN BE None)") + print(" ! No dict interface – every map is a separate attribute!") + + return { + "class": type(result).__name__, + "parameter_maps_keys": ["cbv", "cbf", "mtt", "ttp"], + "quality_mask_shape": list(qm_val.shape) if qm_val is not None else None, + "access_path_params": "result.perfusion_maps. (individual attrs)", + "access_path_mask": "result.perfusion_maps.quality_mask", + "mask_guaranteed": False, + } + + +def _snapshot_asl() -> SnapshotDict: + """Run ASLPipeline and inspect ASLPipelineResult.""" + _banner("ASL Pipeline -> ASLPipelineResult") + + from osipy.asl import LabelingScheme + from osipy.pipeline.asl_pipeline import ASLPipeline, ASLPipelineConfig + + label, control, m0 = _make_asl_data() + config = ASLPipelineConfig( + labeling_scheme=LabelingScheme.PCASL, pld=1800.0 + ) + pipeline = ASLPipeline(config) + result = pipeline.run(label, control, m0) + + cbf = result.cbf_result + print(f"\n Result class : {type(result).__name__}") + print(f"\n +-- result.cbf_result ({type(cbf).__name__})") + + cbf_map = cbf.cbf_map + cbf_arr = cbf_map.values if hasattr(cbf_map, "values") else cbf_map + print(f" | .cbf_map : {_describe_array(cbf_arr)}") + + qm = cbf.quality_mask + qm_arr = qm.values if hasattr(qm, "values") else qm + print(f" | .quality_mask : {_describe_array(qm_arr)}") + print(f" +-- result.m0_map : {result.m0_map}") + + print("\n ! Access path for CBF: result.cbf_result.cbf_map") + print(" ! Access path for mask: result.cbf_result.quality_mask") + print(" ! Not a dict – only one parameter (CBF)") + + return { + "class": type(result).__name__, + "parameter_maps_keys": ["cbf_map"], + "quality_mask_shape": list(qm_arr.shape) if qm_arr is not None else None, + "access_path_params": "result.cbf_result.cbf_map", + "access_path_mask": "result.cbf_result.quality_mask", + "mask_guaranteed": True, + } + + +def _snapshot_ivim() -> SnapshotDict: + """Run IVIMPipeline and inspect IVIMPipelineResult.""" + _banner("IVIM Pipeline -> IVIMPipelineResult") + + from osipy.pipeline.ivim_pipeline import IVIMPipeline, IVIMPipelineConfig + + signal, b_values = _make_ivim_data() + config = IVIMPipelineConfig() + pipeline = IVIMPipeline(config) + result = pipeline.run(signal, b_values) + + fr = result.fit_result + print(f"\n Result class : {type(result).__name__}") + print(f"\n +-- result.fit_result ({type(fr).__name__})") + + d_arr = fr.d_map.values if hasattr(fr.d_map, "values") else fr.d_map + ds_arr = fr.d_star_map.values if hasattr(fr.d_star_map, "values") else fr.d_star_map + f_arr = fr.f_map.values if hasattr(fr.f_map, "values") else fr.f_map + qm_arr = fr.quality_mask.values if hasattr(fr.quality_mask, "values") else fr.quality_mask + + print(f" | .d_map : {_describe_array(d_arr)}") + print(f" | .d_star_map : {_describe_array(ds_arr)}") + print(f" | .f_map : {_describe_array(f_arr)}") + print(f" | .quality_mask: {_describe_array(qm_arr)}") + + print("\n ! Access path for D: result.fit_result.d_map") + print(" ! Access path for D*: result.fit_result.d_star_map") + print(" ! Access path for f: result.fit_result.f_map") + print(" ! Access path for mask: result.fit_result.quality_mask") + print(" ! Not a dict – each parameter is an individual attribute!") + + return { + "class": type(result).__name__, + "parameter_maps_keys": ["d_map", "d_star_map", "f_map"], + "quality_mask_shape": list(qm_arr.shape) if qm_arr is not None else None, + "access_path_params": "result.fit_result.", + "access_path_mask": "result.fit_result.quality_mask", + "mask_guaranteed": True, + } + + +def _snapshot_runner() -> SnapshotDict: + """Show what run_analysis() currently returns for each modality.""" + _banner("runner.run_analysis() – Unified PipelineResult wrapper") + + from osipy.pipeline.runner import PipelineResult, run_analysis + + print(f"\n PipelineResult fields:") + import dataclasses + for f in dataclasses.fields(PipelineResult): + print(f"{INDENT * 2}.{f.name}: {f.type}") + + print(f"\n [OK] run_analysis() already maps each modality -> PipelineResult") + print(f" [OK] parameter_maps is always a dict[str, ParameterMap]") + print(f" [OK] quality_mask is always a numpy bool array") + print(f" [X] No provenance/version/timestamp in PipelineResult.metadata") + print(f" [X] PipelineResult is internal – not the proposed AnalysisResult") + + return { + "class": "PipelineResult", + "has_parameter_maps_dict": True, + "has_quality_mask": True, + "has_provenance": False, + "note": "run_analysis() wraps each pipeline, but lacks provenance.", + } + + +# ────────────────────────────────────────────────────────────────────────────── +# Main +# ────────────────────────────────────────────────────────────────────────────── + +def main(save: bool = False) -> None: + from osipy._version import __version__ + + print("\n" + "=" * 72) + print(" osipy BEFORE-REFACTOR PIPELINE STATE SNAPSHOT") + print("=" * 72) + print(f" osipy version : {__version__}") + print(f" Python : {sys.version.split()[0]}") + print(f" Captured at : {datetime.datetime.now().isoformat(timespec='seconds')}") + print(f" Spatial shape : {SHAPE_3D} (synthetic – no real MRI data needed)") + + snapshot: dict[str, Any] = { + "osipy_version": __version__, + "captured_at": datetime.datetime.now().isoformat(timespec="seconds"), + "synthetic_shape": list(SHAPE_3D), + "modalities": {}, + } + + try: + snapshot["modalities"]["DCE"] = _snapshot_dce() + except Exception as exc: + print(f"\n [DCE] ERROR: {exc}") + snapshot["modalities"]["DCE"] = {"error": str(exc)} + + try: + snapshot["modalities"]["DSC"] = _snapshot_dsc() + except Exception as exc: + print(f"\n [DSC] ERROR: {exc}") + snapshot["modalities"]["DSC"] = {"error": str(exc)} + + try: + snapshot["modalities"]["ASL"] = _snapshot_asl() + except Exception as exc: + print(f"\n [ASL] ERROR: {exc}") + snapshot["modalities"]["ASL"] = {"error": str(exc)} + + try: + snapshot["modalities"]["IVIM"] = _snapshot_ivim() + except Exception as exc: + print(f"\n [IVIM] ERROR: {exc}") + snapshot["modalities"]["IVIM"] = {"error": str(exc)} + + try: + snapshot["runner"] = _snapshot_runner() + except Exception as exc: + print(f"\n [runner] ERROR: {exc}") + snapshot["runner"] = {"error": str(exc)} + + # ── Summary table ──────────────────────────────────────────────────────── + _banner("SUMMARY: The Fragmentation Problem (Before Refactor)") + print( + f" {'Modality':<8} {'Result Class':<26} {'Maps interface':<30} " + f"{'Mask Guaranteed'}" + ) + print(f" {'-'*8} {'-'*26} {'-'*30} {'-'*15}") + rows = [ + ("DCE", "DCEPipelineResult", "result.fit_result.parameter_maps[name]", "Yes"), + ("DSC", "DSCPipelineResult", "result.perfusion_maps.", "NO !"), + ("ASL", "ASLPipelineResult", "result.cbf_result.cbf_map", "Yes"), + ("IVIM", "IVIMPipelineResult", "result.fit_result._map", "Yes"), + ] + for m, cls, path, mask in rows: + print(f" {m:<8} {cls:<26} {path:<30} {mask}") + + print(f"\n -> After the AnalysisResult refactor, ALL four pipelines will expose:") + print(f" result.parameter_maps[name] (dict, always present)") + print(f" result.quality_mask (bool ndarray, never None)") + print(f" result.provenance (version + timestamp + config)") + + # ── Save ──────────────────────────────────────────────────────────────── + if save: + out_path = Path(__file__).parent / "state_before_refactor.json" + with open(out_path, "w") as fh: + json.dump(snapshot, fh, indent=2) + print(f"\n [OK] Snapshot saved -> {out_path}") + + print(f"\n{SEP}\n") + + +if __name__ == "__main__": + parser = argparse.ArgumentParser( + description="Capture osipy pipeline result structures BEFORE the AnalysisResult refactor." + ) + parser.add_argument( + "--save", + action="store_true", + help="Save snapshot as state_before_refactor.json next to this script.", + ) + args = parser.parse_args() + main(save=args.save) diff --git a/osipy/scripts/state_before_refactor.json b/osipy/scripts/state_before_refactor.json new file mode 100644 index 0000000..13f828a --- /dev/null +++ b/osipy/scripts/state_before_refactor.json @@ -0,0 +1,82 @@ +{ + "osipy_version": "0.1.1", + "captured_at": "2026-03-09T18:35:25", + "synthetic_shape": [ + 4, + 4, + 2 + ], + "modalities": { + "DCE": { + "class": "DCEPipelineResult", + "parameter_maps_keys": [ + "Ktrans", + "ve", + "r_squared" + ], + "quality_mask_shape": [ + 4, + 4, + 2 + ], + "model": "Standard Tofts", + "access_path_params": "result.fit_result.parameter_maps[name]", + "access_path_mask": "result.fit_result.quality_mask", + "mask_guaranteed": true + }, + "DSC": { + "class": "DSCPipelineResult", + "parameter_maps_keys": [ + "cbv", + "cbf", + "mtt", + "ttp" + ], + "quality_mask_shape": [ + 4, + 4, + 2 + ], + "access_path_params": "result.perfusion_maps. (individual attrs)", + "access_path_mask": "result.perfusion_maps.quality_mask", + "mask_guaranteed": false + }, + "ASL": { + "class": "ASLPipelineResult", + "parameter_maps_keys": [ + "cbf_map" + ], + "quality_mask_shape": [ + 4, + 4, + 2 + ], + "access_path_params": "result.cbf_result.cbf_map", + "access_path_mask": "result.cbf_result.quality_mask", + "mask_guaranteed": true + }, + "IVIM": { + "class": "IVIMPipelineResult", + "parameter_maps_keys": [ + "d_map", + "d_star_map", + "f_map" + ], + "quality_mask_shape": [ + 4, + 4, + 2 + ], + "access_path_params": "result.fit_result.", + "access_path_mask": "result.fit_result.quality_mask", + "mask_guaranteed": true + } + }, + "runner": { + "class": "PipelineResult", + "has_parameter_maps_dict": true, + "has_quality_mask": true, + "has_provenance": false, + "note": "run_analysis() wraps each pipeline, but lacks provenance." + } +} \ No newline at end of file diff --git a/pyproject.toml b/pyproject.toml index bc0b50b..2edf1d7 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -27,7 +27,7 @@ classifiers = [ "Intended Audience :: Science/Research", "License :: OSI Approved :: MIT License", "Operating System :: OS Independent", - "Programming Language :: Python :: 3", + "Programming Language :: Python :: 3.12", "Programming Language :: Python :: 3.13", "Topic :: Scientific/Engineering :: Medical Science Apps.", diff --git a/tests/integration/test_pipeline.py b/tests/integration/test_pipeline.py index a378f71..2972e6a 100644 --- a/tests/integration/test_pipeline.py +++ b/tests/integration/test_pipeline.py @@ -6,9 +6,11 @@ from __future__ import annotations import numpy as np +import pytest -from osipy.common.types import Modality +from osipy.common.types import AnalysisResult, Modality from osipy.pipeline import ( + AnalysisResult, IVIMPipeline, IVIMPipelineConfig, PipelineResult, @@ -19,9 +21,8 @@ class TestRunAnalysis: """Tests for unified run_analysis function.""" - def test_run_analysis_returns_pipeline_result(self) -> None: - """Test that run_analysis returns PipelineResult.""" - # Create minimal IVIM data (simplest case) + def test_run_analysis_returns_analysis_result(self) -> None: + """Test that run_analysis returns AnalysisResult (the contract type).""" data = np.random.rand(8, 8, 4, 6) b_values = np.array([0, 50, 100, 200, 400, 800]) @@ -31,7 +32,7 @@ def test_run_analysis_returns_pipeline_result(self) -> None: b_values=b_values, ) - assert isinstance(result, PipelineResult) + assert isinstance(result, AnalysisResult) assert result.modality == Modality.IVIM def test_run_analysis_string_modality(self) -> None: @@ -47,28 +48,75 @@ def test_run_analysis_string_modality(self) -> None: assert result.modality == Modality.IVIM + def test_analysis_result_parameter_maps_is_dict(self) -> None: + """Test that parameter_maps is always a non-empty dict.""" + data = np.random.rand(4, 4, 2, 6) + b_values = np.array([0, 50, 100, 200, 400, 800]) + + result = run_analysis(data, modality="ivim", b_values=b_values) + + assert isinstance(result.parameter_maps, dict) + assert len(result.parameter_maps) > 0 + + def test_analysis_result_quality_mask_never_none(self) -> None: + """Test that quality_mask is always present and is a bool array.""" + data = np.random.rand(4, 4, 2, 6) + b_values = np.array([0, 50, 100, 200, 400, 800]) + + result = run_analysis(data, modality="ivim", b_values=b_values) + + assert result.quality_mask is not None + assert result.quality_mask.dtype == np.bool_ + + def test_analysis_result_provenance_fields(self) -> None: + """Test that provenance always contains the required audit fields.""" + data = np.random.rand(4, 4, 2, 6) + b_values = np.array([0, 50, 100, 200, 400, 800]) + + result = run_analysis(data, modality="ivim", b_values=b_values) + + assert "osipy_version" in result.provenance + assert "captured_at" in result.provenance + assert "modality" in result.provenance + assert "config" in result.provenance + assert result.provenance["modality"] == "ivim" + + def test_uniform_save_interface_all_modalities(self) -> None: + """AnalysisResult enables the same save code for every modality. + + This is the canonical test for the contract: downstream code that + only knows AnalysisResult should work identically for IVIM. + """ + data = np.random.rand(4, 4, 2, 6) + b_values = np.array([0, 50, 100, 200, 400, 800]) + result = run_analysis(data, modality="ivim", b_values=b_values) + + # This code works WITHOUT knowing the modality ─ that's the whole point + saved = {} + for name, pmap in result.parameter_maps.items(): + arr = pmap.values if hasattr(pmap, "values") else pmap + saved[name] = arr + assert result.quality_mask is not None + assert len(saved) > 0 + class TestIVIMPipelineIntegration: """Integration tests for IVIM pipeline.""" def test_ivim_pipeline_full_run(self) -> None: """Test full IVIM pipeline execution.""" - # Create synthetic IVIM data shape = (8, 8, 4) b_values = np.array([0, 50, 100, 200, 400, 800]) n_bvalues = len(b_values) - # Known parameters - d = 1.0e-3 # mm^2/s - d_star = 10e-3 # mm^2/s + d = 1.0e-3 + d_star = 10e-3 f = 0.1 - # Generate bi-exponential signal signal = np.zeros((*shape, n_bvalues)) for i, b in enumerate(b_values): signal[..., i] = f * np.exp(-b * d_star) + (1 - f) * np.exp(-b * d) - # Add noise signal += np.random.randn(*signal.shape) * 0.01 signal = np.maximum(signal, 0.01) @@ -76,6 +124,7 @@ def test_ivim_pipeline_full_run(self) -> None: pipeline = IVIMPipeline(config) result = pipeline.run(signal, b_values=b_values) + # pipeline.run() still returns IVIMPipelineResult (unchanged) assert result is not None assert hasattr(result, "fit_result") @@ -85,14 +134,12 @@ class TestPipelineMemoryEfficiency: def test_pipeline_handles_large_data(self) -> None: """Test pipeline can handle larger datasets without memory issues.""" - # Create moderately large data data = np.random.rand(32, 32, 8, 6).astype(np.float32) b_values = np.array([0, 50, 100, 200, 400, 800]) config = IVIMPipelineConfig() pipeline = IVIMPipeline(config) - # Should complete without memory error result = pipeline.run(data, b_values=b_values) assert result is not None @@ -104,12 +151,10 @@ def test_headless_execution_no_display(self) -> None: """Test pipeline runs without display requirements.""" import os - # Ensure no display is required original_display = os.environ.get("DISPLAY") try: os.environ.pop("DISPLAY", None) - # Run minimal analysis data = np.random.rand(4, 4, 2, 4) b_values = np.array([0, 100, 400, 800]) @@ -126,17 +171,16 @@ def test_pipeline_deterministic_results(self) -> None: data = np.random.rand(4, 4, 2, 4) b_values = np.array([0, 100, 400, 800]) - # Run twice with same seed np.random.seed(42) result1 = run_analysis(data.copy(), modality="ivim", b_values=b_values) np.random.seed(42) result2 = run_analysis(data.copy(), modality="ivim", b_values=b_values) - # Results should be identical for key in result1.parameter_maps: if key in result2.parameter_maps: - np.testing.assert_array_almost_equal( - result1.parameter_maps[key].values, - result2.parameter_maps[key].values, - ) + pm1 = result1.parameter_maps[key] + pm2 = result2.parameter_maps[key] + arr1 = pm1.values if hasattr(pm1, "values") else pm1 + arr2 = pm2.values if hasattr(pm2, "values") else pm2 + np.testing.assert_array_almost_equal(arr1, arr2)