From a335825b480e23c26163f1b75343e2ae46de2c62 Mon Sep 17 00:00:00 2001 From: Ethan Blackwood Date: Wed, 11 Jun 2025 20:47:37 -0400 Subject: [PATCH 01/31] Add HDF5 format movie reader --- mesmerize_core/movie_readers.py | 35 +++++++++++++++++++++++++++++++-- 1 file changed, 33 insertions(+), 2 deletions(-) diff --git a/mesmerize_core/movie_readers.py b/mesmerize_core/movie_readers.py index c184682..6f5b3fc 100644 --- a/mesmerize_core/movie_readers.py +++ b/mesmerize_core/movie_readers.py @@ -2,6 +2,8 @@ from pathlib import Path import numpy as np from caiman import load_memmap +import h5py +import zarr from .utils import warning_experimental from .arrays import LazyTiff @@ -27,8 +29,11 @@ def default_reader(path: str, **kwargs): if ext in [".mmap", ".memmap"]: return caiman_memmap_reader(path, **kwargs) - else: - raise ValueError(f"No default movie reader for given file extension: '{ext}'") + + if ext in ['.hdf5', '.h5', '.nwb', '.mat', '.n5', '.zarr']: + return hdf5_reader(path, **kwargs) + + raise ValueError(f"No default movie reader for given file extension: '{ext}'") def tiff_memmap_reader(path: str, **kwargs) -> np.memmap: @@ -51,3 +56,29 @@ def pims_reader(path: str, **kwargs): if not HAS_PIMS: raise ModuleNotFoundError("you must install `pims` to use the pims reader") return pims.open(path, **kwargs) + + +def hdf5_reader(path: str, var_name_hdf5='mov'): + # based on caiman.base.movies.load_iter + extension = Path(path).suffix + if extension in ['.n5', '.zarr']: # Thankfully, the zarr library lines up closely with h5py past the initial open + f = zarr.open(path, "r") + if isinstance(f, zarr.Array): + raise RuntimeError('Expected a zarr Group, not an Array') + else: + try: + f = h5py.File(path, "r") + except: + if extension == '.mat': + raise Exception(f"Problem loading {path}: Unknown format. This may be in the original version 1 (non-hdf5) mat format; please convert it first") + else: + raise Exception(f"Problem loading {path}: Unknown format.") + + ignore_keys = ['__DATA_TYPES__'] # Known metadata that tools provide, add to this as needed. + fkeys = list(filter(lambda x: x not in ignore_keys, f.keys())) + if len(fkeys) == 1: # If the hdf5 file we're parsing has only one dataset inside it, + # ignore the arg and pick that dataset + var_name_hdf5 = fkeys[0] + Y = f.get('acquisition/' + var_name_hdf5 + '/data' + if extension == '.nwb' else var_name_hdf5) + return Y \ No newline at end of file From a868ec31ad5c6b71cceeb47c7255644eb78a7e98 Mon Sep 17 00:00:00 2001 From: Ethan Blackwood Date: Wed, 11 Jun 2025 21:26:40 -0400 Subject: [PATCH 02/31] Correctly set var_name_hdf5 when loading hdf5 and this param is set --- mesmerize_core/caiman_extensions/common.py | 12 ++++++++++-- mesmerize_core/movie_readers.py | 4 ++-- 2 files changed, 12 insertions(+), 4 deletions(-) diff --git a/mesmerize_core/caiman_extensions/common.py b/mesmerize_core/caiman_extensions/common.py index 9ba1a24..8e2b4cd 100644 --- a/mesmerize_core/caiman_extensions/common.py +++ b/mesmerize_core/caiman_extensions/common.py @@ -707,10 +707,18 @@ def get_input_movie( if reader is not None: if not callable(reader): raise TypeError(f"reader must be a callable type, such as a function") + else: + reader = default_reader + + # add hdf5 variable name if provided + main_params = self._series.params['main'] + if 'var_name_hdf5' in main_params: + kwargs['var_name_hdf5'] = main_params['var_name_hdf5'] + elif 'data' in main_params and 'var_name_hdf5' in main_params['data']: + kwargs['var_name_hdf5'] = main_params['data']['var_name_hdf5'] - return reader(path_str, **kwargs) + return reader(path_str, **kwargs) - return default_reader(path_str, **kwargs) @validate() def get_corr_image(self) -> np.ndarray: diff --git a/mesmerize_core/movie_readers.py b/mesmerize_core/movie_readers.py index 6f5b3fc..5895b7f 100644 --- a/mesmerize_core/movie_readers.py +++ b/mesmerize_core/movie_readers.py @@ -16,7 +16,7 @@ HAS_PIMS = False -def default_reader(path: str, **kwargs): +def default_reader(path: str, var_name_hdf5='mov', **kwargs): ext = Path(path).suffixes[-1] if ext in [".tiff", ".tif", ".btf"]: try: @@ -31,7 +31,7 @@ def default_reader(path: str, **kwargs): if ext in ['.hdf5', '.h5', '.nwb', '.mat', '.n5', '.zarr']: - return hdf5_reader(path, **kwargs) + return hdf5_reader(path, var_name_hdf5=var_name_hdf5, **kwargs) raise ValueError(f"No default movie reader for given file extension: '{ext}'") From 262cef08ad9db6c446f386249c5258b6cdcbe45e Mon Sep 17 00:00:00 2001 From: Ethan Blackwood Date: Sat, 10 Aug 2024 16:25:45 -0400 Subject: [PATCH 03/31] Add async run method and dview argument to algos --- mesmerize_core/algorithms/cnmf.py | 33 +++++++++++------- mesmerize_core/algorithms/cnmfe.py | 33 +++++++++++------- mesmerize_core/algorithms/mcorr.py | 32 +++++++++++------ mesmerize_core/batch_utils.py | 2 ++ mesmerize_core/caiman_extensions/common.py | 40 +++++++++++++++------- 5 files changed, 92 insertions(+), 48 deletions(-) diff --git a/mesmerize_core/algorithms/cnmf.py b/mesmerize_core/algorithms/cnmf.py index dd7381a..f4cd8a8 100644 --- a/mesmerize_core/algorithms/cnmf.py +++ b/mesmerize_core/algorithms/cnmf.py @@ -1,5 +1,6 @@ """Performs CNMF in a separate process""" +import asyncio import click import caiman as cm from caiman.source_extraction.cnmf import cnmf as cnmf @@ -21,7 +22,10 @@ from ..utils import IS_WINDOWS -def run_algo(batch_path, uuid, data_path: str = None): +def run_algo(batch_path, uuid, data_path: str = None, dview=None): + asyncio.run(run_algo_async(batch_path, uuid, data_path=data_path, dview=dview)) + +async def run_algo_async(batch_path, uuid, data_path: str = None, dview=None): algo_start = time.time() set_parent_raw_data_path(data_path) @@ -42,18 +46,23 @@ def run_algo(batch_path, uuid, data_path: str = None): f"Starting CNMF item:\n{item}\nWith params:{params}" ) - # adapted from current demo notebook - if "MESMERIZE_N_PROCESSES" in os.environ.keys(): - try: - n_processes = int(os.environ["MESMERIZE_N_PROCESSES"]) - except: - n_processes = psutil.cpu_count() - 1 + if 'multiprocessing' in str(type(dview)) and hasattr(dview, '_processes'): + n_processes = dview._processes + elif 'ipyparallel' in str(type(dview)): + n_processes = len(dview) else: - n_processes = psutil.cpu_count() - 1 - # Start cluster for parallel processing - c, dview, n_processes = cm.cluster.setup_cluster( - backend="local", n_processes=n_processes, single_thread=False - ) + # adapted from current demo notebook + if "MESMERIZE_N_PROCESSES" in os.environ.keys(): + try: + n_processes = int(os.environ["MESMERIZE_N_PROCESSES"]) + except: + n_processes = psutil.cpu_count() - 1 + else: + n_processes = psutil.cpu_count() - 1 + # Start cluster for parallel processing + c, dview, n_processes = cm.cluster.setup_cluster( + backend="multiprocessing", n_processes=n_processes, single_thread=False + ) # merge cnmf and eval kwargs into one dict cnmf_params = CNMFParams(params_dict=params["main"]) diff --git a/mesmerize_core/algorithms/cnmfe.py b/mesmerize_core/algorithms/cnmfe.py index d4f1858..04cec29 100644 --- a/mesmerize_core/algorithms/cnmfe.py +++ b/mesmerize_core/algorithms/cnmfe.py @@ -1,3 +1,4 @@ +import asyncio import click import numpy as np import caiman as cm @@ -18,7 +19,10 @@ from ..utils import IS_WINDOWS -def run_algo(batch_path, uuid, data_path: str = None): +def run_algo(batch_path, uuid, data_path: str = None, dview=None): + asyncio.run(run_algo_async(batch_path, uuid, data_path=data_path, dview=dview)) + +async def run_algo_async(batch_path, uuid, data_path: str = None, dview=None): algo_start = time.time() set_parent_raw_data_path(data_path) @@ -35,18 +39,23 @@ def run_algo(batch_path, uuid, data_path: str = None): params = item["params"] print("cnmfe params:", params) - # adapted from current demo notebook - if "MESMERIZE_N_PROCESSES" in os.environ.keys(): - try: - n_processes = int(os.environ["MESMERIZE_N_PROCESSES"]) - except: - n_processes = psutil.cpu_count() - 1 + if 'multiprocessing' in str(type(dview)) and hasattr(dview, '_processes'): + n_processes = dview._processes + elif 'ipyparallel' in str(type(dview)): + n_processes = len(dview) else: - n_processes = psutil.cpu_count() - 1 - # Start cluster for parallel processing - c, dview, n_processes = cm.cluster.setup_cluster( - backend="local", n_processes=n_processes, single_thread=False - ) + # adapted from current demo notebook + if "MESMERIZE_N_PROCESSES" in os.environ.keys(): + try: + n_processes = int(os.environ["MESMERIZE_N_PROCESSES"]) + except: + n_processes = psutil.cpu_count() - 1 + else: + n_processes = psutil.cpu_count() - 1 + # Start cluster for parallel processing + c, dview, n_processes = cm.cluster.setup_cluster( + backend="multiprocessing", n_processes=n_processes, single_thread=False + ) try: fname_new = cm.save_memmap( diff --git a/mesmerize_core/algorithms/mcorr.py b/mesmerize_core/algorithms/mcorr.py index 484130d..81bedeb 100644 --- a/mesmerize_core/algorithms/mcorr.py +++ b/mesmerize_core/algorithms/mcorr.py @@ -1,4 +1,5 @@ import traceback +import asyncio import click import caiman as cm from caiman.source_extraction.cnmf.params import CNMFParams @@ -18,7 +19,10 @@ from ..batch_utils import set_parent_raw_data_path, load_batch -def run_algo(batch_path, uuid, data_path: str = None): +def run_algo(batch_path, uuid, data_path: str = None, dview=None): + asyncio.run(run_algo_async(batch_path, uuid, data_path=data_path, dview=dview)) + +async def run_algo_async(batch_path, uuid, data_path: str = None, dview=None): algo_start = time.time() set_parent_raw_data_path(data_path) @@ -40,19 +44,25 @@ def run_algo(batch_path, uuid, data_path: str = None): params = item["params"] # adapted from current demo notebook - if "MESMERIZE_N_PROCESSES" in os.environ.keys(): - try: - n_processes = int(os.environ["MESMERIZE_N_PROCESSES"]) - except: - n_processes = psutil.cpu_count() - 1 + if 'multiprocessing' in str(type(dview)) and hasattr(dview, '_processes'): + n_processes = dview._processes + elif 'ipyparallel' in str(type(dview)): + n_processes = len(dview) else: - n_processes = psutil.cpu_count() - 1 + # adapted from current demo notebook + if "MESMERIZE_N_PROCESSES" in os.environ.keys(): + try: + n_processes = int(os.environ["MESMERIZE_N_PROCESSES"]) + except: + n_processes = psutil.cpu_count() - 1 + else: + n_processes = psutil.cpu_count() - 1 + # Start cluster for parallel processing + c, dview, n_processes = cm.cluster.setup_cluster( + backend="multiprocessing", n_processes=n_processes, single_thread=False + ) print("starting mc") - # Start cluster for parallel processing - c, dview, n_processes = cm.cluster.setup_cluster( - backend="local", n_processes=n_processes, single_thread=False - ) rel_params = dict(params["main"]) opts = CNMFParams(params_dict=rel_params) diff --git a/mesmerize_core/batch_utils.py b/mesmerize_core/batch_utils.py index 96a2bd7..9766172 100644 --- a/mesmerize_core/batch_utils.py +++ b/mesmerize_core/batch_utils.py @@ -11,11 +11,13 @@ COMPUTE_BACKEND_SUBPROCESS = "subprocess" #: subprocess backend COMPUTE_BACKEND_SLURM = "slurm" #: SLURM backend COMPUTE_BACKEND_LOCAL = "local" +COMPUTE_BACKEND_ASYNC = "local_async" COMPUTE_BACKENDS = [ COMPUTE_BACKEND_SUBPROCESS, COMPUTE_BACKEND_SLURM, COMPUTE_BACKEND_LOCAL, + COMPUTE_BACKEND_ASYNC, ] DATAFRAME_COLUMNS = [ diff --git a/mesmerize_core/caiman_extensions/common.py b/mesmerize_core/caiman_extensions/common.py index 8e2b4cd..b916bf7 100644 --- a/mesmerize_core/caiman_extensions/common.py +++ b/mesmerize_core/caiman_extensions/common.py @@ -9,6 +9,7 @@ from datetime import datetime import time from copy import deepcopy +import asyncio import numpy as np import pandas as pd @@ -25,6 +26,7 @@ COMPUTE_BACKENDS, COMPUTE_BACKEND_SUBPROCESS, COMPUTE_BACKEND_LOCAL, + COMPUTE_BACKEND_ASYNC, get_parent_raw_data_path, load_batch, ) @@ -531,19 +533,30 @@ def __init__(self, s: pd.Series): self.process: Popen = None def _run_local( - self, - algo: str, - batch_path: Path, - uuid: UUID, - data_path: Union[Path, None], - ): + self, + algo: str, + batch_path: Path, + uuid: UUID, + data_path: Union[Path, None], + dview=None + ) -> DummyProcess: + coroutine = self._run_local_async(algo, batch_path, uuid, data_path, dview) + asyncio.run(coroutine) + return DummyProcess() + + def _run_local_async( + self, + algo: str, + batch_path: Path, + uuid: UUID, + data_path: Union[Path, None], + dview=None + ) -> Coroutine: algo_module = getattr(algorithms, algo) - algo_module.run_algo( - batch_path=str(batch_path), uuid=str(uuid), data_path=str(data_path) + return algo_module.run_algo_async( + batch_path=str(batch_path), uuid=str(uuid), data_path=str(data_path), dview=dview ) - return DummyProcess() - def _run_subprocess(self, runfile_path: str, wait: bool, **kwargs): # Get the dir that contains the input movie @@ -636,13 +649,14 @@ def run(self, backend: Optional[str] = None, wait: bool = True, **kwargs): batch_path = self._series.paths.get_batch_path() - if backend == COMPUTE_BACKEND_LOCAL: - print(f"Running {self._series.uuid} with local backend") - return self._run_local( + if backend in [COMPUTE_BACKEND_LOCAL, COMPUTE_BACKEND_ASYNC]: + print(f"Running {self._series.uuid} with {backend} backend") + return getattr(self, f"_run_{backend}")( algo=self._series["algo"], batch_path=batch_path, uuid=self._series["uuid"], data_path=get_parent_raw_data_path(), + dview=kwargs.get("dview") ) # Create the runfile in the batch dir using this Series' UUID as the filename From 020d29c463eaa38cbd39e0d91002f3fa1a99cca3 Mon Sep 17 00:00:00 2001 From: Ethan Blackwood Date: Sun, 11 Aug 2024 13:52:38 -0400 Subject: [PATCH 04/31] Don't close dview if passed in; add context manager to help --- mesmerize_core/algorithms/_utils.py | 48 +++++++ mesmerize_core/algorithms/cnmf.py | 175 +++++++++++------------ mesmerize_core/algorithms/cnmfe.py | 141 +++++++++---------- mesmerize_core/algorithms/mcorr.py | 206 +++++++++++++--------------- 4 files changed, 281 insertions(+), 289 deletions(-) create mode 100644 mesmerize_core/algorithms/_utils.py diff --git a/mesmerize_core/algorithms/_utils.py b/mesmerize_core/algorithms/_utils.py new file mode 100644 index 0000000..e7dbdca --- /dev/null +++ b/mesmerize_core/algorithms/_utils.py @@ -0,0 +1,48 @@ +import caiman as cm +from contextlib import contextmanager +from ipyparallel import DirectView +from multiprocessing.pool import Pool +import os +import psutil +from typing import Union, Optional, Generator + +Cluster = Union[Pool, DirectView] + +def get_n_processes(dview: Optional[Cluster]) -> int: + """Infer number of processes in a multiprocessing or ipyparallel cluster""" + if isinstance(dview, Pool) and hasattr(dview, '_processes'): + return dview._processes + elif isinstance(dview, DirectView): + return len(dview) + else: + return 1 + + +@contextmanager +def ensure_server(dview: Optional[Cluster]) -> Generator[tuple[Cluster, int], None, None]: + """ + Context manager that passes through an existing 'dview' or + opens up a multiprocessing server if none is passed in. + If a server was opened, closes it upon exit. + Usage: `with ensure_server(dview) as (dview, n_processes):` + """ + if dview is not None: + yield dview, get_n_processes(dview) + else: + # no cluster passed in, so open one + if "MESMERIZE_N_PROCESSES" in os.environ.keys(): + try: + n_processes = int(os.environ["MESMERIZE_N_PROCESSES"]) + except: + n_processes = psutil.cpu_count() - 1 + else: + n_processes = psutil.cpu_count() - 1 + + # Start cluster for parallel processing + _, dview, n_processes = cm.cluster.setup_cluster( + backend="multiprocessing", n_processes=n_processes, single_thread=False + ) + try: + yield dview, n_processes + finally: + cm.stop_server(dview=dview) diff --git a/mesmerize_core/algorithms/cnmf.py b/mesmerize_core/algorithms/cnmf.py index f4cd8a8..06d5ffb 100644 --- a/mesmerize_core/algorithms/cnmf.py +++ b/mesmerize_core/algorithms/cnmf.py @@ -17,9 +17,11 @@ if __name__ in ["__main__", "__mp_main__"]: # when running in subprocess from mesmerize_core import set_parent_raw_data_path, load_batch from mesmerize_core.utils import IS_WINDOWS + from mesmerize_core.algorithms._utils import ensure_server else: # when running with local backend from ..batch_utils import set_parent_raw_data_path, load_batch from ..utils import IS_WINDOWS + from ._utils import ensure_server def run_algo(batch_path, uuid, data_path: str = None, dview=None): @@ -46,108 +48,85 @@ async def run_algo_async(batch_path, uuid, data_path: str = None, dview=None): f"Starting CNMF item:\n{item}\nWith params:{params}" ) - if 'multiprocessing' in str(type(dview)) and hasattr(dview, '_processes'): - n_processes = dview._processes - elif 'ipyparallel' in str(type(dview)): - n_processes = len(dview) - else: - # adapted from current demo notebook - if "MESMERIZE_N_PROCESSES" in os.environ.keys(): - try: - n_processes = int(os.environ["MESMERIZE_N_PROCESSES"]) - except: - n_processes = psutil.cpu_count() - 1 - else: - n_processes = psutil.cpu_count() - 1 - # Start cluster for parallel processing - c, dview, n_processes = cm.cluster.setup_cluster( - backend="multiprocessing", n_processes=n_processes, single_thread=False - ) - - # merge cnmf and eval kwargs into one dict - cnmf_params = CNMFParams(params_dict=params["main"]) - # Run CNMF, denote boolean 'success' if CNMF completes w/out error - try: - fname_new = cm.save_memmap( - [input_movie_path], base_name=f"{uuid}_cnmf-memmap_", order="C", dview=dview - ) - - print("making memmap") - - Yr, dims, T = cm.load_memmap(fname_new) - images = np.reshape(Yr.T, [T] + list(dims), order="F") - - proj_paths = dict() - for proj_type in ["mean", "std", "max"]: - p_img = getattr(np, f"nan{proj_type}")(images, axis=0) - proj_paths[proj_type] = output_dir.joinpath( - f"{uuid}_{proj_type}_projection.npy" - ) - np.save(str(proj_paths[proj_type]), p_img) - - # in fname new load in memmap order C - cm.stop_server(dview=dview) - c, dview, n_processes = cm.cluster.setup_cluster( - backend="local", n_processes=None, single_thread=False - ) - - print("performing CNMF") - cnm = cnmf.CNMF(n_processes, params=cnmf_params, dview=dview) - - print("fitting images") - cnm.fit(images) - # - if "refit" in params.keys(): - if params["refit"] is True: - print("refitting") - cnm = cnm.refit(images, dview=dview) - - print("performing eval") - cnm.estimates.evaluate_components(images, cnm.params, dview=dview) - - output_path = output_dir.joinpath(f"{uuid}.hdf5") - - cnm.save(str(output_path)) - - Cn = cm.local_correlations(images, swap_dim=False) - Cn[np.isnan(Cn)] = 0 - - corr_img_path = output_dir.joinpath(f"{uuid}_cn.npy") - np.save(str(corr_img_path), Cn, allow_pickle=False) - - # output dict for dataframe row (pd.Series) - d = dict() - - cnmf_memmap_path = output_dir.joinpath(Path(fname_new).name) - if IS_WINDOWS: - Yr._mmap.close() # accessing private attr but windows is annoying otherwise - move_file(fname_new, cnmf_memmap_path) - - # save paths as relative path strings with forward slashes - cnmf_hdf5_path = str(PurePosixPath(output_path.relative_to(output_dir.parent))) - cnmf_memmap_path = str( - PurePosixPath(cnmf_memmap_path.relative_to(output_dir.parent)) - ) - corr_img_path = str(PurePosixPath(corr_img_path.relative_to(output_dir.parent))) - for proj_type in proj_paths.keys(): - d[f"{proj_type}-projection-path"] = str( - PurePosixPath(proj_paths[proj_type].relative_to(output_dir.parent)) + with ensure_server(dview) as (dview, n_processes): + + # merge cnmf and eval kwargs into one dict + cnmf_params = CNMFParams(params_dict=params["main"]) + # Run CNMF, denote boolean 'success' if CNMF completes w/out error + try: + fname_new = cm.save_memmap( + [input_movie_path], base_name=f"{uuid}_cnmf-memmap_", order="C", dview=dview ) - d.update( - { - "cnmf-hdf5-path": cnmf_hdf5_path, - "cnmf-memmap-path": cnmf_memmap_path, - "corr-img-path": corr_img_path, - "success": True, - "traceback": None, - } - ) + print("making memmap") + + Yr, dims, T = cm.load_memmap(fname_new) + + images = np.reshape(Yr.T, [T] + list(dims), order="F") + + proj_paths = dict() + for proj_type in ["mean", "std", "max"]: + p_img = getattr(np, f"nan{proj_type}")(images, axis=0) + proj_paths[proj_type] = output_dir.joinpath( + f"{uuid}_{proj_type}_projection.npy" + ) + np.save(str(proj_paths[proj_type]), p_img) + + print("performing CNMF") + cnm = cnmf.CNMF(n_processes, params=cnmf_params, dview=dview) + + print("fitting images") + cnm.fit(images) + # + if "refit" in params.keys(): + if params["refit"] is True: + print("refitting") + cnm = cnm.refit(images, dview=dview) + + print("performing eval") + cnm.estimates.evaluate_components(images, cnm.params, dview=dview) - except: - d = {"success": False, "traceback": traceback.format_exc()} + output_path = output_dir.joinpath(f"{uuid}.hdf5") + + cnm.save(str(output_path)) + + Cn = cm.local_correlations(images, swap_dim=False) + Cn[np.isnan(Cn)] = 0 + + corr_img_path = output_dir.joinpath(f"{uuid}_cn.npy") + np.save(str(corr_img_path), Cn, allow_pickle=False) + + # output dict for dataframe row (pd.Series) + d = dict() + + cnmf_memmap_path = output_dir.joinpath(Path(fname_new).name) + if IS_WINDOWS: + Yr._mmap.close() # accessing private attr but windows is annoying otherwise + move_file(fname_new, cnmf_memmap_path) + + # save paths as relative path strings with forward slashes + cnmf_hdf5_path = str(PurePosixPath(output_path.relative_to(output_dir.parent))) + cnmf_memmap_path = str( + PurePosixPath(cnmf_memmap_path.relative_to(output_dir.parent)) + ) + corr_img_path = str(PurePosixPath(corr_img_path.relative_to(output_dir.parent))) + for proj_type in proj_paths.keys(): + d[f"{proj_type}-projection-path"] = str( + PurePosixPath(proj_paths[proj_type].relative_to(output_dir.parent)) + ) + + d.update( + { + "cnmf-hdf5-path": cnmf_hdf5_path, + "cnmf-memmap-path": cnmf_memmap_path, + "corr-img-path": corr_img_path, + "success": True, + "traceback": None, + } + ) - cm.stop_server(dview=dview) + except: + d = {"success": False, "traceback": traceback.format_exc()} runtime = round(time.time() - algo_start, 2) df.caiman.update_item_with_results(uuid, d, runtime) diff --git a/mesmerize_core/algorithms/cnmfe.py b/mesmerize_core/algorithms/cnmfe.py index 04cec29..bd75e32 100644 --- a/mesmerize_core/algorithms/cnmfe.py +++ b/mesmerize_core/algorithms/cnmfe.py @@ -14,9 +14,11 @@ if __name__ in ["__main__", "__mp_main__"]: # when running in subprocess from mesmerize_core import set_parent_raw_data_path, load_batch from mesmerize_core.utils import IS_WINDOWS + from mesmerize_core.algorithms._utils import ensure_server else: # when running with local backend from ..batch_utils import set_parent_raw_data_path, load_batch from ..utils import IS_WINDOWS + from ._utils import ensure_server def run_algo(batch_path, uuid, data_path: str = None, dview=None): @@ -39,96 +41,77 @@ async def run_algo_async(batch_path, uuid, data_path: str = None, dview=None): params = item["params"] print("cnmfe params:", params) - if 'multiprocessing' in str(type(dview)) and hasattr(dview, '_processes'): - n_processes = dview._processes - elif 'ipyparallel' in str(type(dview)): - n_processes = len(dview) - else: - # adapted from current demo notebook - if "MESMERIZE_N_PROCESSES" in os.environ.keys(): - try: - n_processes = int(os.environ["MESMERIZE_N_PROCESSES"]) - except: - n_processes = psutil.cpu_count() - 1 - else: - n_processes = psutil.cpu_count() - 1 - # Start cluster for parallel processing - c, dview, n_processes = cm.cluster.setup_cluster( - backend="multiprocessing", n_processes=n_processes, single_thread=False - ) - - try: - fname_new = cm.save_memmap( - [input_movie_path], base_name=f"{uuid}_cnmf-memmap_", order="C", dview=dview - ) - - print("making memmap") - Yr, dims, T = cm.load_memmap(fname_new) - images = np.reshape(Yr.T, [T] + list(dims), order="F") - - # TODO: if projections already exist from mcorr we don't - # need to waste compute time re-computing them here - proj_paths = dict() - for proj_type in ["mean", "std", "max"]: - p_img = getattr(np, f"nan{proj_type}")(images, axis=0) - proj_paths[proj_type] = output_dir.joinpath( - f"{uuid}_{proj_type}_projection.npy" + with ensure_server(dview) as (dview, n_processes): + try: + fname_new = cm.save_memmap( + [input_movie_path], base_name=f"{uuid}_cnmf-memmap_", order="C", dview=dview ) - np.save(str(proj_paths[proj_type]), p_img) - d = dict() # for output + print("making memmap") + Yr, dims, T = cm.load_memmap(fname_new) + images = np.reshape(Yr.T, [T] + list(dims), order="F") + + # TODO: if projections already exist from mcorr we don't + # need to waste compute time re-computing them here + proj_paths = dict() + for proj_type in ["mean", "std", "max"]: + p_img = getattr(np, f"nan{proj_type}")(images, axis=0) + proj_paths[proj_type] = output_dir.joinpath( + f"{uuid}_{proj_type}_projection.npy" + ) + np.save(str(proj_paths[proj_type]), p_img) + + d = dict() # for output + + # force the CNMFE params + cnmfe_params_dict = { + "method_init": "corr_pnr", + "n_processes": n_processes, + "only_init": True, # for 1p + "center_psf": True, # for 1p + "normalize_init": False, # for 1p + } - # force the CNMFE params - cnmfe_params_dict = { - "method_init": "corr_pnr", - "n_processes": n_processes, - "only_init": True, # for 1p - "center_psf": True, # for 1p - "normalize_init": False, # for 1p - } + params_dict = {**cnmfe_params_dict, **params["main"]} - params_dict = {**cnmfe_params_dict, **params["main"]} + cnmfe_params_dict = CNMFParams(params_dict=params_dict) + cnm = cnmf.CNMF(n_processes=n_processes, dview=dview, params=cnmfe_params_dict) + print("Performing CNMFE") + cnm.fit(images) + print("evaluating components") + cnm.estimates.evaluate_components(images, cnm.params, dview=dview) - cnmfe_params_dict = CNMFParams(params_dict=params_dict) - cnm = cnmf.CNMF(n_processes=n_processes, dview=dview, params=cnmfe_params_dict) - print("Performing CNMFE") - cnm.fit(images) - print("evaluating components") - cnm.estimates.evaluate_components(images, cnm.params, dview=dview) + cnmf_hdf5_path = output_dir.joinpath(f"{uuid}.hdf5") + cnm.save(str(cnmf_hdf5_path)) - cnmf_hdf5_path = output_dir.joinpath(f"{uuid}.hdf5") - cnm.save(str(cnmf_hdf5_path)) + # save output paths to outputs dict + d["cnmf-hdf5-path"] = cnmf_hdf5_path.relative_to(output_dir.parent) - # save output paths to outputs dict - d["cnmf-hdf5-path"] = cnmf_hdf5_path.relative_to(output_dir.parent) + for proj_type in proj_paths.keys(): + d[f"{proj_type}-projection-path"] = proj_paths[proj_type].relative_to( + output_dir.parent + ) - for proj_type in proj_paths.keys(): - d[f"{proj_type}-projection-path"] = proj_paths[proj_type].relative_to( - output_dir.parent - ) + cnmf_memmap_path = output_dir.joinpath(Path(fname_new).name) + if IS_WINDOWS: + Yr._mmap.close() # accessing private attr but windows is annoying otherwise + move_file(fname_new, cnmf_memmap_path) - cnmf_memmap_path = output_dir.joinpath(Path(fname_new).name) - if IS_WINDOWS: - Yr._mmap.close() # accessing private attr but windows is annoying otherwise - move_file(fname_new, cnmf_memmap_path) - - # save path as relative path strings with forward slashes - cnmfe_memmap_path = str( - PurePosixPath(cnmf_memmap_path.relative_to(output_dir.parent)) - ) - - d.update( - { - "cnmf-memmap-path": cnmfe_memmap_path, - "success": True, - "traceback": None, - } - ) + # save path as relative path strings with forward slashes + cnmfe_memmap_path = str( + PurePosixPath(cnmf_memmap_path.relative_to(output_dir.parent)) + ) - except: - d = {"success": False, "traceback": traceback.format_exc()} + d.update( + { + "cnmf-memmap-path": cnmfe_memmap_path, + "success": True, + "traceback": None, + } + ) - cm.stop_server(dview=dview) + except: + d = {"success": False, "traceback": traceback.format_exc()} runtime = round(time.time() - algo_start, 2) df.caiman.update_item_with_results(uuid, d, runtime) diff --git a/mesmerize_core/algorithms/mcorr.py b/mesmerize_core/algorithms/mcorr.py index 81bedeb..d0a4236 100644 --- a/mesmerize_core/algorithms/mcorr.py +++ b/mesmerize_core/algorithms/mcorr.py @@ -15,8 +15,10 @@ # prevent circular import if __name__ in ["__main__", "__mp_main__"]: # when running in subprocess from mesmerize_core import set_parent_raw_data_path, load_batch + from mesmerize_core.algorithms._utils import ensure_server else: # when running with local backend from ..batch_utils import set_parent_raw_data_path, load_batch + from ._utils import ensure_server def run_algo(batch_path, uuid, data_path: str = None, dview=None): @@ -43,121 +45,101 @@ async def run_algo_async(batch_path, uuid, data_path: str = None, dview=None): params = item["params"] - # adapted from current demo notebook - if 'multiprocessing' in str(type(dview)) and hasattr(dview, '_processes'): - n_processes = dview._processes - elif 'ipyparallel' in str(type(dview)): - n_processes = len(dview) - else: - # adapted from current demo notebook - if "MESMERIZE_N_PROCESSES" in os.environ.keys(): - try: - n_processes = int(os.environ["MESMERIZE_N_PROCESSES"]) - except: - n_processes = psutil.cpu_count() - 1 - else: - n_processes = psutil.cpu_count() - 1 - # Start cluster for parallel processing - c, dview, n_processes = cm.cluster.setup_cluster( - backend="multiprocessing", n_processes=n_processes, single_thread=False - ) - - print("starting mc") - - rel_params = dict(params["main"]) - opts = CNMFParams(params_dict=rel_params) - # Run MC, denote boolean 'success' if MC completes w/out error - try: - # Run MC - fnames = [input_movie_path] - mc = MotionCorrect(fnames, dview=dview, **opts.get_group("motion")) - mc.motion_correct(save_movie=True) - - # find path to mmap file - memmap_output_path_temp = df.paths.resolve(mc.mmap_file[0]) - - # filename to move the output back to data dir - mcorr_memmap_path = output_dir.joinpath( - f"{uuid}-{memmap_output_path_temp.name}" - ) - - # move the output file - move_file(memmap_output_path_temp, mcorr_memmap_path) - - print("mc finished successfully!") - - print("computing projections") - Yr, dims, T = cm.load_memmap(str(mcorr_memmap_path)) - images = np.reshape(Yr.T, [T] + list(dims), order="F") - - proj_paths = dict() - for proj_type in ["mean", "std", "max"]: - p_img = getattr(np, f"nan{proj_type}")(images, axis=0) - proj_paths[proj_type] = output_dir.joinpath( - f"{uuid}_{proj_type}_projection.npy" + with ensure_server(dview) as (dview, n_processes): + print("starting mc") + + rel_params = dict(params["main"]) + opts = CNMFParams(params_dict=rel_params) + # Run MC, denote boolean 'success' if MC completes w/out error + try: + # Run MC + fnames = [input_movie_path] + mc = MotionCorrect(fnames, dview=dview, **opts.get_group("motion")) + mc.motion_correct(save_movie=True) + + # find path to mmap file + memmap_output_path_temp = df.paths.resolve(mc.mmap_file[0]) + + # filename to move the output back to data dir + mcorr_memmap_path = output_dir.joinpath( + f"{uuid}-{memmap_output_path_temp.name}" + ) + + # move the output file + move_file(memmap_output_path_temp, mcorr_memmap_path) + + print("mc finished successfully!") + + print("computing projections") + Yr, dims, T = cm.load_memmap(str(mcorr_memmap_path)) + images = np.reshape(Yr.T, [T] + list(dims), order="F") + + proj_paths = dict() + for proj_type in ["mean", "std", "max"]: + p_img = getattr(np, f"nan{proj_type}")(images, axis=0) + proj_paths[proj_type] = output_dir.joinpath( + f"{uuid}_{proj_type}_projection.npy" + ) + np.save(str(proj_paths[proj_type]), p_img) + + print("Computing correlation image") + Cns = local_correlations_movie_offline( + str(mcorr_memmap_path), + remove_baseline=True, + window=1000, + stride=1000, + winSize_baseline=100, + quantil_min_baseline=10, + dview=dview, + ) + Cn = Cns.max(axis=0) + Cn[np.isnan(Cn)] = 0 + cn_path = output_dir.joinpath(f"{uuid}_cn.npy") + np.save(str(cn_path), Cn, allow_pickle=False) + + print("finished computing correlation image") + + # Compute shifts + if opts.motion["pw_rigid"] == True: + x_shifts = mc.x_shifts_els + y_shifts = mc.y_shifts_els + shifts = [x_shifts, y_shifts] + if hasattr(mc, "z_shifts_els"): + shifts.append(mc.z_shifts_els) + shift_path = output_dir.joinpath(f"{uuid}_shifts.npy") + np.save(str(shift_path), shifts) + else: + shifts = mc.shifts_rig + shift_path = output_dir.joinpath(f"{uuid}_shifts.npy") + np.save(str(shift_path), shifts) + + # output dict for pandas series for dataframe row + d = dict() + + # save paths as relative path strings with forward slashes + cn_path = str(PurePosixPath(cn_path.relative_to(output_dir.parent))) + mcorr_memmap_path = str( + PurePosixPath(mcorr_memmap_path.relative_to(output_dir.parent)) ) - np.save(str(proj_paths[proj_type]), p_img) - - print("Computing correlation image") - Cns = local_correlations_movie_offline( - str(mcorr_memmap_path), - remove_baseline=True, - window=1000, - stride=1000, - winSize_baseline=100, - quantil_min_baseline=10, - dview=dview, - ) - Cn = Cns.max(axis=0) - Cn[np.isnan(Cn)] = 0 - cn_path = output_dir.joinpath(f"{uuid}_cn.npy") - np.save(str(cn_path), Cn, allow_pickle=False) - - print("finished computing correlation image") - - # Compute shifts - if opts.motion["pw_rigid"] == True: - x_shifts = mc.x_shifts_els - y_shifts = mc.y_shifts_els - shifts = [x_shifts, y_shifts] - if hasattr(mc, "z_shifts_els"): - shifts.append(mc.z_shifts_els) - shift_path = output_dir.joinpath(f"{uuid}_shifts.npy") - np.save(str(shift_path), shifts) - else: - shifts = mc.shifts_rig - shift_path = output_dir.joinpath(f"{uuid}_shifts.npy") - np.save(str(shift_path), shifts) - - # output dict for pandas series for dataframe row - d = dict() - - # save paths as relative path strings with forward slashes - cn_path = str(PurePosixPath(cn_path.relative_to(output_dir.parent))) - mcorr_memmap_path = str( - PurePosixPath(mcorr_memmap_path.relative_to(output_dir.parent)) - ) - shift_path = str(PurePosixPath(shift_path.relative_to(output_dir.parent))) - for proj_type in proj_paths.keys(): - d[f"{proj_type}-projection-path"] = str( - PurePosixPath(proj_paths[proj_type].relative_to(output_dir.parent)) + shift_path = str(PurePosixPath(shift_path.relative_to(output_dir.parent))) + for proj_type in proj_paths.keys(): + d[f"{proj_type}-projection-path"] = str( + PurePosixPath(proj_paths[proj_type].relative_to(output_dir.parent)) + ) + + d.update( + { + "mcorr-output-path": mcorr_memmap_path, + "corr-img-path": cn_path, + "shifts": shift_path, + "success": True, + "traceback": None, + } ) - d.update( - { - "mcorr-output-path": mcorr_memmap_path, - "corr-img-path": cn_path, - "shifts": shift_path, - "success": True, - "traceback": None, - } - ) - - except: - d = {"success": False, "traceback": traceback.format_exc()} - print("mc failed, stored traceback in output") - - cm.stop_server(dview=dview) + except: + d = {"success": False, "traceback": traceback.format_exc()} + print("mc failed, stored traceback in output") runtime = round(time.time() - algo_start, 2) df.caiman.update_item_with_results(uuid, d, runtime) From d907ac69c9a9b2bc8a20fd4b9bbd8933e03b4e65 Mon Sep 17 00:00:00 2001 From: Ethan Blackwood Date: Sun, 11 Aug 2024 20:46:14 -0400 Subject: [PATCH 05/31] switch from using asyncio to concurrent.futures and add test --- mesmerize_core/algorithms/cnmf.py | 6 --- mesmerize_core/algorithms/cnmfe.py | 6 --- mesmerize_core/algorithms/mcorr.py | 5 --- mesmerize_core/caiman_extensions/common.py | 50 ++++++++++++++++------ tests/test_core.py | 50 +++++++++++++++++++++- 5 files changed, 87 insertions(+), 30 deletions(-) diff --git a/mesmerize_core/algorithms/cnmf.py b/mesmerize_core/algorithms/cnmf.py index 06d5ffb..9328cce 100644 --- a/mesmerize_core/algorithms/cnmf.py +++ b/mesmerize_core/algorithms/cnmf.py @@ -1,16 +1,13 @@ """Performs CNMF in a separate process""" -import asyncio import click import caiman as cm from caiman.source_extraction.cnmf import cnmf as cnmf from caiman.source_extraction.cnmf.params import CNMFParams -import psutil import numpy as np import traceback from pathlib import Path, PurePosixPath from shutil import move as move_file -import os import time # prevent circular import @@ -25,9 +22,6 @@ def run_algo(batch_path, uuid, data_path: str = None, dview=None): - asyncio.run(run_algo_async(batch_path, uuid, data_path=data_path, dview=dview)) - -async def run_algo_async(batch_path, uuid, data_path: str = None, dview=None): algo_start = time.time() set_parent_raw_data_path(data_path) diff --git a/mesmerize_core/algorithms/cnmfe.py b/mesmerize_core/algorithms/cnmfe.py index bd75e32..3940727 100644 --- a/mesmerize_core/algorithms/cnmfe.py +++ b/mesmerize_core/algorithms/cnmfe.py @@ -1,14 +1,11 @@ -import asyncio import click import numpy as np import caiman as cm from caiman.source_extraction.cnmf import cnmf as cnmf from caiman.source_extraction.cnmf.params import CNMFParams -import psutil import traceback from pathlib import Path, PurePosixPath from shutil import move as move_file -import os import time if __name__ in ["__main__", "__mp_main__"]: # when running in subprocess @@ -22,9 +19,6 @@ def run_algo(batch_path, uuid, data_path: str = None, dview=None): - asyncio.run(run_algo_async(batch_path, uuid, data_path=data_path, dview=dview)) - -async def run_algo_async(batch_path, uuid, data_path: str = None, dview=None): algo_start = time.time() set_parent_raw_data_path(data_path) diff --git a/mesmerize_core/algorithms/mcorr.py b/mesmerize_core/algorithms/mcorr.py index d0a4236..068faa3 100644 --- a/mesmerize_core/algorithms/mcorr.py +++ b/mesmerize_core/algorithms/mcorr.py @@ -1,11 +1,9 @@ import traceback -import asyncio import click import caiman as cm from caiman.source_extraction.cnmf.params import CNMFParams from caiman.motion_correction import MotionCorrect from caiman.summary_images import local_correlations_movie_offline -import psutil import os from pathlib import Path, PurePosixPath import numpy as np @@ -22,9 +20,6 @@ def run_algo(batch_path, uuid, data_path: str = None, dview=None): - asyncio.run(run_algo_async(batch_path, uuid, data_path=data_path, dview=dview)) - -async def run_algo_async(batch_path, uuid, data_path: str = None, dview=None): algo_start = time.time() set_parent_raw_data_path(data_path) diff --git a/mesmerize_core/caiman_extensions/common.py b/mesmerize_core/caiman_extensions/common.py index b916bf7..5e5db5c 100644 --- a/mesmerize_core/caiman_extensions/common.py +++ b/mesmerize_core/caiman_extensions/common.py @@ -9,7 +9,7 @@ from datetime import datetime import time from copy import deepcopy -import asyncio +from concurrent.futures import ThreadPoolExecutor, Future import numpy as np import pandas as pd @@ -515,13 +515,26 @@ def get_parent(self, index: Union[int, str, UUID]) -> Union[UUID, None]: return r["uuid"] -class DummyProcess: - """Dummy process for local backend""" +class Waitable(Protocol): + """An object that we can call "wait" on""" + def wait(self) -> None: ... + - def wait(self): +class DummyProcess(Waitable): + """Dummy process for local backend""" + def wait(self) -> None: pass +class WaitableFuture(Waitable): + """Adaptor for future returned from Executor.submit""" + def __init__(self, future: Future[None]): + self.future = future + + def wait(self) -> None: + return self.future.result() + + @pd.api.extensions.register_series_accessor("caiman") class CaimanSeriesExtensions: """ @@ -530,7 +543,7 @@ class CaimanSeriesExtensions: def __init__(self, s: pd.Series): self._series = s - self.process: Popen = None + self.process: Optional[Waitable] = None def _run_local( self, @@ -540,9 +553,15 @@ def _run_local( data_path: Union[Path, None], dview=None ) -> DummyProcess: - coroutine = self._run_local_async(algo, batch_path, uuid, data_path, dview) - asyncio.run(coroutine) - return DummyProcess() + algo_module = getattr(algorithms, algo) + algo_module.run_algo( + batch_path=str(batch_path), + uuid=str(uuid), + data_path=str(data_path), + dview=dview + ) + self.process = DummyProcess() + return self.process def _run_local_async( self, @@ -551,11 +570,18 @@ def _run_local_async( uuid: UUID, data_path: Union[Path, None], dview=None - ) -> Coroutine: + ) -> WaitableFuture: algo_module = getattr(algorithms, algo) - return algo_module.run_algo_async( - batch_path=str(batch_path), uuid=str(uuid), data_path=str(data_path), dview=dview - ) + with ThreadPoolExecutor(max_workers=1) as executor: + future = executor.submit( + algo_module.run_algo, + batch_path=str(batch_path), + uuid=str(uuid), + data_path=str(data_path), + dview=dview + ) + self.process = WaitableFuture(future) + return self.process def _run_subprocess(self, runfile_path: str, wait: bool, **kwargs): diff --git a/tests/test_core.py b/tests/test_core.py index b8a7419..8b8a920 100644 --- a/tests/test_core.py +++ b/tests/test_core.py @@ -1,5 +1,4 @@ import os - import numpy as np from caiman.utils.utils import load_dict_from_hdf5 from caiman.source_extraction.cnmf.cnmf import CNMF @@ -15,9 +14,12 @@ from mesmerize_core.batch_utils import ( DATAFRAME_COLUMNS, COMPUTE_BACKEND_SUBPROCESS, + COMPUTE_BACKEND_LOCAL, + COMPUTE_BACKEND_ASYNC, get_full_raw_data_path, ) from mesmerize_core.utils import IS_WINDOWS +from mesmerize_core.algorithms._utils import ensure_server from uuid import uuid4 import pytest import requests @@ -33,6 +35,7 @@ import tifffile from copy import deepcopy + # don't call "resolve" on these - want to make sure we can handle non-canonical paths correctly tmp_dir = Path(os.path.dirname(os.path.abspath(__file__)), "test data", "tmp") vid_dir = Path(os.path.dirname(os.path.abspath(__file__)), "test data", "videos") @@ -1336,3 +1339,48 @@ def test_cache(): assert hex(id(cnmf.cnmf_cache.get_cache().iloc[-1]["return_val"])) == hex( id(output) ) + + +def test_backends(): + """test subprocess, local, and async_local backend""" + set_parent_raw_data_path(vid_dir) + algo = "mcorr" + df, batch_path = _create_tmp_batch() + input_movie_path = get_datafile(algo) + + # make small version of movie for quick testing + movie = tifffile.imread(input_movie_path) + small_movie_path = input_movie_path.parent.joinpath("small_movie.tif") + tifffile.imwrite(small_movie_path, movie[:1001]) + print(input_movie_path) + + # put backends that can run in the background first to save time + backends = [COMPUTE_BACKEND_SUBPROCESS, COMPUTE_BACKEND_ASYNC, COMPUTE_BACKEND_LOCAL] + for backend in backends: + df.caiman.add_item( + algo="mcorr", + item_name=f"test-{backend}", + input_movie_path=small_movie_path, + params=test_params["mcorr"], + ) + + # run using each backend + procs = [] + with ensure_server(None) as (dview, _): + for backend, (_, item) in zip(backends, df.iterrows()): + procs.append(item.caiman.run(backend=backend, dview=dview, wait=False)) + + # wait for all to finish + for proc in procs: + proc.wait() + + # compare results + df = load_batch(batch_path) + for i, item in df.iterrows(): + output = item.mcorr.get_output() + + if i == 0: + # save to compare to other results + first_output = output + else: + numpy.testing.assert_array_equal(output, first_output) From f71bb01b0b9631b08a4b5e249b868faeac285af9 Mon Sep 17 00:00:00 2001 From: Ethan Blackwood Date: Sat, 14 Sep 2024 16:44:40 -0400 Subject: [PATCH 06/31] Make sure all Waitables throw exception on failure --- mesmerize_core/caiman_extensions/common.py | 96 ++++++++++++---------- 1 file changed, 51 insertions(+), 45 deletions(-) diff --git a/mesmerize_core/caiman_extensions/common.py b/mesmerize_core/caiman_extensions/common.py index 5e5db5c..72483bc 100644 --- a/mesmerize_core/caiman_extensions/common.py +++ b/mesmerize_core/caiman_extensions/common.py @@ -2,7 +2,7 @@ import shutil from pathlib import Path import psutil -from subprocess import Popen +from subprocess import Popen, CalledProcessError from typing import * from uuid import UUID, uuid4 from shutil import rmtree @@ -533,6 +533,17 @@ def __init__(self, future: Future[None]): def wait(self) -> None: return self.future.result() + + +class CheckedSubprocess(Waitable): + """Adaptor for Popen that just raises an exception if the return code is nonzero""" + def __init__(self, popen: Popen): + self.popen = popen + + def wait(self) -> None: + rc = self.popen.wait() + if rc != 0: + raise CalledProcessError(rc, self.popen.args) @pd.api.extensions.register_series_accessor("caiman") @@ -560,8 +571,7 @@ def _run_local( data_path=str(data_path), dview=dview ) - self.process = DummyProcess() - return self.process + return DummyProcess() def _run_local_async( self, @@ -580,23 +590,18 @@ def _run_local_async( data_path=str(data_path), dview=dview ) - self.process = WaitableFuture(future) - return self.process + return WaitableFuture(future) - def _run_subprocess(self, runfile_path: str, wait: bool, **kwargs): + def _run_subprocess(self, runfile_path: str, **kwargs) -> CheckedSubprocess: # Get the dir that contains the input movie parent_path = self._series.paths.resolve(self._series.input_movie_path).parent - self.process = Popen([runfile_path], cwd=parent_path) + popen = Popen([runfile_path], cwd=parent_path) + return CheckedSubprocess(popen) # so that it throws an exception on failure - if wait: - self.process.wait() - - return self.process def _run_slurm( - self, runfile_path: str, wait: bool, sbatch_opts: str = "", **kwargs - ): + self, runfile_path: str, sbatch_opts: str = "", **kwargs) -> CheckedSubprocess: """ Run on a cluster using SLURM. Configurable options (to pass to run): - sbatch_opts: A single string containing additional options for sbatch. @@ -628,15 +633,12 @@ def _run_slurm( f"--output={output_path}", "--wait", ] + shlex.split(sbatch_opts) + + return CheckedSubprocess(Popen(["sbatch", *submission_opts, runfile_path])) - self.process = Popen(["sbatch", *submission_opts, runfile_path]) - if wait: - self.process.wait() - - return self.process @cnmf_cache.invalidate() - def run(self, backend: Optional[str] = None, wait: bool = True, **kwargs): + def run(self, backend: Optional[str] = None, wait: bool = True, **kwargs) -> Waitable: """ Run a CaImAn algorithm in an external process using the chosen backend @@ -677,41 +679,45 @@ def run(self, backend: Optional[str] = None, wait: bool = True, **kwargs): if backend in [COMPUTE_BACKEND_LOCAL, COMPUTE_BACKEND_ASYNC]: print(f"Running {self._series.uuid} with {backend} backend") - return getattr(self, f"_run_{backend}")( + self.process = getattr(self, f"_run_{backend}")( algo=self._series["algo"], batch_path=batch_path, uuid=self._series["uuid"], data_path=get_parent_raw_data_path(), dview=kwargs.get("dview") ) - - # Create the runfile in the batch dir using this Series' UUID as the filename - if IS_WINDOWS: - runfile_ext = ".bat" else: - runfile_ext = ".runfile" - runfile_path = str( - batch_path.parent.joinpath(self._series["uuid"] + runfile_ext) - ) + # Create the runfile in the batch dir using this Series' UUID as the filename + if IS_WINDOWS: + runfile_ext = ".bat" + else: + runfile_ext = ".runfile" + runfile_path = str( + batch_path.parent.joinpath(self._series["uuid"] + runfile_ext) + ) - args_str = ( - f"--batch-path {lex.quote(str(batch_path))} --uuid {self._series.uuid}" - ) - if get_parent_raw_data_path() is not None: - args_str += f" --data-path {lex.quote(str(get_parent_raw_data_path()))}" - - # make the runfile - runfile_path = make_runfile( - module_path=os.path.abspath( - ALGO_MODULES[self._series["algo"]].__file__ - ), # caiman algorithm - filename=runfile_path, # path to create runfile - args_str=args_str, - ) + args_str = ( + f"--batch-path {lex.quote(str(batch_path))} --uuid {self._series.uuid}" + ) + if get_parent_raw_data_path() is not None: + args_str += f" --data-path {lex.quote(str(get_parent_raw_data_path()))}" + + # make the runfile + runfile_path = make_runfile( + module_path=os.path.abspath( + ALGO_MODULES[self._series["algo"]].__file__ + ), # caiman algorithm + filename=runfile_path, # path to create runfile + args_str=args_str, + ) - self.process = getattr(self, f"_run_{backend}")( - runfile_path, wait=wait, **kwargs - ) + self.process = getattr(self, f"_run_{backend}")( + runfile_path, **kwargs + ) + + assert self.process is not None, 'Process should have been created' + if wait: + self.process.wait() return self.process def get_input_movie_path(self) -> Path: From 1e8e36f076e5deb1649069a88a245558e6a629a2 Mon Sep 17 00:00:00 2001 From: Ethan Blackwood Date: Fri, 5 Sep 2025 13:46:07 -0400 Subject: [PATCH 07/31] Check type of setup_cluster return value --- mesmerize_core/algorithms/_utils.py | 16 ++++++++++------ 1 file changed, 10 insertions(+), 6 deletions(-) diff --git a/mesmerize_core/algorithms/_utils.py b/mesmerize_core/algorithms/_utils.py index e7dbdca..2200884 100644 --- a/mesmerize_core/algorithms/_utils.py +++ b/mesmerize_core/algorithms/_utils.py @@ -1,17 +1,20 @@ -import caiman as cm from contextlib import contextmanager -from ipyparallel import DirectView -from multiprocessing.pool import Pool import os import psutil -from typing import Union, Optional, Generator +from typing import Optional, Union, Generator + +import caiman as cm +from caiman.cluster import setup_cluster +from ipyparallel import DirectView +from multiprocessing.pool import Pool + Cluster = Union[Pool, DirectView] def get_n_processes(dview: Optional[Cluster]) -> int: """Infer number of processes in a multiprocessing or ipyparallel cluster""" if isinstance(dview, Pool) and hasattr(dview, '_processes'): - return dview._processes + return dview._processes # type: ignore elif isinstance(dview, DirectView): return len(dview) else: @@ -39,9 +42,10 @@ def ensure_server(dview: Optional[Cluster]) -> Generator[tuple[Cluster, int], No n_processes = psutil.cpu_count() - 1 # Start cluster for parallel processing - _, dview, n_processes = cm.cluster.setup_cluster( + _, dview, n_processes = setup_cluster( backend="multiprocessing", n_processes=n_processes, single_thread=False ) + assert isinstance(dview, Pool) and isinstance(n_processes, int), 'setup_cluster with multiprocessing did not return a Pool' try: yield dview, n_processes finally: From dc44d179e2b88991dd464b49d4e355ee9ad058a9 Mon Sep 17 00:00:00 2001 From: Ethan Blackwood Date: Fri, 5 Sep 2025 13:18:00 -0400 Subject: [PATCH 08/31] Make dview compatible with other cluster types (conforming to protocol) --- mesmerize_core/algorithms/_utils.py | 26 +++++++++++++++++++++----- 1 file changed, 21 insertions(+), 5 deletions(-) diff --git a/mesmerize_core/algorithms/_utils.py b/mesmerize_core/algorithms/_utils.py index 2200884..5f50246 100644 --- a/mesmerize_core/algorithms/_utils.py +++ b/mesmerize_core/algorithms/_utils.py @@ -1,7 +1,8 @@ from contextlib import contextmanager import os import psutil -from typing import Optional, Union, Generator +from typing import (Optional, Union, Generator, Protocol, + Callable, TypeVar, Sequence, Iterable, runtime_checkable) import caiman as cm from caiman.cluster import setup_cluster @@ -9,13 +10,24 @@ from multiprocessing.pool import Pool -Cluster = Union[Pool, DirectView] +@runtime_checkable +class CustomCluster(Protocol): + """Protocol for a cluster that is not a multiprocessing pool""" + RetVal = TypeVar('RetVal') + def map_sync(self, fn: Callable[..., RetVal], args: Iterable) -> Sequence[RetVal]: + ... + + def __len__(self) -> int: + """return number of workers""" + ... + +Cluster = Union[Pool, DirectView, CustomCluster] def get_n_processes(dview: Optional[Cluster]) -> int: """Infer number of processes in a multiprocessing or ipyparallel cluster""" if isinstance(dview, Pool) and hasattr(dview, '_processes'): return dview._processes # type: ignore - elif isinstance(dview, DirectView): + elif isinstance(dview, CustomCluster): return len(dview) else: return 1 @@ -33,13 +45,17 @@ def ensure_server(dview: Optional[Cluster]) -> Generator[tuple[Cluster, int], No yield dview, get_n_processes(dview) else: # no cluster passed in, so open one + procs_available = psutil.cpu_count() + if procs_available is None: + raise RuntimeError('Cannot determine number of processes') + if "MESMERIZE_N_PROCESSES" in os.environ.keys(): try: n_processes = int(os.environ["MESMERIZE_N_PROCESSES"]) except: - n_processes = psutil.cpu_count() - 1 + n_processes = procs_available - 1 else: - n_processes = psutil.cpu_count() - 1 + n_processes = procs_available - 1 # Start cluster for parallel processing _, dview, n_processes = setup_cluster( From bd4c63d51975ce5056a04bdce8b8141a6346343b Mon Sep 17 00:00:00 2001 From: Ethan Blackwood Date: Fri, 5 Sep 2025 12:34:36 -0400 Subject: [PATCH 09/31] Fix CustomCluster protocol to work w/ futures again --- mesmerize_core/algorithms/_utils.py | 25 ++++++++++++++++--------- 1 file changed, 16 insertions(+), 9 deletions(-) diff --git a/mesmerize_core/algorithms/_utils.py b/mesmerize_core/algorithms/_utils.py index 5f50246..a988a22 100644 --- a/mesmerize_core/algorithms/_utils.py +++ b/mesmerize_core/algorithms/_utils.py @@ -6,28 +6,35 @@ import caiman as cm from caiman.cluster import setup_cluster -from ipyparallel import DirectView from multiprocessing.pool import Pool +RetVal = TypeVar("RetVal") @runtime_checkable class CustomCluster(Protocol): - """Protocol for a cluster that is not a multiprocessing pool""" - RetVal = TypeVar('RetVal') - def map_sync(self, fn: Callable[..., RetVal], args: Iterable) -> Sequence[RetVal]: - ... - + """ + Protocol for a cluster that is not a multiprocessing pool + (including ipyparallel.DirectView) + """ + + def map_sync( + self, fn: Callable[..., RetVal], args: Iterable + ) -> Sequence[RetVal]: ... + def __len__(self) -> int: """return number of workers""" ... -Cluster = Union[Pool, DirectView, CustomCluster] + +Cluster = Union[Pool, CustomCluster] + def get_n_processes(dview: Optional[Cluster]) -> int: """Infer number of processes in a multiprocessing or ipyparallel cluster""" - if isinstance(dview, Pool) and hasattr(dview, '_processes'): + if isinstance(dview, Pool): + assert hasattr(dview, '_processes'), "Pool not keeping track of # of processes?" return dview._processes # type: ignore - elif isinstance(dview, CustomCluster): + elif dview is not None: return len(dview) else: return 1 From 5cafeeabcf07cd792525c575d0a6e06ac8199d96 Mon Sep 17 00:00:00 2001 From: Ethan Blackwood Date: Fri, 5 Sep 2025 13:05:22 -0400 Subject: [PATCH 10/31] Change definition of cluster again --- mesmerize_core/algorithms/_utils.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/mesmerize_core/algorithms/_utils.py b/mesmerize_core/algorithms/_utils.py index a988a22..0bdf1a8 100644 --- a/mesmerize_core/algorithms/_utils.py +++ b/mesmerize_core/algorithms/_utils.py @@ -6,6 +6,7 @@ import caiman as cm from caiman.cluster import setup_cluster +from ipyparallel import DirectView from multiprocessing.pool import Pool @@ -26,7 +27,7 @@ def __len__(self) -> int: ... -Cluster = Union[Pool, CustomCluster] +Cluster = Union[Pool, CustomCluster, DirectView] def get_n_processes(dview: Optional[Cluster]) -> int: From d7819588ca44a6100de7fef7608c5457ba839d06 Mon Sep 17 00:00:00 2001 From: Ethan Blackwood Date: Fri, 7 Jun 2024 09:33:01 -0400 Subject: [PATCH 11/31] Allow skipping mmap saving if input is correct order --- mesmerize_core/algorithms/cnmf.py | 30 +++++++++++++++++++++--------- 1 file changed, 21 insertions(+), 9 deletions(-) diff --git a/mesmerize_core/algorithms/cnmf.py b/mesmerize_core/algorithms/cnmf.py index 9328cce..b04da87 100644 --- a/mesmerize_core/algorithms/cnmf.py +++ b/mesmerize_core/algorithms/cnmf.py @@ -4,6 +4,7 @@ import caiman as cm from caiman.source_extraction.cnmf import cnmf as cnmf from caiman.source_extraction.cnmf.params import CNMFParams +from caiman.paths import decode_mmap_filename_dict import numpy as np import traceback from pathlib import Path, PurePosixPath @@ -48,13 +49,20 @@ def run_algo(batch_path, uuid, data_path: str = None, dview=None): cnmf_params = CNMFParams(params_dict=params["main"]) # Run CNMF, denote boolean 'success' if CNMF completes w/out error try: - fname_new = cm.save_memmap( - [input_movie_path], base_name=f"{uuid}_cnmf-memmap_", order="C", dview=dview - ) - - print("making memmap") - - Yr, dims, T = cm.load_memmap(fname_new) + # only re-save memmap if necessary + save_new_mmap = True + if Path(input_movie_path).suffix == ".mmap": + mmap_info = decode_mmap_filename_dict(input_movie_path) + save_new_mmap = "order" not in mmap_info or mmap_info["order"] != "C" + + if save_new_mmap: + print("making memmap") + fname_new = cm.save_memmap( + [input_movie_path], base_name=f"{uuid}_cnmf-memmap_", order="C", dview=dview + ) + Yr, dims, T = cm.load_memmap(fname_new) + else: + Yr, dims, T = cm.load_memmap(input_movie_path) images = np.reshape(Yr.T, [T] + list(dims), order="F") @@ -93,10 +101,14 @@ def run_algo(batch_path, uuid, data_path: str = None, dview=None): # output dict for dataframe row (pd.Series) d = dict() - cnmf_memmap_path = output_dir.joinpath(Path(fname_new).name) if IS_WINDOWS: Yr._mmap.close() # accessing private attr but windows is annoying otherwise - move_file(fname_new, cnmf_memmap_path) + + if save_new_mmap: + cnmf_memmap_path = output_dir.joinpath(Path(fname_new).name) + move_file(fname_new, cnmf_memmap_path) + else: + cnmf_memmap_path = Path(input_movie_path) # save paths as relative path strings with forward slashes cnmf_hdf5_path = str(PurePosixPath(output_path.relative_to(output_dir.parent))) From 73f4eba5f56d4b52ca95b4ad84c37d3e81d2ca4c Mon Sep 17 00:00:00 2001 From: Ethan Blackwood Date: Tue, 29 Oct 2024 22:36:50 -0400 Subject: [PATCH 12/31] Compute projections in parallel with appropriate chunk size --- mesmerize_core/algorithms/_utils.py | 48 +++++++++++++++++++++++++++++ mesmerize_core/algorithms/cnmf.py | 14 +++------ mesmerize_core/algorithms/cnmfe.py | 14 +++------ mesmerize_core/algorithms/mcorr.py | 14 +++------ 4 files changed, 63 insertions(+), 27 deletions(-) diff --git a/mesmerize_core/algorithms/_utils.py b/mesmerize_core/algorithms/_utils.py index 0bdf1a8..6a7e24c 100644 --- a/mesmerize_core/algorithms/_utils.py +++ b/mesmerize_core/algorithms/_utils.py @@ -1,5 +1,6 @@ from contextlib import contextmanager import os +from pathlib import Path import psutil from typing import (Optional, Union, Generator, Protocol, Callable, TypeVar, Sequence, Iterable, runtime_checkable) @@ -8,6 +9,7 @@ from caiman.cluster import setup_cluster from ipyparallel import DirectView from multiprocessing.pool import Pool +import numpy as np RetVal = TypeVar("RetVal") @@ -74,3 +76,49 @@ def ensure_server(dview: Optional[Cluster]) -> Generator[tuple[Cluster, int], No yield dview, n_processes finally: cm.stop_server(dview=dview) + + +def estimate_n_pixels_per_process(n_processes: int, T: int, dims: tuple[int, ...]) -> int: + """ + Estimate a safe number of pixels to allocate to each parallel process at a time + Taken from CNMF.fit (TODO factor this out in caiman and just import it) + """ + avail_memory_per_process = psutil.virtual_memory()[ + 1] / 2.**30 / n_processes + mem_per_pix = 3.6977678498329843e-09 + npx_per_proc = int(avail_memory_per_process / 8. / mem_per_pix / T) + npx_per_proc = int(np.minimum(npx_per_proc, np.prod(dims) // n_processes)) + return npx_per_proc + + +def make_chunk_projection(Yr_chunk: np.ndarray, proj_type: str): + return getattr(np, "nan" + proj_type)(Yr_chunk, axis=1) + +def make_chunk_projection_helper(args: tuple[np.ndarray, str]): + return make_chunk_projection(*args) + + +def save_projections_parallel(uuid, Yr: np.ndarray, dims: tuple[int, ...], T: int, + output_dir: Path, dview: Optional[Cluster]) -> dict[str, Path]: + proj_paths = dict() + for proj_type in ["mean", "std", "max"]: + if dview is None: + p_img_flat = make_chunk_projection(Yr, proj_type) + else: + # use n_pixels_per_process from CNMF to avoid running out of memory + n_pix = Yr.shape[0] + p_img_flat = np.empty(n_pix, dtype=Yr.dtype) + chunk_size = estimate_n_pixels_per_process(get_n_processes(dview), T, dims) + chunk_starts = range(0, n_pix, chunk_size) + chunk_slices = (slice(start, min(start + chunk_size, n_pix)) for start in chunk_starts) + chunks = (Yr[sl] for sl in chunk_slices) + args = ([ch, proj_type] for ch in chunks) + for chunk_slice, chunk_proj in zip(chunk_slices, dview.imap(make_chunk_projection_helper, args)): + p_img_flat[chunk_slice] = chunk_proj + + p_img = np.reshape(p_img_flat, dims, order='F') + proj_paths[proj_type] = output_dir.joinpath( + f"{uuid}_{proj_type}_projection.npy" + ) + np.save(str(proj_paths[proj_type]), p_img) + return proj_paths diff --git a/mesmerize_core/algorithms/cnmf.py b/mesmerize_core/algorithms/cnmf.py index b04da87..c6c3b7d 100644 --- a/mesmerize_core/algorithms/cnmf.py +++ b/mesmerize_core/algorithms/cnmf.py @@ -15,11 +15,11 @@ if __name__ in ["__main__", "__mp_main__"]: # when running in subprocess from mesmerize_core import set_parent_raw_data_path, load_batch from mesmerize_core.utils import IS_WINDOWS - from mesmerize_core.algorithms._utils import ensure_server + from mesmerize_core.algorithms._utils import ensure_server, save_projections_parallel else: # when running with local backend from ..batch_utils import set_parent_raw_data_path, load_batch from ..utils import IS_WINDOWS - from ._utils import ensure_server + from ._utils import ensure_server, save_projections_parallel def run_algo(batch_path, uuid, data_path: str = None, dview=None): @@ -66,13 +66,9 @@ def run_algo(batch_path, uuid, data_path: str = None, dview=None): images = np.reshape(Yr.T, [T] + list(dims), order="F") - proj_paths = dict() - for proj_type in ["mean", "std", "max"]: - p_img = getattr(np, f"nan{proj_type}")(images, axis=0) - proj_paths[proj_type] = output_dir.joinpath( - f"{uuid}_{proj_type}_projection.npy" - ) - np.save(str(proj_paths[proj_type]), p_img) + proj_paths = save_projections_parallel( + uuid=uuid, Yr=Yr, dims=dims, T=T, output_dir=output_dir, dview=dview + ) print("performing CNMF") cnm = cnmf.CNMF(n_processes, params=cnmf_params, dview=dview) diff --git a/mesmerize_core/algorithms/cnmfe.py b/mesmerize_core/algorithms/cnmfe.py index 3940727..f3b0ca9 100644 --- a/mesmerize_core/algorithms/cnmfe.py +++ b/mesmerize_core/algorithms/cnmfe.py @@ -11,11 +11,11 @@ if __name__ in ["__main__", "__mp_main__"]: # when running in subprocess from mesmerize_core import set_parent_raw_data_path, load_batch from mesmerize_core.utils import IS_WINDOWS - from mesmerize_core.algorithms._utils import ensure_server + from mesmerize_core.algorithms._utils import ensure_server, save_projections_parallel else: # when running with local backend from ..batch_utils import set_parent_raw_data_path, load_batch from ..utils import IS_WINDOWS - from ._utils import ensure_server + from ._utils import ensure_server, save_projections_parallel def run_algo(batch_path, uuid, data_path: str = None, dview=None): @@ -47,13 +47,9 @@ def run_algo(batch_path, uuid, data_path: str = None, dview=None): # TODO: if projections already exist from mcorr we don't # need to waste compute time re-computing them here - proj_paths = dict() - for proj_type in ["mean", "std", "max"]: - p_img = getattr(np, f"nan{proj_type}")(images, axis=0) - proj_paths[proj_type] = output_dir.joinpath( - f"{uuid}_{proj_type}_projection.npy" - ) - np.save(str(proj_paths[proj_type]), p_img) + proj_paths = save_projections_parallel( + uuid=uuid, Yr=Yr, dims=dims, T=T, output_dir=output_dir, dview=dview + ) d = dict() # for output diff --git a/mesmerize_core/algorithms/mcorr.py b/mesmerize_core/algorithms/mcorr.py index 068faa3..3cfdc69 100644 --- a/mesmerize_core/algorithms/mcorr.py +++ b/mesmerize_core/algorithms/mcorr.py @@ -13,10 +13,10 @@ # prevent circular import if __name__ in ["__main__", "__mp_main__"]: # when running in subprocess from mesmerize_core import set_parent_raw_data_path, load_batch - from mesmerize_core.algorithms._utils import ensure_server + from mesmerize_core.algorithms._utils import ensure_server, save_projections_parallel else: # when running with local backend from ..batch_utils import set_parent_raw_data_path, load_batch - from ._utils import ensure_server + from ._utils import ensure_server, save_projections_parallel def run_algo(batch_path, uuid, data_path: str = None, dview=None): @@ -69,13 +69,9 @@ def run_algo(batch_path, uuid, data_path: str = None, dview=None): Yr, dims, T = cm.load_memmap(str(mcorr_memmap_path)) images = np.reshape(Yr.T, [T] + list(dims), order="F") - proj_paths = dict() - for proj_type in ["mean", "std", "max"]: - p_img = getattr(np, f"nan{proj_type}")(images, axis=0) - proj_paths[proj_type] = output_dir.joinpath( - f"{uuid}_{proj_type}_projection.npy" - ) - np.save(str(proj_paths[proj_type]), p_img) + proj_paths = save_projections_parallel( + uuid=uuid, Yr=Yr, dims=dims, T=T, output_dir=output_dir, dview=dview + ) print("Computing correlation image") Cns = local_correlations_movie_offline( From ac2854953d6f88b268786e2c9d16a0a66fd9e12d Mon Sep 17 00:00:00 2001 From: Ethan Blackwood Date: Tue, 29 Oct 2024 22:53:13 -0400 Subject: [PATCH 13/31] Factor out part that actually computes the projection so it can be reused --- mesmerize_core/algorithms/_utils.py | 36 ++++++++++++++++------------- 1 file changed, 20 insertions(+), 16 deletions(-) diff --git a/mesmerize_core/algorithms/_utils.py b/mesmerize_core/algorithms/_utils.py index 6a7e24c..918a36b 100644 --- a/mesmerize_core/algorithms/_utils.py +++ b/mesmerize_core/algorithms/_utils.py @@ -92,31 +92,35 @@ def estimate_n_pixels_per_process(n_processes: int, T: int, dims: tuple[int, ... def make_chunk_projection(Yr_chunk: np.ndarray, proj_type: str): - return getattr(np, "nan" + proj_type)(Yr_chunk, axis=1) + return getattr(np, proj_type)(Yr_chunk, axis=1) def make_chunk_projection_helper(args: tuple[np.ndarray, str]): return make_chunk_projection(*args) +def make_projection_parallel(Yr: np.ndarray, dims: tuple[int, ...], T: int, + proj_type: str, dview: Optional[Cluster]) -> np.ndarray: + if dview is None: + p_img_flat = make_chunk_projection(Yr, proj_type) + else: + # use n_pixels_per_process from CNMF to avoid running out of memory + n_pix = Yr.shape[0] + p_img_flat = np.empty(n_pix, dtype=Yr.dtype) + chunk_size = estimate_n_pixels_per_process(get_n_processes(dview), T, dims) + chunk_starts = range(0, n_pix, chunk_size) + chunk_slices = [slice(start, min(start + chunk_size, n_pix)) for start in chunk_starts] + args = ((Yr[chunk_slice], proj_type) for chunk_slice in chunk_slices) + for chunk_slice, chunk_proj in zip(chunk_slices, dview.imap(make_chunk_projection_helper, args)): + p_img_flat[chunk_slice] = chunk_proj + + return np.reshape(p_img_flat, dims, order='F') + + def save_projections_parallel(uuid, Yr: np.ndarray, dims: tuple[int, ...], T: int, output_dir: Path, dview: Optional[Cluster]) -> dict[str, Path]: proj_paths = dict() for proj_type in ["mean", "std", "max"]: - if dview is None: - p_img_flat = make_chunk_projection(Yr, proj_type) - else: - # use n_pixels_per_process from CNMF to avoid running out of memory - n_pix = Yr.shape[0] - p_img_flat = np.empty(n_pix, dtype=Yr.dtype) - chunk_size = estimate_n_pixels_per_process(get_n_processes(dview), T, dims) - chunk_starts = range(0, n_pix, chunk_size) - chunk_slices = (slice(start, min(start + chunk_size, n_pix)) for start in chunk_starts) - chunks = (Yr[sl] for sl in chunk_slices) - args = ([ch, proj_type] for ch in chunks) - for chunk_slice, chunk_proj in zip(chunk_slices, dview.imap(make_chunk_projection_helper, args)): - p_img_flat[chunk_slice] = chunk_proj - - p_img = np.reshape(p_img_flat, dims, order='F') + p_img = make_projection_parallel(Yr, dims, T, "nan" + proj_type, dview=dview) proj_paths[proj_type] = output_dir.joinpath( f"{uuid}_{proj_type}_projection.npy" ) From c8f1fa8df4f2ec2925e75f14a90839fa72d92258 Mon Sep 17 00:00:00 2001 From: Ethan Blackwood Date: Thu, 31 Oct 2024 15:20:55 -0400 Subject: [PATCH 14/31] Avoid loading memmap in parallel processes --- mesmerize_core/algorithms/_utils.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/mesmerize_core/algorithms/_utils.py b/mesmerize_core/algorithms/_utils.py index 918a36b..7ae2899 100644 --- a/mesmerize_core/algorithms/_utils.py +++ b/mesmerize_core/algorithms/_utils.py @@ -109,7 +109,7 @@ def make_projection_parallel(Yr: np.ndarray, dims: tuple[int, ...], T: int, chunk_size = estimate_n_pixels_per_process(get_n_processes(dview), T, dims) chunk_starts = range(0, n_pix, chunk_size) chunk_slices = [slice(start, min(start + chunk_size, n_pix)) for start in chunk_starts] - args = ((Yr[chunk_slice], proj_type) for chunk_slice in chunk_slices) + args = ((np.asarray(Yr[chunk_slice]), proj_type) for chunk_slice in chunk_slices) for chunk_slice, chunk_proj in zip(chunk_slices, dview.imap(make_chunk_projection_helper, args)): p_img_flat[chunk_slice] = chunk_proj From 145baa3aa965c255f8c2d00720dd731e78a4358c Mon Sep 17 00:00:00 2001 From: Ethan Blackwood Date: Thu, 31 Oct 2024 23:05:50 -0400 Subject: [PATCH 15/31] Load memmap in subprocesses to avoid slow imap while keeping out of memory --- mesmerize_core/algorithms/_utils.py | 25 +++++++++++++------------ mesmerize_core/algorithms/cnmf.py | 14 +++++--------- mesmerize_core/algorithms/cnmfe.py | 2 +- mesmerize_core/algorithms/mcorr.py | 2 +- 4 files changed, 20 insertions(+), 23 deletions(-) diff --git a/mesmerize_core/algorithms/_utils.py b/mesmerize_core/algorithms/_utils.py index 7ae2899..c5d129f 100644 --- a/mesmerize_core/algorithms/_utils.py +++ b/mesmerize_core/algorithms/_utils.py @@ -94,33 +94,34 @@ def estimate_n_pixels_per_process(n_processes: int, T: int, dims: tuple[int, ... def make_chunk_projection(Yr_chunk: np.ndarray, proj_type: str): return getattr(np, proj_type)(Yr_chunk, axis=1) -def make_chunk_projection_helper(args: tuple[np.ndarray, str]): - return make_chunk_projection(*args) +def make_chunk_projection_helper(args: tuple[str, slice, str]): + Yr_name, chunk_slice, proj_type = args + Yr, _, _ = cm.load_memmap(Yr_name) + return make_chunk_projection(Yr[chunk_slice], proj_type) -def make_projection_parallel(Yr: np.ndarray, dims: tuple[int, ...], T: int, - proj_type: str, dview: Optional[Cluster]) -> np.ndarray: +def make_projection_parallel(movie_path: str, proj_type: str, dview: Optional[Cluster]) -> np.ndarray: + Yr, dims, T = cm.load_memmap(movie_path) if dview is None: p_img_flat = make_chunk_projection(Yr, proj_type) else: # use n_pixels_per_process from CNMF to avoid running out of memory n_pix = Yr.shape[0] - p_img_flat = np.empty(n_pix, dtype=Yr.dtype) chunk_size = estimate_n_pixels_per_process(get_n_processes(dview), T, dims) chunk_starts = range(0, n_pix, chunk_size) chunk_slices = [slice(start, min(start + chunk_size, n_pix)) for start in chunk_starts] - args = ((np.asarray(Yr[chunk_slice]), proj_type) for chunk_slice in chunk_slices) - for chunk_slice, chunk_proj in zip(chunk_slices, dview.imap(make_chunk_projection_helper, args)): - p_img_flat[chunk_slice] = chunk_proj - + args = [(movie_path, chunk_slice, proj_type) for chunk_slice in chunk_slices] + map_fn = dview.map if isinstance(dview, Pool) else dview.map_sync + chunk_projs = map_fn(make_chunk_projection_helper, args) + p_img_flat = np.concatenate(chunk_projs, axis=0) return np.reshape(p_img_flat, dims, order='F') -def save_projections_parallel(uuid, Yr: np.ndarray, dims: tuple[int, ...], T: int, - output_dir: Path, dview: Optional[Cluster]) -> dict[str, Path]: +def save_projections_parallel(uuid, movie_path: Union[str, Path], output_dir: Path, dview: Optional[Cluster] + ) -> dict[str, Path]: proj_paths = dict() for proj_type in ["mean", "std", "max"]: - p_img = make_projection_parallel(Yr, dims, T, "nan" + proj_type, dview=dview) + p_img = make_projection_parallel(str(movie_path), "nan" + proj_type, dview=dview) proj_paths[proj_type] = output_dir.joinpath( f"{uuid}_{proj_type}_projection.npy" ) diff --git a/mesmerize_core/algorithms/cnmf.py b/mesmerize_core/algorithms/cnmf.py index c6c3b7d..4d83175 100644 --- a/mesmerize_core/algorithms/cnmf.py +++ b/mesmerize_core/algorithms/cnmf.py @@ -60,14 +60,16 @@ def run_algo(batch_path, uuid, data_path: str = None, dview=None): fname_new = cm.save_memmap( [input_movie_path], base_name=f"{uuid}_cnmf-memmap_", order="C", dview=dview ) - Yr, dims, T = cm.load_memmap(fname_new) + cnmf_memmap_path = output_dir.joinpath(Path(fname_new).name) + move_file(fname_new, cnmf_memmap_path) else: - Yr, dims, T = cm.load_memmap(input_movie_path) + cnmf_memmap_path = Path(input_movie_path) + Yr, dims, T = cm.load_memmap(str(cnmf_memmap_path)) images = np.reshape(Yr.T, [T] + list(dims), order="F") proj_paths = save_projections_parallel( - uuid=uuid, Yr=Yr, dims=dims, T=T, output_dir=output_dir, dview=dview + uuid=uuid, movie_path=cnmf_memmap_path, output_dir=output_dir, dview=dview ) print("performing CNMF") @@ -100,12 +102,6 @@ def run_algo(batch_path, uuid, data_path: str = None, dview=None): if IS_WINDOWS: Yr._mmap.close() # accessing private attr but windows is annoying otherwise - if save_new_mmap: - cnmf_memmap_path = output_dir.joinpath(Path(fname_new).name) - move_file(fname_new, cnmf_memmap_path) - else: - cnmf_memmap_path = Path(input_movie_path) - # save paths as relative path strings with forward slashes cnmf_hdf5_path = str(PurePosixPath(output_path.relative_to(output_dir.parent))) cnmf_memmap_path = str( diff --git a/mesmerize_core/algorithms/cnmfe.py b/mesmerize_core/algorithms/cnmfe.py index f3b0ca9..83adeed 100644 --- a/mesmerize_core/algorithms/cnmfe.py +++ b/mesmerize_core/algorithms/cnmfe.py @@ -48,7 +48,7 @@ def run_algo(batch_path, uuid, data_path: str = None, dview=None): # TODO: if projections already exist from mcorr we don't # need to waste compute time re-computing them here proj_paths = save_projections_parallel( - uuid=uuid, Yr=Yr, dims=dims, T=T, output_dir=output_dir, dview=dview + uuid=uuid, movie_path=fname_new, output_dir=output_dir, dview=dview ) d = dict() # for output diff --git a/mesmerize_core/algorithms/mcorr.py b/mesmerize_core/algorithms/mcorr.py index 3cfdc69..771553c 100644 --- a/mesmerize_core/algorithms/mcorr.py +++ b/mesmerize_core/algorithms/mcorr.py @@ -70,7 +70,7 @@ def run_algo(batch_path, uuid, data_path: str = None, dview=None): images = np.reshape(Yr.T, [T] + list(dims), order="F") proj_paths = save_projections_parallel( - uuid=uuid, Yr=Yr, dims=dims, T=T, output_dir=output_dir, dview=dview + uuid=uuid, movie_path=mcorr_memmap_path, output_dir=output_dir, dview=dview ) print("Computing correlation image") From b6cf1e4a2963b4e7bff07bb02e3a54d7533bf058 Mon Sep 17 00:00:00 2001 From: Ethan Blackwood Date: Sat, 2 Nov 2024 22:11:53 -0400 Subject: [PATCH 16/31] Fix input path not relative to output dir --- .gitignore | 3 --- mesmerize_core/algorithms/cnmf.py | 4 +--- 2 files changed, 1 insertion(+), 6 deletions(-) diff --git a/.gitignore b/.gitignore index d5dede4..0aed8f9 100644 --- a/.gitignore +++ b/.gitignore @@ -136,6 +136,3 @@ dmypy.json # test files tests/test data - -# VSCode -.vscode/ diff --git a/mesmerize_core/algorithms/cnmf.py b/mesmerize_core/algorithms/cnmf.py index 4d83175..f479d9c 100644 --- a/mesmerize_core/algorithms/cnmf.py +++ b/mesmerize_core/algorithms/cnmf.py @@ -104,9 +104,7 @@ def run_algo(batch_path, uuid, data_path: str = None, dview=None): # save paths as relative path strings with forward slashes cnmf_hdf5_path = str(PurePosixPath(output_path.relative_to(output_dir.parent))) - cnmf_memmap_path = str( - PurePosixPath(cnmf_memmap_path.relative_to(output_dir.parent)) - ) + cnmf_memmap_path = str(PurePosixPath(df.paths.split(cnmf_memmap_path)[1])) # still work if outside output dir corr_img_path = str(PurePosixPath(corr_img_path.relative_to(output_dir.parent))) for proj_type in proj_paths.keys(): d[f"{proj_type}-projection-path"] = str( From 748e7bc1f20ba5cc3dc82c7e11e4b3131517d442 Mon Sep 17 00:00:00 2001 From: Ethan Blackwood Date: Mon, 25 Nov 2024 10:34:02 -0500 Subject: [PATCH 17/31] Compute correlations for cnmf in the same way as mcorr to maybe avoid OOM --- mesmerize_core/algorithms/cnmf.py | 23 +++++++++++++++++------ 1 file changed, 17 insertions(+), 6 deletions(-) diff --git a/mesmerize_core/algorithms/cnmf.py b/mesmerize_core/algorithms/cnmf.py index f479d9c..1571559 100644 --- a/mesmerize_core/algorithms/cnmf.py +++ b/mesmerize_core/algorithms/cnmf.py @@ -4,6 +4,7 @@ import caiman as cm from caiman.source_extraction.cnmf import cnmf as cnmf from caiman.source_extraction.cnmf.params import CNMFParams +from caiman.summary_images import local_correlations_movie_offline from caiman.paths import decode_mmap_filename_dict import numpy as np import traceback @@ -68,10 +69,26 @@ def run_algo(batch_path, uuid, data_path: str = None, dview=None): Yr, dims, T = cm.load_memmap(str(cnmf_memmap_path)) images = np.reshape(Yr.T, [T] + list(dims), order="F") + print("computing projections") proj_paths = save_projections_parallel( uuid=uuid, movie_path=cnmf_memmap_path, output_dir=output_dir, dview=dview ) + print("computing correlation image") + Cns = local_correlations_movie_offline( + str(cnmf_memmap_path), + remove_baseline=True, + window=1000, + stride=1000, + winSize_baseline=100, + quantil_min_baseline=10, + dview=dview, + ) + Cn = Cns.max(axis=0) + Cn[np.isnan(Cn)] = 0 + corr_img_path = output_dir.joinpath(f"{uuid}_cn.npy") + np.save(str(corr_img_path), Cn, allow_pickle=False) + print("performing CNMF") cnm = cnmf.CNMF(n_processes, params=cnmf_params, dview=dview) @@ -90,12 +107,6 @@ def run_algo(batch_path, uuid, data_path: str = None, dview=None): cnm.save(str(output_path)) - Cn = cm.local_correlations(images, swap_dim=False) - Cn[np.isnan(Cn)] = 0 - - corr_img_path = output_dir.joinpath(f"{uuid}_cn.npy") - np.save(str(corr_img_path), Cn, allow_pickle=False) - # output dict for dataframe row (pd.Series) d = dict() From 0f02d49965d039fc535b319d31a8f7b5faf08333 Mon Sep 17 00:00:00 2001 From: Ethan Blackwood Date: Wed, 4 Dec 2024 14:43:37 -0500 Subject: [PATCH 18/31] Support computing projections using scipy.stats functions --- mesmerize_core/algorithms/_utils.py | 30 +++++++++++++++++++++-------- 1 file changed, 22 insertions(+), 8 deletions(-) diff --git a/mesmerize_core/algorithms/_utils.py b/mesmerize_core/algorithms/_utils.py index c5d129f..21a59d2 100644 --- a/mesmerize_core/algorithms/_utils.py +++ b/mesmerize_core/algorithms/_utils.py @@ -10,6 +10,7 @@ from ipyparallel import DirectView from multiprocessing.pool import Pool import numpy as np +import scipy.stats RetVal = TypeVar("RetVal") @@ -91,26 +92,39 @@ def estimate_n_pixels_per_process(n_processes: int, T: int, dims: tuple[int, ... return npx_per_proc -def make_chunk_projection(Yr_chunk: np.ndarray, proj_type: str): - return getattr(np, proj_type)(Yr_chunk, axis=1) +def make_chunk_projection(Yr_chunk: np.ndarray, proj_type: str, ignore_nan=False): + if hasattr(scipy.stats, proj_type): + return getattr(scipy.stats, proj_type)(Yr_chunk, axis=1, nan_policy='omit' if ignore_nan else 'propagate') + + if hasattr(np, proj_type): + if ignore_nan: + if hasattr(np, "nan" + proj_type): + proj_type = "nan" + proj_type + else: + logging.warning(f"NaN-ignoring version of {proj_type} function does not exist; not ignoring NaNs") + return getattr(np, proj_type)(Yr_chunk, axis=1) + + raise NotImplementedError(f"Projection type '{proj_type}' not implemented") + def make_chunk_projection_helper(args: tuple[str, slice, str]): - Yr_name, chunk_slice, proj_type = args + Yr_name, chunk_slice, proj_type, ignore_nan = args Yr, _, _ = cm.load_memmap(Yr_name) - return make_chunk_projection(Yr[chunk_slice], proj_type) + return make_chunk_projection(Yr[chunk_slice], proj_type, ignore_nan=ignore_nan) -def make_projection_parallel(movie_path: str, proj_type: str, dview: Optional[Cluster]) -> np.ndarray: +def make_projection_parallel(movie_path: str, proj_type: str, dview: Optional[Cluster], + ignore_nan=False) -> np.ndarray: Yr, dims, T = cm.load_memmap(movie_path) if dview is None: - p_img_flat = make_chunk_projection(Yr, proj_type) + p_img_flat = make_chunk_projection(Yr, proj_type, ignore_nan=ignore_nan) else: # use n_pixels_per_process from CNMF to avoid running out of memory n_pix = Yr.shape[0] chunk_size = estimate_n_pixels_per_process(get_n_processes(dview), T, dims) chunk_starts = range(0, n_pix, chunk_size) chunk_slices = [slice(start, min(start + chunk_size, n_pix)) for start in chunk_starts] - args = [(movie_path, chunk_slice, proj_type) for chunk_slice in chunk_slices] + args = [(movie_path, chunk_slice, proj_type, ignore_nan) for chunk_slice in chunk_slices] map_fn = dview.map if isinstance(dview, Pool) else dview.map_sync chunk_projs = map_fn(make_chunk_projection_helper, args) p_img_flat = np.concatenate(chunk_projs, axis=0) @@ -121,7 +135,7 @@ def save_projections_parallel(uuid, movie_path: Union[str, Path], output_dir: Pa ) -> dict[str, Path]: proj_paths = dict() for proj_type in ["mean", "std", "max"]: - p_img = make_projection_parallel(str(movie_path), "nan" + proj_type, dview=dview) + p_img = make_projection_parallel(str(movie_path), proj_type, dview=dview, ignore_nan=True) proj_paths[proj_type] = output_dir.joinpath( f"{uuid}_{proj_type}_projection.npy" ) From 09354c0dab04a39ccff029eaa231bc4a67c5b730 Mon Sep 17 00:00:00 2001 From: Ethan Blackwood Date: Thu, 5 Dec 2024 16:49:51 -0500 Subject: [PATCH 19/31] Allow movies to be passed directly to make_projection functions --- mesmerize_core/algorithms/_utils.py | 19 ++++++++++++++----- 1 file changed, 14 insertions(+), 5 deletions(-) diff --git a/mesmerize_core/algorithms/_utils.py b/mesmerize_core/algorithms/_utils.py index 21a59d2..e4d21b1 100644 --- a/mesmerize_core/algorithms/_utils.py +++ b/mesmerize_core/algorithms/_utils.py @@ -107,15 +107,24 @@ def make_chunk_projection(Yr_chunk: np.ndarray, proj_type: str, ignore_nan=False raise NotImplementedError(f"Projection type '{proj_type}' not implemented") -def make_chunk_projection_helper(args: tuple[str, slice, str]): - Yr_name, chunk_slice, proj_type, ignore_nan = args - Yr, _, _ = cm.load_memmap(Yr_name) +def make_chunk_projection_helper(args: tuple[Union[str, np.ndarray], slice, str, bool]): + Yr, chunk_slice, proj_type, ignore_nan = args + if isinstance(Yr, str): + Yr, _, _ = cm.load_memmap(Yr) return make_chunk_projection(Yr[chunk_slice], proj_type, ignore_nan=ignore_nan) -def make_projection_parallel(movie_path: str, proj_type: str, dview: Optional[Cluster], +def make_projection_parallel(movie_or_path: Union[cm.movie, str], proj_type: str, dview: Optional[Cluster], ignore_nan=False) -> np.ndarray: - Yr, dims, T = cm.load_memmap(movie_path) + if isinstance(movie_or_path, str): + movie_path = movie_or_path + Yr, dims, T = cm.load_memmap(movie_path) + else: + dview = None # avoid doing in parallel if we already know it fits into memory + T = movie_or_path.shape[0] + dims = movie_or_path.shape[1:] + Yr = movie_or_path.reshape((T, -1), order='F').T + if dview is None: p_img_flat = make_chunk_projection(Yr, proj_type, ignore_nan=ignore_nan) else: From 9547ad97cbf117ddf9d95dc1b3ec5c4d90ca1533 Mon Sep 17 00:00:00 2001 From: Ethan Blackwood Date: Fri, 4 Apr 2025 14:11:45 -0400 Subject: [PATCH 20/31] Adjust window size when saving correlations to avoid OOM --- mesmerize_core/algorithms/_utils.py | 31 +++++++++++++++++++++++++++-- mesmerize_core/algorithms/cnmf.py | 31 +++++++++++++++-------------- mesmerize_core/algorithms/mcorr.py | 30 +++++++++++++++------------- 3 files changed, 61 insertions(+), 31 deletions(-) diff --git a/mesmerize_core/algorithms/_utils.py b/mesmerize_core/algorithms/_utils.py index e4d21b1..a32f208 100644 --- a/mesmerize_core/algorithms/_utils.py +++ b/mesmerize_core/algorithms/_utils.py @@ -1,4 +1,6 @@ from contextlib import contextmanager +import logging +import math import os from pathlib import Path import psutil @@ -7,6 +9,7 @@ import caiman as cm from caiman.cluster import setup_cluster +from caiman.summary_images import local_correlations_movie_offline from ipyparallel import DirectView from multiprocessing.pool import Pool import numpy as np @@ -79,13 +82,16 @@ def ensure_server(dview: Optional[Cluster]) -> Generator[tuple[Cluster, int], No cm.stop_server(dview=dview) +def avail_bytes_per_process(n_processes: int): + return psutil.virtual_memory()[1] / n_processes + + def estimate_n_pixels_per_process(n_processes: int, T: int, dims: tuple[int, ...]) -> int: """ Estimate a safe number of pixels to allocate to each parallel process at a time Taken from CNMF.fit (TODO factor this out in caiman and just import it) """ - avail_memory_per_process = psutil.virtual_memory()[ - 1] / 2.**30 / n_processes + avail_memory_per_process = avail_bytes_per_process(n_processes) / 2.0**30 mem_per_pix = 3.6977678498329843e-09 npx_per_proc = int(avail_memory_per_process / 8. / mem_per_pix / T) npx_per_proc = int(np.minimum(npx_per_proc, np.prod(dims) // n_processes)) @@ -150,3 +156,24 @@ def save_projections_parallel(uuid, movie_path: Union[str, Path], output_dir: Pa ) np.save(str(proj_paths[proj_type]), p_img) return proj_paths + + +def save_correlation_parallel(uuid, movie_path: Union[str, Path], output_dir: Path, + dims: tuple[int, ...], dview: Optional[Cluster], max_window=1000) -> Path: + """Compute and save local correlations in chunks that are small enough to fit in memory""" + n_processes = get_n_processes(dview) + safe_window = min(max_window, math.floor(avail_bytes_per_process(n_processes) / (8 * np.prod(dims)) * 0.8)) + Cns = local_correlations_movie_offline( + str(movie_path), + remove_baseline=True, + window=safe_window, + stride=safe_window, + winSize_baseline=100, + quantil_min_baseline=10, + dview=dview, + ) + Cn = Cns.max(axis=0) + Cn[np.isnan(Cn)] = 0 + corr_img_path = output_dir.joinpath(f"{uuid}_cn.npy") + np.save(str(corr_img_path), Cn, allow_pickle=False) + return corr_img_path diff --git a/mesmerize_core/algorithms/cnmf.py b/mesmerize_core/algorithms/cnmf.py index 1571559..d0e36cf 100644 --- a/mesmerize_core/algorithms/cnmf.py +++ b/mesmerize_core/algorithms/cnmf.py @@ -4,7 +4,6 @@ import caiman as cm from caiman.source_extraction.cnmf import cnmf as cnmf from caiman.source_extraction.cnmf.params import CNMFParams -from caiman.summary_images import local_correlations_movie_offline from caiman.paths import decode_mmap_filename_dict import numpy as np import traceback @@ -16,11 +15,15 @@ if __name__ in ["__main__", "__mp_main__"]: # when running in subprocess from mesmerize_core import set_parent_raw_data_path, load_batch from mesmerize_core.utils import IS_WINDOWS - from mesmerize_core.algorithms._utils import ensure_server, save_projections_parallel + from mesmerize_core.algorithms._utils import ( + ensure_server, + save_projections_parallel, + save_correlation_parallel, + ) else: # when running with local backend from ..batch_utils import set_parent_raw_data_path, load_batch from ..utils import IS_WINDOWS - from ._utils import ensure_server, save_projections_parallel + from ._utils import ensure_server, save_projections_parallel, save_correlation_parallel def run_algo(batch_path, uuid, data_path: str = None, dview=None): @@ -71,23 +74,21 @@ def run_algo(batch_path, uuid, data_path: str = None, dview=None): print("computing projections") proj_paths = save_projections_parallel( - uuid=uuid, movie_path=cnmf_memmap_path, output_dir=output_dir, dview=dview + uuid=uuid, + movie_path=cnmf_memmap_path, + output_dir=output_dir, + dview=dview, ) print("computing correlation image") - Cns = local_correlations_movie_offline( - str(cnmf_memmap_path), - remove_baseline=True, - window=1000, - stride=1000, - winSize_baseline=100, - quantil_min_baseline=10, + corr_img_path = save_correlation_parallel( + uuid=uuid, + movie_path=cnmf_memmap_path, + output_dir=output_dir, + dims=dims, dview=dview, + max_window=1000 ) - Cn = Cns.max(axis=0) - Cn[np.isnan(Cn)] = 0 - corr_img_path = output_dir.joinpath(f"{uuid}_cn.npy") - np.save(str(corr_img_path), Cn, allow_pickle=False) print("performing CNMF") cnm = cnmf.CNMF(n_processes, params=cnmf_params, dview=dview) diff --git a/mesmerize_core/algorithms/mcorr.py b/mesmerize_core/algorithms/mcorr.py index 771553c..880ec0a 100644 --- a/mesmerize_core/algorithms/mcorr.py +++ b/mesmerize_core/algorithms/mcorr.py @@ -13,10 +13,14 @@ # prevent circular import if __name__ in ["__main__", "__mp_main__"]: # when running in subprocess from mesmerize_core import set_parent_raw_data_path, load_batch - from mesmerize_core.algorithms._utils import ensure_server, save_projections_parallel + from mesmerize_core.algorithms._utils import ( + ensure_server, + save_projections_parallel, + save_correlation_parallel, + ) else: # when running with local backend from ..batch_utils import set_parent_raw_data_path, load_batch - from ._utils import ensure_server, save_projections_parallel + from ._utils import ensure_server, save_projections_parallel, save_correlation_parallel def run_algo(batch_path, uuid, data_path: str = None, dview=None): @@ -70,23 +74,21 @@ def run_algo(batch_path, uuid, data_path: str = None, dview=None): images = np.reshape(Yr.T, [T] + list(dims), order="F") proj_paths = save_projections_parallel( - uuid=uuid, movie_path=mcorr_memmap_path, output_dir=output_dir, dview=dview + uuid=uuid, + movie_path=mcorr_memmap_path, + output_dir=output_dir, + dview=dview, ) print("Computing correlation image") - Cns = local_correlations_movie_offline( - str(mcorr_memmap_path), - remove_baseline=True, - window=1000, - stride=1000, - winSize_baseline=100, - quantil_min_baseline=10, + cn_path = save_correlation_parallel( + uuid=uuid, + movie_path=mcorr_memmap_path, + output_dir=output_dir, + dims=dims, dview=dview, + max_window=1000 ) - Cn = Cns.max(axis=0) - Cn[np.isnan(Cn)] = 0 - cn_path = output_dir.joinpath(f"{uuid}_cn.npy") - np.save(str(cn_path), Cn, allow_pickle=False) print("finished computing correlation image") From 7e56981bb7003768996ee1cb083811bbf0a6bfb5 Mon Sep 17 00:00:00 2001 From: Ethan Blackwood Date: Fri, 5 Sep 2025 12:34:56 -0400 Subject: [PATCH 21/31] Compute correlation more efficiently --- mesmerize_core/algorithms/_utils.py | 147 ++++++++++++++++++++-------- mesmerize_core/algorithms/cnmf.py | 2 - mesmerize_core/algorithms/mcorr.py | 3 - 3 files changed, 105 insertions(+), 47 deletions(-) diff --git a/mesmerize_core/algorithms/_utils.py b/mesmerize_core/algorithms/_utils.py index a32f208..99911ac 100644 --- a/mesmerize_core/algorithms/_utils.py +++ b/mesmerize_core/algorithms/_utils.py @@ -9,7 +9,7 @@ import caiman as cm from caiman.cluster import setup_cluster -from caiman.summary_images import local_correlations_movie_offline +from caiman.summary_images import local_correlations from ipyparallel import DirectView from multiprocessing.pool import Pool import numpy as np @@ -113,37 +113,38 @@ def make_chunk_projection(Yr_chunk: np.ndarray, proj_type: str, ignore_nan=False raise NotImplementedError(f"Projection type '{proj_type}' not implemented") -def make_chunk_projection_helper(args: tuple[Union[str, np.ndarray], slice, str, bool]): - Yr, chunk_slice, proj_type, ignore_nan = args - if isinstance(Yr, str): - Yr, _, _ = cm.load_memmap(Yr) +def make_chunk_projection_helper(args: tuple[str, slice, str, bool]): + movie_path, chunk_slice, proj_type, ignore_nan = args + Yr, _, _ = cm.load_memmap(movie_path) return make_chunk_projection(Yr[chunk_slice], proj_type, ignore_nan=ignore_nan) -def make_projection_parallel(movie_or_path: Union[cm.movie, str], proj_type: str, dview: Optional[Cluster], - ignore_nan=False) -> np.ndarray: - if isinstance(movie_or_path, str): - movie_path = movie_or_path - Yr, dims, T = cm.load_memmap(movie_path) - else: - dview = None # avoid doing in parallel if we already know it fits into memory - T = movie_or_path.shape[0] - dims = movie_or_path.shape[1:] - Yr = movie_or_path.reshape((T, -1), order='F').T +def make_projection_parallel(movie_path: str, proj_type: str, dview: Optional[Cluster], ignore_nan=False) -> np.ndarray: + Yr, dims, T = cm.load_memmap(movie_path) + + # use n_pixels_per_process from CNMF to avoid running out of memory + n_pix = Yr.shape[0] + chunk_size = estimate_n_pixels_per_process(get_n_processes(dview), T, dims) + + chunk_starts = range(0, n_pix, chunk_size) + chunk_slices = [ + slice(start, min(start + chunk_size, n_pix)) for start in chunk_starts + ] + args = [ + (movie_path, chunk_slice, proj_type, ignore_nan) + for chunk_slice in chunk_slices + ] if dview is None: - p_img_flat = make_chunk_projection(Yr, proj_type, ignore_nan=ignore_nan) + map_fn = map + elif isinstance(dview, Pool): + map_fn = dview.map else: - # use n_pixels_per_process from CNMF to avoid running out of memory - n_pix = Yr.shape[0] - chunk_size = estimate_n_pixels_per_process(get_n_processes(dview), T, dims) - chunk_starts = range(0, n_pix, chunk_size) - chunk_slices = [slice(start, min(start + chunk_size, n_pix)) for start in chunk_starts] - args = [(movie_path, chunk_slice, proj_type, ignore_nan) for chunk_slice in chunk_slices] - map_fn = dview.map if isinstance(dview, Pool) else dview.map_sync - chunk_projs = map_fn(make_chunk_projection_helper, args) - p_img_flat = np.concatenate(chunk_projs, axis=0) - return np.reshape(p_img_flat, dims, order='F') + map_fn = dview.map_sync + + chunk_projs = map_fn(make_chunk_projection_helper, args) + p_img_flat = np.concatenate(list(chunk_projs), axis=0) + return np.reshape(p_img_flat, dims, order="F") def save_projections_parallel(uuid, movie_path: Union[str, Path], output_dir: Path, dview: Optional[Cluster] @@ -158,22 +159,84 @@ def save_projections_parallel(uuid, movie_path: Union[str, Path], output_dir: Pa return proj_paths -def save_correlation_parallel(uuid, movie_path: Union[str, Path], output_dir: Path, - dims: tuple[int, ...], dview: Optional[Cluster], max_window=1000) -> Path: +ChunkDims = tuple[slice, slice] +ChunkSpec = tuple[ChunkDims, ChunkDims, ChunkDims] # input, output, patch subinds + +def save_correlation_parallel(uuid, movie_path: Union[str, Path], output_dir: Path, dview: Optional[Cluster]) -> Path: """Compute and save local correlations in chunks that are small enough to fit in memory""" - n_processes = get_n_processes(dview) - safe_window = min(max_window, math.floor(avail_bytes_per_process(n_processes) / (8 * np.prod(dims)) * 0.8)) - Cns = local_correlations_movie_offline( - str(movie_path), - remove_baseline=True, - window=safe_window, - stride=safe_window, - winSize_baseline=100, - quantil_min_baseline=10, - dview=dview, - ) - Cn = Cns.max(axis=0) - Cn[np.isnan(Cn)] = 0 + Yr, dims, T = cm.load_memmap(str(movie_path)) + + # use n_pixels_per_process from CNMF to avoid running out of memory + chunk_size = estimate_n_pixels_per_process(get_n_processes(dview), T, dims) + patches = make_correlation_patches(dims, chunk_size) + + # do correlation calculation in parallel + args = [(str(movie_path), p[0]) for p in patches] + if dview is None: + map_fn = map + elif isinstance(dview, Pool): + map_fn = dview.map + else: + map_fn = dview.map_sync + + patch_corrs = map_fn(chunk_correlation_helper, args) + output_img = np.empty(dims, dtype=Yr.dtype) + for (_, output_coords, subinds), patch_corr in zip(patches, patch_corrs): + output_img[output_coords] = patch_corr[subinds] + + # save to file and return path corr_img_path = output_dir.joinpath(f"{uuid}_cn.npy") - np.save(str(corr_img_path), Cn, allow_pickle=False) + np.save(str(corr_img_path), output_img, allow_pickle=False) return corr_img_path + + +def chunk_correlation_helper(args: tuple[str, ChunkDims]) -> np.ndarray: + movie_path, dims_input = args + Yr, dims, T = cm.load_memmap(movie_path) + Yr_3d = np.reshape(Yr, dims + (T,), order="F") + return local_correlations(Yr_3d[dims_input]) + + +def make_correlation_patches(dims: tuple[int, ...], chunk_size: int) -> list[ChunkSpec]: + """ + Compute dimensions for dividing movie (ideally C-order) into patches for correlation calculation. + Overlap = 2 to avoid edge effects except on the edge. + Each entry of the returned list contains 3 (Y, X) tuples of slices: + - input coordinates (for getting sub-movie to compute correlation on) + - output coordinates (for assigning result to full correlation image, excludes inner borders) + - patch sub-indices (to index result for assignment to output) + """ + window_size = math.floor(math.sqrt(chunk_size)) + + # first get patch starts and sizes for each dimension + patch_coords_y = make_correlation_patches_for_dim(dims[0], window_size) + patch_coords_x = make_correlation_patches_for_dim(dims[1], window_size) + return [ + ((input_y, input_x), (output_y, output_x), (subind_y, subind_x)) + for input_y, output_y, subind_y in patch_coords_y + for input_x, output_x, subind_x in patch_coords_x + ] + + +def make_correlation_patches_for_dim(dim: int, window_size: int) -> list[tuple[slice, slice, slice]]: + """ + Like make_correlation_patches but for just one dimension + """ + overlap = 2 # so that edge pixel in one patch is a non-edge pixel in the next + window_size = max(window_size, overlap + 1) + stride = window_size - overlap + + patch_starts = range(0, dim - overlap, stride) # last pixels are covered by last window + patch_ends = [start + window_size for start in patch_starts[:-1]] + [dim] + patch_coords: list[tuple[slice, slice, slice]] = [] + + for start, end in zip(patch_starts, patch_ends): + is_first = start == patch_starts[0] + is_last = start == patch_starts[-1] + patch_coords.append(( + slice(start, end), + slice(start if is_first else start + 1, end if is_last else end-1), + slice(0 if is_first else 1, None if is_last else -1) + )) + + return patch_coords diff --git a/mesmerize_core/algorithms/cnmf.py b/mesmerize_core/algorithms/cnmf.py index d0e36cf..649d58c 100644 --- a/mesmerize_core/algorithms/cnmf.py +++ b/mesmerize_core/algorithms/cnmf.py @@ -85,9 +85,7 @@ def run_algo(batch_path, uuid, data_path: str = None, dview=None): uuid=uuid, movie_path=cnmf_memmap_path, output_dir=output_dir, - dims=dims, dview=dview, - max_window=1000 ) print("performing CNMF") diff --git a/mesmerize_core/algorithms/mcorr.py b/mesmerize_core/algorithms/mcorr.py index 880ec0a..f72435a 100644 --- a/mesmerize_core/algorithms/mcorr.py +++ b/mesmerize_core/algorithms/mcorr.py @@ -3,7 +3,6 @@ import caiman as cm from caiman.source_extraction.cnmf.params import CNMFParams from caiman.motion_correction import MotionCorrect -from caiman.summary_images import local_correlations_movie_offline import os from pathlib import Path, PurePosixPath import numpy as np @@ -85,9 +84,7 @@ def run_algo(batch_path, uuid, data_path: str = None, dview=None): uuid=uuid, movie_path=mcorr_memmap_path, output_dir=output_dir, - dims=dims, dview=dview, - max_window=1000 ) print("finished computing correlation image") From bbadaf851270a61d581115f5d86ab4e0ef7b31a3 Mon Sep 17 00:00:00 2001 From: Ethan Blackwood Date: Fri, 5 Sep 2025 13:05:54 -0400 Subject: [PATCH 22/31] Decouple correlation computation from saving --- mesmerize_core/algorithms/_utils.py | 13 +++++++++---- 1 file changed, 9 insertions(+), 4 deletions(-) diff --git a/mesmerize_core/algorithms/_utils.py b/mesmerize_core/algorithms/_utils.py index 99911ac..bf8c6c8 100644 --- a/mesmerize_core/algorithms/_utils.py +++ b/mesmerize_core/algorithms/_utils.py @@ -162,8 +162,8 @@ def save_projections_parallel(uuid, movie_path: Union[str, Path], output_dir: Pa ChunkDims = tuple[slice, slice] ChunkSpec = tuple[ChunkDims, ChunkDims, ChunkDims] # input, output, patch subinds -def save_correlation_parallel(uuid, movie_path: Union[str, Path], output_dir: Path, dview: Optional[Cluster]) -> Path: - """Compute and save local correlations in chunks that are small enough to fit in memory""" +def make_correlation_parallel(movie_path: Union[str, Path], dview: Optional[Cluster]) -> np.ndarray: + """Compute local correlations in chunks that are small enough to fit in memory""" Yr, dims, T = cm.load_memmap(str(movie_path)) # use n_pixels_per_process from CNMF to avoid running out of memory @@ -184,9 +184,14 @@ def save_correlation_parallel(uuid, movie_path: Union[str, Path], output_dir: Pa for (_, output_coords, subinds), patch_corr in zip(patches, patch_corrs): output_img[output_coords] = patch_corr[subinds] - # save to file and return path + return output_img + + +def save_correlation_parallel(uuid, movie_path: Union[str, Path], output_dir: Path, dview: Optional[Cluster]) -> Path: + """Compute and save local correlations in chunks that are small enough to fit in memory""" + corr_img = make_correlation_parallel(movie_path, dview) corr_img_path = output_dir.joinpath(f"{uuid}_cn.npy") - np.save(str(corr_img_path), output_img, allow_pickle=False) + np.save(str(corr_img_path), corr_img, allow_pickle=False) return corr_img_path From 3e619d17856aa003b308d26a97b2b1caed439631 Mon Sep 17 00:00:00 2001 From: Ethan Blackwood Date: Fri, 2 May 2025 00:42:56 -0400 Subject: [PATCH 23/31] Support projections/correlation images from any files loadable by caiman --- mesmerize_core/algorithms/_utils.py | 59 ++++++++++++++++++++--------- 1 file changed, 41 insertions(+), 18 deletions(-) diff --git a/mesmerize_core/algorithms/_utils.py b/mesmerize_core/algorithms/_utils.py index bf8c6c8..87d5074 100644 --- a/mesmerize_core/algorithms/_utils.py +++ b/mesmerize_core/algorithms/_utils.py @@ -8,6 +8,7 @@ Callable, TypeVar, Sequence, Iterable, runtime_checkable) import caiman as cm +from caiman.base.movies import get_file_size from caiman.cluster import setup_cluster from caiman.summary_images import local_correlations from ipyparallel import DirectView @@ -113,28 +114,48 @@ def make_chunk_projection(Yr_chunk: np.ndarray, proj_type: str, ignore_nan=False raise NotImplementedError(f"Projection type '{proj_type}' not implemented") -def make_chunk_projection_helper(args: tuple[str, slice, str, bool]): - movie_path, chunk_slice, proj_type, ignore_nan = args - Yr, _, _ = cm.load_memmap(movie_path) - return make_chunk_projection(Yr[chunk_slice], proj_type, ignore_nan=ignore_nan) +def make_chunk_projection_helper(args: tuple[str, slice, Optional[int], str, bool]): + movie_path, chunk_slice, page, proj_type, ignore_nan = args + subindices = (slice(None), slice(None), chunk_slice) + if page is not None: + subindices += (page,) + + mov: cm.movie = cm.load(movie_path, subindices=subindices) + # flatten to pixels x time + Yr = mov.reshape((mov.shape[0], -1), order='F').T + return make_chunk_projection(Yr, proj_type, ignore_nan=ignore_nan) def make_projection_parallel(movie_path: str, proj_type: str, dview: Optional[Cluster], ignore_nan=False) -> np.ndarray: - Yr, dims, T = cm.load_memmap(movie_path) + """ + Compute projection in chunks that are small enough to fit in memory + movie_path: path to movie that can be memory-mapped using caiman.load + """ + dims, T = get_file_size(movie_path) # use n_pixels_per_process from CNMF to avoid running out of memory - n_pix = Yr.shape[0] chunk_size = estimate_n_pixels_per_process(get_n_processes(dview), T, dims) + chunk_columns = max(chunk_size // dims[0], 1) - chunk_starts = range(0, n_pix, chunk_size) + # divide movie into chunks of columns + chunk_starts = range(0, dims[1], chunk_columns) chunk_slices = [ - slice(start, min(start + chunk_size, n_pix)) for start in chunk_starts - ] - args = [ - (movie_path, chunk_slice, proj_type, ignore_nan) - for chunk_slice in chunk_slices + slice(start, min(start + chunk_columns, dims[1])) for start in chunk_starts ] + if len(dims) > 2 and dims[2] > 1: + args = [] + for page in range(dims[2]): + args.extend([ + (movie_path, chunk_slice, page, proj_type, ignore_nan) + for chunk_slice in chunk_slices + ]) + else: + args = [ + (movie_path, chunk_slice, None, proj_type, ignore_nan) + for chunk_slice in chunk_slices + ] + if dview is None: map_fn = map elif isinstance(dview, Pool): @@ -163,8 +184,11 @@ def save_projections_parallel(uuid, movie_path: Union[str, Path], output_dir: Pa ChunkSpec = tuple[ChunkDims, ChunkDims, ChunkDims] # input, output, patch subinds def make_correlation_parallel(movie_path: Union[str, Path], dview: Optional[Cluster]) -> np.ndarray: - """Compute local correlations in chunks that are small enough to fit in memory""" - Yr, dims, T = cm.load_memmap(str(movie_path)) + """ + Compute local correlations in chunks that are small enough to fit in memory + movie_path: path to movie that can be memory-mapped using caiman.load + """ + dims, T = get_file_size(movie_path) # use n_pixels_per_process from CNMF to avoid running out of memory chunk_size = estimate_n_pixels_per_process(get_n_processes(dview), T, dims) @@ -180,7 +204,7 @@ def make_correlation_parallel(movie_path: Union[str, Path], dview: Optional[Clus map_fn = dview.map_sync patch_corrs = map_fn(chunk_correlation_helper, args) - output_img = np.empty(dims, dtype=Yr.dtype) + output_img = np.empty(dims, dtype=np.float32) for (_, output_coords, subinds), patch_corr in zip(patches, patch_corrs): output_img[output_coords] = patch_corr[subinds] @@ -197,9 +221,8 @@ def save_correlation_parallel(uuid, movie_path: Union[str, Path], output_dir: Pa def chunk_correlation_helper(args: tuple[str, ChunkDims]) -> np.ndarray: movie_path, dims_input = args - Yr, dims, T = cm.load_memmap(movie_path) - Yr_3d = np.reshape(Yr, dims + (T,), order="F") - return local_correlations(Yr_3d[dims_input]) + mov = cm.load(movie_path, subindices=(slice(None),) + dims_input) + return local_correlations(mov, swap_dim=False) def make_correlation_patches(dims: tuple[int, ...], chunk_size: int) -> list[ChunkSpec]: From d9e139a70286513a71a507d78be51b489d8d93e1 Mon Sep 17 00:00:00 2001 From: Ethan Blackwood Date: Fri, 11 Jul 2025 15:26:11 -0400 Subject: [PATCH 24/31] Make new function for saving C-order mmap and plug in for cnmf and cnmfe --- mesmerize_core/algorithms/_utils.py | 196 ++++++++++++++++++++-------- mesmerize_core/algorithms/cnmf.py | 15 ++- mesmerize_core/algorithms/cnmfe.py | 40 +++--- 3 files changed, 180 insertions(+), 71 deletions(-) diff --git a/mesmerize_core/algorithms/_utils.py b/mesmerize_core/algorithms/_utils.py index 87d5074..094e0fd 100644 --- a/mesmerize_core/algorithms/_utils.py +++ b/mesmerize_core/algorithms/_utils.py @@ -4,12 +4,24 @@ import os from pathlib import Path import psutil -from typing import (Optional, Union, Generator, Protocol, - Callable, TypeVar, Sequence, Iterable, runtime_checkable) +from typing import ( + Optional, + Union, + Generator, + Protocol, + Callable, + Concatenate, + Generic, + TypeVar, + Sequence, + Iterable, + runtime_checkable, +) import caiman as cm from caiman.base.movies import get_file_size from caiman.cluster import setup_cluster +from caiman.paths import generate_fname_tot, fn_relocated from caiman.summary_images import local_correlations from ipyparallel import DirectView from multiprocessing.pool import Pool @@ -99,7 +111,124 @@ def estimate_n_pixels_per_process(n_processes: int, T: int, dims: tuple[int, ... return npx_per_proc -def make_chunk_projection(Yr_chunk: np.ndarray, proj_type: str, ignore_nan=False): +R = TypeVar('R') +class ColumnMappingFunction(Generic[R]): + """ + Object to map an operation over columns of a movie to avoid running out of memory + Construct with the kernel function which takes pixels x time matrices and returns anything. + """ + def __init__(self, kernel: Callable[Concatenate[np.ndarray, slice, ...], R]): + self.kernel = kernel + + def _helper(self, args: tuple) -> R: + movie_path: str = args[0] + var_name_hdf5: str = args[1] + col_slice: slice = args[2] + page: Optional[int] = args[3] + + subindices = (slice(None), slice(None), col_slice) + if page is not None: + subindices += (page,) + + mov: cm.movie = cm.load(movie_path, subindices=subindices, var_name_hdf5=var_name_hdf5) + T, *dims = mov.shape + + # flatten to pixels x time + Yr = mov.reshape((T, -1), order='F').T + + # get slice of flattened pixels + page_offset = page * dims[0] * dims[1] if page else 0 + pixel_slice = slice(page_offset + col_slice.start * dims[0], page_offset + col_slice.stop * dims[0]) + + # apply the kernel + return self.kernel(Yr, pixel_slice, *args[4:]) + + def __call__(self, movie_path: str, dview: Optional[Cluster], var_name_hdf5='mov', *args) -> Iterable[R]: + """Perform an operation on non-overlapping chunks of columns that fit into memory""" + dims, T = get_file_size(movie_path, var_name_hdf5=var_name_hdf5) + assert isinstance(T, int) # non-type-stable interface... + + # use n_pixels_per_process from CNMF to avoid running out of memory + chunk_size = estimate_n_pixels_per_process(get_n_processes(dview), T, dims) + chunk_columns = max(chunk_size // dims[0], 1) + + # divide movie into chunks of columns + chunk_starts = range(0, dims[1], chunk_columns) + chunk_slices = [ + slice(start, min(start + chunk_columns, dims[1])) for start in chunk_starts + ] + + if len(dims) > 2 and dims[2] > 1: + map_args = [] + for page in range(dims[2]): + map_args.extend([ + (movie_path, var_name_hdf5, chunk_slice, page) + tuple(args) + for chunk_slice in chunk_slices + ]) + else: + map_args = [ + (movie_path, var_name_hdf5, chunk_slice, None) + tuple(args) + for chunk_slice in chunk_slices + ] + + if dview is None: + map_fn = map + elif isinstance(dview, Pool): + map_fn = dview.map + else: + map_fn = dview.map_sync + + return map_fn(self._helper, map_args) + + +def _save_c_order_mmap_in_chunks_kernel(Yr_chunk: np.ndarray, pixel_slice: slice, mmap_fname: str): + """ + Alternative to cm.save_memmap that can load from non-mmap files and uses chunks + Based on caiman.mmapping.save_portion + """ + c_order_copy = np.ascontiguousarray(Yr_chunk, dtype=np.float32) # pixels x time + tot_frames = c_order_copy.shape[1] + + with open(mmap_fname, 'r+b') as f: + # seek to the start of the chunk + idx_start, idx_end = pixel_slice.start, pixel_slice.stop + f.seek(idx_start * c_order_copy.dtype.itemsize * tot_frames) + f.write(c_order_copy.data) + computed_position = np.uint64(idx_end * np.uint64(c_order_copy.dtype.itemsize) * tot_frames) + if f.tell() != computed_position: + logging.critical(f"Error in mmap portion write: at position {f.tell()}") + logging.critical( + f"But should be at position {idx_end} * {c_order_copy.dtype.itemsize} * {tot_frames} = {computed_position}" + ) + f.close() + raise Exception('Internal error in mmapping: Actual position does not match computed position') +save_c_order_mmap_in_chunks = ColumnMappingFunction(_save_c_order_mmap_in_chunks_kernel) + +def save_c_order_mmap_parallel(movie_path: str, base_name: str, dview: Optional[Cluster], var_name_hdf5='mov') -> str: + """Alternative to cm.save_memmap that hopefully does better with memory""" + # get name of mmap file and create it + dims, tot_frames = get_file_size(movie_path, var_name_hdf5=var_name_hdf5) + assert isinstance(tot_frames, int) # non-type-stable interface... + + # use generate_fname_tot to emulate behavior of save_memmap + mmap_fname_start = generate_fname_tot(base_name, list(dims), order='C') + mmap_fname = f'{mmap_fname_start}_frames_{tot_frames}.mmap' + mmap_fname = os.path.join(os.path.split(movie_path)[0], mmap_fname) + mmap_fname = fn_relocated(mmap_fname) + logging.info(f'Creating mmap file: {mmap_fname}') + + d = int(np.prod(dims)) + big_mov = np.memmap(mmap_fname, mode='w+', dtype=np.float32, shape=(d, tot_frames), order='C') + + # parallel load/save call + save_c_order_mmap_in_chunks(movie_path, dview, var_name_hdf5, mmap_fname) + + # clean up + del big_mov + return mmap_fname + + +def _make_chunk_projections_kernel(Yr_chunk: np.ndarray, _pixel_slice: slice, proj_type: str, ignore_nan=False) -> np.ndarray: if hasattr(scipy.stats, proj_type): return getattr(scipy.stats, proj_type)(Yr_chunk, axis=1, nan_policy='omit' if ignore_nan else 'propagate') @@ -112,67 +241,29 @@ def make_chunk_projection(Yr_chunk: np.ndarray, proj_type: str, ignore_nan=False return getattr(np, proj_type)(Yr_chunk, axis=1) raise NotImplementedError(f"Projection type '{proj_type}' not implemented") +make_chunk_projections = ColumnMappingFunction(_make_chunk_projections_kernel) -def make_chunk_projection_helper(args: tuple[str, slice, Optional[int], str, bool]): - movie_path, chunk_slice, page, proj_type, ignore_nan = args - subindices = (slice(None), slice(None), chunk_slice) - if page is not None: - subindices += (page,) - - mov: cm.movie = cm.load(movie_path, subindices=subindices) - # flatten to pixels x time - Yr = mov.reshape((mov.shape[0], -1), order='F').T - return make_chunk_projection(Yr, proj_type, ignore_nan=ignore_nan) - - -def make_projection_parallel(movie_path: str, proj_type: str, dview: Optional[Cluster], ignore_nan=False) -> np.ndarray: +def make_projection_parallel(movie_path: str, proj_type: str, dview: Optional[Cluster], ignore_nan=False, + var_name_hdf5='mov') -> np.ndarray: """ Compute projection in chunks that are small enough to fit in memory movie_path: path to movie that can be memory-mapped using caiman.load """ - dims, T = get_file_size(movie_path) - - # use n_pixels_per_process from CNMF to avoid running out of memory - chunk_size = estimate_n_pixels_per_process(get_n_processes(dview), T, dims) - chunk_columns = max(chunk_size // dims[0], 1) - - # divide movie into chunks of columns - chunk_starts = range(0, dims[1], chunk_columns) - chunk_slices = [ - slice(start, min(start + chunk_columns, dims[1])) for start in chunk_starts - ] - - if len(dims) > 2 and dims[2] > 1: - args = [] - for page in range(dims[2]): - args.extend([ - (movie_path, chunk_slice, page, proj_type, ignore_nan) - for chunk_slice in chunk_slices - ]) - else: - args = [ - (movie_path, chunk_slice, None, proj_type, ignore_nan) - for chunk_slice in chunk_slices - ] - - if dview is None: - map_fn = map - elif isinstance(dview, Pool): - map_fn = dview.map - else: - map_fn = dview.map_sync - - chunk_projs = map_fn(make_chunk_projection_helper, args) + dims, _ = get_file_size(movie_path, var_name_hdf5=var_name_hdf5) + chunk_projs = make_chunk_projections(movie_path, dview, var_name_hdf5, proj_type, ignore_nan) p_img_flat = np.concatenate(list(chunk_projs), axis=0) return np.reshape(p_img_flat, dims, order="F") -def save_projections_parallel(uuid, movie_path: Union[str, Path], output_dir: Path, dview: Optional[Cluster] - ) -> dict[str, Path]: +def save_projections_parallel( + uuid, movie_path: Union[str, Path], output_dir: Path, dview: Optional[Cluster], var_name_hdf5='mov' +) -> dict[str, Path]: proj_paths = dict() for proj_type in ["mean", "std", "max"]: - p_img = make_projection_parallel(str(movie_path), proj_type, dview=dview, ignore_nan=True) + p_img = make_projection_parallel( + str(movie_path), proj_type, dview=dview, ignore_nan=True, var_name_hdf5=var_name_hdf5 + ) proj_paths[proj_type] = output_dir.joinpath( f"{uuid}_{proj_type}_projection.npy" ) @@ -189,6 +280,7 @@ def make_correlation_parallel(movie_path: Union[str, Path], dview: Optional[Clus movie_path: path to movie that can be memory-mapped using caiman.load """ dims, T = get_file_size(movie_path) + assert isinstance(T, int) # non-type-stable interface... # use n_pixels_per_process from CNMF to avoid running out of memory chunk_size = estimate_n_pixels_per_process(get_n_processes(dview), T, dims) diff --git a/mesmerize_core/algorithms/cnmf.py b/mesmerize_core/algorithms/cnmf.py index 649d58c..fbb5c60 100644 --- a/mesmerize_core/algorithms/cnmf.py +++ b/mesmerize_core/algorithms/cnmf.py @@ -19,11 +19,17 @@ ensure_server, save_projections_parallel, save_correlation_parallel, + save_c_order_mmap_parallel, ) else: # when running with local backend from ..batch_utils import set_parent_raw_data_path, load_batch from ..utils import IS_WINDOWS - from ._utils import ensure_server, save_projections_parallel, save_correlation_parallel + from ._utils import ( + ensure_server, + save_projections_parallel, + save_correlation_parallel, + save_c_order_mmap_parallel, + ) def run_algo(batch_path, uuid, data_path: str = None, dview=None): @@ -61,8 +67,11 @@ def run_algo(batch_path, uuid, data_path: str = None, dview=None): if save_new_mmap: print("making memmap") - fname_new = cm.save_memmap( - [input_movie_path], base_name=f"{uuid}_cnmf-memmap_", order="C", dview=dview + fname_new = save_c_order_mmap_parallel( + input_movie_path, + base_name=f"{uuid}_cnmf-memmap_", + dview=dview, + var_name_hdf5=cnmf_params.data['var_name_hdf5'] ) cnmf_memmap_path = output_dir.joinpath(Path(fname_new).name) move_file(fname_new, cnmf_memmap_path) diff --git a/mesmerize_core/algorithms/cnmfe.py b/mesmerize_core/algorithms/cnmfe.py index 83adeed..6812fd6 100644 --- a/mesmerize_core/algorithms/cnmfe.py +++ b/mesmerize_core/algorithms/cnmfe.py @@ -11,11 +11,15 @@ if __name__ in ["__main__", "__mp_main__"]: # when running in subprocess from mesmerize_core import set_parent_raw_data_path, load_batch from mesmerize_core.utils import IS_WINDOWS - from mesmerize_core.algorithms._utils import ensure_server, save_projections_parallel + from mesmerize_core.algorithms._utils import ( + ensure_server, + save_projections_parallel, + save_c_order_mmap_parallel, + ) else: # when running with local backend from ..batch_utils import set_parent_raw_data_path, load_batch from ..utils import IS_WINDOWS - from ._utils import ensure_server, save_projections_parallel + from ._utils import ensure_server, save_projections_parallel, save_c_order_mmap_parallel def run_algo(batch_path, uuid, data_path: str = None, dview=None): @@ -37,8 +41,24 @@ def run_algo(batch_path, uuid, data_path: str = None, dview=None): with ensure_server(dview) as (dview, n_processes): try: - fname_new = cm.save_memmap( - [input_movie_path], base_name=f"{uuid}_cnmf-memmap_", order="C", dview=dview + # force the CNMFE params + cnmfe_params_dict = { + "method_init": "corr_pnr", + "n_processes": n_processes, + "only_init": True, # for 1p + "center_psf": True, # for 1p + "normalize_init": False, # for 1p + } + + params_dict = {**cnmfe_params_dict, **params["main"]} + + cnmfe_params_dict = CNMFParams(params_dict=params_dict) + + fname_new = save_c_order_mmap_parallel( + input_movie_path, + base_name=f"{uuid}_cnmf-memmap_", + dview=dview, + var_name_hdf5=cnmfe_params_dict.data['var_name_hdf5'] ) print("making memmap") @@ -53,18 +73,6 @@ def run_algo(batch_path, uuid, data_path: str = None, dview=None): d = dict() # for output - # force the CNMFE params - cnmfe_params_dict = { - "method_init": "corr_pnr", - "n_processes": n_processes, - "only_init": True, # for 1p - "center_psf": True, # for 1p - "normalize_init": False, # for 1p - } - - params_dict = {**cnmfe_params_dict, **params["main"]} - - cnmfe_params_dict = CNMFParams(params_dict=params_dict) cnm = cnmf.CNMF(n_processes=n_processes, dview=dview, params=cnmfe_params_dict) print("Performing CNMFE") cnm.fit(images) From 0657633298a2e1214f9c1be971d3a02ad0de375d Mon Sep 17 00:00:00 2001 From: Ethan Blackwood Date: Fri, 18 Jul 2025 11:03:49 -0400 Subject: [PATCH 25/31] Don't re-save if input is C-order mmap for CNMFE --- mesmerize_core/algorithms/_utils.py | 49 +++++++++++++++++------------ mesmerize_core/algorithms/cnmfe.py | 46 +++++++++++++++++++-------- 2 files changed, 62 insertions(+), 33 deletions(-) diff --git a/mesmerize_core/algorithms/_utils.py b/mesmerize_core/algorithms/_utils.py index 094e0fd..5dd50e0 100644 --- a/mesmerize_core/algorithms/_utils.py +++ b/mesmerize_core/algorithms/_utils.py @@ -1,4 +1,5 @@ from contextlib import contextmanager +from itertools import pairwise import logging import math import os @@ -123,12 +124,16 @@ def __init__(self, kernel: Callable[Concatenate[np.ndarray, slice, ...], R]): def _helper(self, args: tuple) -> R: movie_path: str = args[0] var_name_hdf5: str = args[1] - col_slice: slice = args[2] - page: Optional[int] = args[3] + row_slice: slice = args[2] + col_slice: slice = args[3] + page: Optional[int] = args[4] - subindices = (slice(None), slice(None), col_slice) + subindices = (slice(None), row_slice, col_slice) if page is not None: subindices += (page,) + logging.debug(f'In column mapping kernel, page = {page}, cols = {col_slice.start} to {col_slice.stop}') + else: + logging.debug(f'In column mapping kernel, cols = {col_slice.start} to {col_slice.stop}') mov: cm.movie = cm.load(movie_path, subindices=subindices, var_name_hdf5=var_name_hdf5) T, *dims = mov.shape @@ -138,10 +143,11 @@ def _helper(self, args: tuple) -> R: # get slice of flattened pixels page_offset = page * dims[0] * dims[1] if page else 0 - pixel_slice = slice(page_offset + col_slice.start * dims[0], page_offset + col_slice.stop * dims[0]) + pixel_slice = slice(page_offset + col_slice.start * dims[0] + row_slice.start, + page_offset + (col_slice.stop-1) * dims[0] + row_slice.stop) # apply the kernel - return self.kernel(Yr, pixel_slice, *args[4:]) + return self.kernel(Yr, pixel_slice, *args[5:]) def __call__(self, movie_path: str, dview: Optional[Cluster], var_name_hdf5='mov', *args) -> Iterable[R]: """Perform an operation on non-overlapping chunks of columns that fit into memory""" @@ -150,26 +156,29 @@ def __call__(self, movie_path: str, dview: Optional[Cluster], var_name_hdf5='mov # use n_pixels_per_process from CNMF to avoid running out of memory chunk_size = estimate_n_pixels_per_process(get_n_processes(dview), T, dims) - chunk_columns = max(chunk_size // dims[0], 1) + n_chunks = math.ceil(dims[0] * dims[1] / chunk_size) + n_column_chunks = min(math.ceil(dims[0] * dims[1] / chunk_size), dims[1]) # divide movie into chunks of columns - chunk_starts = range(0, dims[1], chunk_columns) - chunk_slices = [ - slice(start, min(start + chunk_columns, dims[1])) for start in chunk_starts - ] + chunk_col_edges = np.linspace(0, dims[1], n_column_chunks+1).astype(int) + chunk_col_slices = [slice(start, end) for start, end in pairwise(chunk_col_edges)] + + if (n_row_chunks := math.ceil(n_chunks / n_column_chunks)) > 1: + # subdivide rows as well + chunk_row_edges = np.linspace(0, dims[0], n_row_chunks+1).astype(int) + chunk_row_slices = [slice(start, end) for start, end in pairwise(chunk_row_edges)] + else: + chunk_row_slices = [slice(0, dims[0])] if len(dims) > 2 and dims[2] > 1: - map_args = [] - for page in range(dims[2]): - map_args.extend([ - (movie_path, var_name_hdf5, chunk_slice, page) + tuple(args) - for chunk_slice in chunk_slices - ]) + pages = range(dims[2]) else: - map_args = [ - (movie_path, var_name_hdf5, chunk_slice, None) + tuple(args) - for chunk_slice in chunk_slices - ] + pages = [None] + + map_args = [ + (movie_path, var_name_hdf5, row_slice, col_slice, page) + args + for page in pages for col_slice in chunk_col_slices for row_slice in chunk_row_slices + ] if dview is None: map_fn = map diff --git a/mesmerize_core/algorithms/cnmfe.py b/mesmerize_core/algorithms/cnmfe.py index 6812fd6..6f0a927 100644 --- a/mesmerize_core/algorithms/cnmfe.py +++ b/mesmerize_core/algorithms/cnmfe.py @@ -3,6 +3,8 @@ import caiman as cm from caiman.source_extraction.cnmf import cnmf as cnmf from caiman.source_extraction.cnmf.params import CNMFParams +from caiman.paths import decode_mmap_filename_dict +from caiman.base.movies import get_file_size import traceback from pathlib import Path, PurePosixPath from shutil import move as move_file @@ -15,11 +17,12 @@ ensure_server, save_projections_parallel, save_c_order_mmap_parallel, + estimate_n_pixels_per_process ) else: # when running with local backend from ..batch_utils import set_parent_raw_data_path, load_batch from ..utils import IS_WINDOWS - from ._utils import ensure_server, save_projections_parallel, save_c_order_mmap_parallel + from ._utils import ensure_server, save_projections_parallel, save_c_order_mmap_parallel, estimate_n_pixels_per_process def run_algo(batch_path, uuid, data_path: str = None, dview=None): @@ -54,21 +57,40 @@ def run_algo(batch_path, uuid, data_path: str = None, dview=None): cnmfe_params_dict = CNMFParams(params_dict=params_dict) - fname_new = save_c_order_mmap_parallel( - input_movie_path, - base_name=f"{uuid}_cnmf-memmap_", - dview=dview, - var_name_hdf5=cnmfe_params_dict.data['var_name_hdf5'] - ) + # only re-save memmap if necessary + save_new_mmap = True + if Path(input_movie_path).suffix == ".mmap": + mmap_info = decode_mmap_filename_dict(input_movie_path) + save_new_mmap = "order" not in mmap_info or mmap_info["order"] != "C" + + if save_new_mmap: + print("making memmap") + dims, T = get_file_size(input_movie_path, var_name_hdf5=cnmfe_params_dict.data['var_name_hdf5']) + assert isinstance(T, int) + print('Movie dims:', dims) + print('N frames:', T) + print('N processes:', n_processes) + print('N pixels per process:', chunk_size := estimate_n_pixels_per_process(n_processes, T, dims)) + print('Columns per chunk:', max(chunk_size // dims[0], 1)) + breakpoint() + fname_new = save_c_order_mmap_parallel( + input_movie_path, + base_name=f"{uuid}_cnmf-memmap_", + dview=dview, + var_name_hdf5=cnmfe_params_dict.data['var_name_hdf5'] + ) + cnmf_memmap_path = output_dir.joinpath(Path(fname_new).name) + move_file(fname_new, cnmf_memmap_path) + else: + cnmf_memmap_path = Path(input_movie_path) - print("making memmap") - Yr, dims, T = cm.load_memmap(fname_new) + Yr, dims, T = cm.load_memmap(str(cnmf_memmap_path)) images = np.reshape(Yr.T, [T] + list(dims), order="F") # TODO: if projections already exist from mcorr we don't # need to waste compute time re-computing them here proj_paths = save_projections_parallel( - uuid=uuid, movie_path=fname_new, output_dir=output_dir, dview=dview + uuid=uuid, movie_path=cnmf_memmap_path, output_dir=output_dir, dview=dview ) d = dict() # for output @@ -90,14 +112,12 @@ def run_algo(batch_path, uuid, data_path: str = None, dview=None): output_dir.parent ) - cnmf_memmap_path = output_dir.joinpath(Path(fname_new).name) if IS_WINDOWS: Yr._mmap.close() # accessing private attr but windows is annoying otherwise - move_file(fname_new, cnmf_memmap_path) # save path as relative path strings with forward slashes cnmfe_memmap_path = str( - PurePosixPath(cnmf_memmap_path.relative_to(output_dir.parent)) + PurePosixPath(df.paths.split(cnmf_memmap_path)[1]) ) d.update( From 6f82bd520061e58bc8586d0339d6334e287d60a4 Mon Sep 17 00:00:00 2001 From: Ethan Blackwood Date: Thu, 14 Aug 2025 14:17:39 -0400 Subject: [PATCH 26/31] Type checking fix, get rid of Concatenate --- mesmerize_core/algorithms/_utils.py | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/mesmerize_core/algorithms/_utils.py b/mesmerize_core/algorithms/_utils.py index 5dd50e0..a1e5c87 100644 --- a/mesmerize_core/algorithms/_utils.py +++ b/mesmerize_core/algorithms/_utils.py @@ -11,7 +11,6 @@ Generator, Protocol, Callable, - Concatenate, Generic, TypeVar, Sequence, @@ -118,7 +117,7 @@ class ColumnMappingFunction(Generic[R]): Object to map an operation over columns of a movie to avoid running out of memory Construct with the kernel function which takes pixels x time matrices and returns anything. """ - def __init__(self, kernel: Callable[Concatenate[np.ndarray, slice, ...], R]): + def __init__(self, kernel: Callable[..., R]): self.kernel = kernel def _helper(self, args: tuple) -> R: From cf92fca7b478dd84945a01d144d60fdf56285959 Mon Sep 17 00:00:00 2001 From: Ethan Blackwood Date: Tue, 30 Sep 2025 14:06:58 -0400 Subject: [PATCH 27/31] Clean up print statements in cnmfe --- mesmerize_core/algorithms/cnmfe.py | 12 +----------- 1 file changed, 1 insertion(+), 11 deletions(-) diff --git a/mesmerize_core/algorithms/cnmfe.py b/mesmerize_core/algorithms/cnmfe.py index 6f0a927..04da26e 100644 --- a/mesmerize_core/algorithms/cnmfe.py +++ b/mesmerize_core/algorithms/cnmfe.py @@ -4,7 +4,6 @@ from caiman.source_extraction.cnmf import cnmf as cnmf from caiman.source_extraction.cnmf.params import CNMFParams from caiman.paths import decode_mmap_filename_dict -from caiman.base.movies import get_file_size import traceback from pathlib import Path, PurePosixPath from shutil import move as move_file @@ -17,12 +16,11 @@ ensure_server, save_projections_parallel, save_c_order_mmap_parallel, - estimate_n_pixels_per_process ) else: # when running with local backend from ..batch_utils import set_parent_raw_data_path, load_batch from ..utils import IS_WINDOWS - from ._utils import ensure_server, save_projections_parallel, save_c_order_mmap_parallel, estimate_n_pixels_per_process + from ._utils import ensure_server, save_projections_parallel, save_c_order_mmap_parallel def run_algo(batch_path, uuid, data_path: str = None, dview=None): @@ -65,14 +63,6 @@ def run_algo(batch_path, uuid, data_path: str = None, dview=None): if save_new_mmap: print("making memmap") - dims, T = get_file_size(input_movie_path, var_name_hdf5=cnmfe_params_dict.data['var_name_hdf5']) - assert isinstance(T, int) - print('Movie dims:', dims) - print('N frames:', T) - print('N processes:', n_processes) - print('N pixels per process:', chunk_size := estimate_n_pixels_per_process(n_processes, T, dims)) - print('Columns per chunk:', max(chunk_size // dims[0], 1)) - breakpoint() fname_new = save_c_order_mmap_parallel( input_movie_path, base_name=f"{uuid}_cnmf-memmap_", From 8087df210ffc227cc9415390d703711601ea01d3 Mon Sep 17 00:00:00 2001 From: Ethan Blackwood Date: Thu, 2 Oct 2025 14:00:23 -0400 Subject: [PATCH 28/31] Add 0.0001 constant add_to_movie to match save_memmap --- mesmerize_core/algorithms/_utils.py | 14 +++++++++----- 1 file changed, 9 insertions(+), 5 deletions(-) diff --git a/mesmerize_core/algorithms/_utils.py b/mesmerize_core/algorithms/_utils.py index a1e5c87..01b2833 100644 --- a/mesmerize_core/algorithms/_utils.py +++ b/mesmerize_core/algorithms/_utils.py @@ -189,12 +189,12 @@ def __call__(self, movie_path: str, dview: Optional[Cluster], var_name_hdf5='mov return map_fn(self._helper, map_args) -def _save_c_order_mmap_in_chunks_kernel(Yr_chunk: np.ndarray, pixel_slice: slice, mmap_fname: str): +def _save_c_order_mmap_in_chunks_kernel(Yr_chunk: np.ndarray, pixel_slice: slice, mmap_fname: str, add_to_movie: float): """ Alternative to cm.save_memmap that can load from non-mmap files and uses chunks Based on caiman.mmapping.save_portion """ - c_order_copy = np.ascontiguousarray(Yr_chunk, dtype=np.float32) # pixels x time + c_order_copy = np.ascontiguousarray(Yr_chunk, dtype=np.float32) + np.float32(add_to_movie) # pixels x time tot_frames = c_order_copy.shape[1] with open(mmap_fname, 'r+b') as f: @@ -212,8 +212,12 @@ def _save_c_order_mmap_in_chunks_kernel(Yr_chunk: np.ndarray, pixel_slice: slice raise Exception('Internal error in mmapping: Actual position does not match computed position') save_c_order_mmap_in_chunks = ColumnMappingFunction(_save_c_order_mmap_in_chunks_kernel) -def save_c_order_mmap_parallel(movie_path: str, base_name: str, dview: Optional[Cluster], var_name_hdf5='mov') -> str: - """Alternative to cm.save_memmap that hopefully does better with memory""" +def save_c_order_mmap_parallel(movie_path: str, base_name: str, dview: Optional[Cluster], + var_name_hdf5='mov', add_to_movie=0.0001) -> str: + """ + Alternative to cm.save_memmap that hopefully does better with memory + add_to_movie=0.0001 emulates default behavior of save_memmap + """ # get name of mmap file and create it dims, tot_frames = get_file_size(movie_path, var_name_hdf5=var_name_hdf5) assert isinstance(tot_frames, int) # non-type-stable interface... @@ -229,7 +233,7 @@ def save_c_order_mmap_parallel(movie_path: str, base_name: str, dview: Optional[ big_mov = np.memmap(mmap_fname, mode='w+', dtype=np.float32, shape=(d, tot_frames), order='C') # parallel load/save call - save_c_order_mmap_in_chunks(movie_path, dview, var_name_hdf5, mmap_fname) + save_c_order_mmap_in_chunks(movie_path, dview, var_name_hdf5, mmap_fname, add_to_movie) # clean up del big_mov From 46b4e14d38a39e1660208e1834bee5cb4812b505 Mon Sep 17 00:00:00 2001 From: Ethan Blackwood Date: Thu, 2 Oct 2025 14:33:03 -0400 Subject: [PATCH 29/31] Update Zenodo URL for correlation ground truth change --- tests/test_core.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/test_core.py b/tests/test_core.py index 8b8a920..d977eb0 100644 --- a/tests/test_core.py +++ b/tests/test_core.py @@ -53,7 +53,7 @@ def _download_ground_truths(): print(f"Downloading ground truths") - url = f"https://zenodo.org/record/14934525/files/ground_truths.zip" + url = f"https://zenodo.org/record/17253218/files/ground_truths.zip" # basically from https://stackoverflow.com/questions/37573483/progress-bar-while-download-file-over-http-with-requests/37573701 response = requests.get(url, stream=True) From 89a906c10336e5f42231510026aa58bac0dfb1a5 Mon Sep 17 00:00:00 2001 From: Ethan Blackwood Date: Thu, 2 Oct 2025 14:51:25 -0400 Subject: [PATCH 30/31] Remove pairwise for Python 3.9 --- mesmerize_core/algorithms/_utils.py | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/mesmerize_core/algorithms/_utils.py b/mesmerize_core/algorithms/_utils.py index 01b2833..ce8e06b 100644 --- a/mesmerize_core/algorithms/_utils.py +++ b/mesmerize_core/algorithms/_utils.py @@ -1,5 +1,4 @@ from contextlib import contextmanager -from itertools import pairwise import logging import math import os @@ -160,12 +159,12 @@ def __call__(self, movie_path: str, dview: Optional[Cluster], var_name_hdf5='mov # divide movie into chunks of columns chunk_col_edges = np.linspace(0, dims[1], n_column_chunks+1).astype(int) - chunk_col_slices = [slice(start, end) for start, end in pairwise(chunk_col_edges)] + chunk_col_slices = [slice(start, end) for start, end in zip(chunk_col_edges[:-1], chunk_col_edges[1:])] if (n_row_chunks := math.ceil(n_chunks / n_column_chunks)) > 1: # subdivide rows as well chunk_row_edges = np.linspace(0, dims[0], n_row_chunks+1).astype(int) - chunk_row_slices = [slice(start, end) for start, end in pairwise(chunk_row_edges)] + chunk_row_slices = [slice(start, end) for start, end in zip(chunk_row_edges[:-1], chunk_row_edges[1:])] else: chunk_row_slices = [slice(0, dims[0])] From 8abdb375fdaf1f2368000cbf8b50997cdf0f2618 Mon Sep 17 00:00:00 2001 From: Ethan Blackwood Date: Thu, 2 Oct 2025 15:44:14 -0400 Subject: [PATCH 31/31] Add fix_multid_subindices to deal with caiman.load weirdness --- mesmerize_core/algorithms/_utils.py | 18 ++++++++++++++++-- 1 file changed, 16 insertions(+), 2 deletions(-) diff --git a/mesmerize_core/algorithms/_utils.py b/mesmerize_core/algorithms/_utils.py index ce8e06b..21b50b6 100644 --- a/mesmerize_core/algorithms/_utils.py +++ b/mesmerize_core/algorithms/_utils.py @@ -110,6 +110,20 @@ def estimate_n_pixels_per_process(n_processes: int, T: int, dims: tuple[int, ... return npx_per_proc +def fix_multid_subindices(movie_path: str, subindices: Union[list, tuple]) -> Union[list, tuple]: + """ + Make multidimensional subindices that work for the given file type for caiman.load, given that + some file types expect subindices as a list and others don't explicitly support multi-D subindices + and therefore they must be passed as a tuple to work correctly. + """ + _, ext = os.path.splitext(movie_path) + if ext in ['.tif', '.tiff', '.btf', '.avi', '.mkv']: + # formats that expect multi-D subindices as a list + return list(subindices) + else: + return tuple(subindices) + + R = TypeVar('R') class ColumnMappingFunction(Generic[R]): """ @@ -133,7 +147,7 @@ def _helper(self, args: tuple) -> R: else: logging.debug(f'In column mapping kernel, cols = {col_slice.start} to {col_slice.stop}') - mov: cm.movie = cm.load(movie_path, subindices=subindices, var_name_hdf5=var_name_hdf5) + mov: cm.movie = cm.load(movie_path, subindices=fix_multid_subindices(movie_path, subindices), var_name_hdf5=var_name_hdf5) T, *dims = mov.shape # flatten to pixels x time @@ -324,7 +338,7 @@ def save_correlation_parallel(uuid, movie_path: Union[str, Path], output_dir: Pa def chunk_correlation_helper(args: tuple[str, ChunkDims]) -> np.ndarray: movie_path, dims_input = args - mov = cm.load(movie_path, subindices=(slice(None),) + dims_input) + mov = cm.load(movie_path, subindices=fix_multid_subindices(movie_path, (slice(None),) + dims_input)) return local_correlations(mov, swap_dim=False)