diff --git a/bin/utility_extract_data.py b/bin/utility_extract_preview_data.py similarity index 100% rename from bin/utility_extract_data.py rename to bin/utility_extract_preview_data.py diff --git a/modules/local/baysor/create_dataset/resources/usr/bin/create_dataset.py b/modules/local/baysor/create_dataset/resources/usr/bin/create_dataset.py deleted file mode 100755 index 4e5a263a..00000000 --- a/modules/local/baysor/create_dataset/resources/usr/bin/create_dataset.py +++ /dev/null @@ -1,96 +0,0 @@ -#!/usr/bin/env python3 -""" -Create a sampled dataset for Baysor preview mode. - -Reads a CSV transcript file and randomly samples a fraction of rows, -writing the result to a new CSV file. -""" - -import argparse -import csv -import os -import random -from pathlib import Path - - -class BaysorPreview(): - """ - Utility class to generate baysor preview dataset - """ - @staticmethod - def generate_dataset( - transcripts: Path, - sampled_transcripts: Path, - sample_fraction: float = 0.3, - random_state: int = 42, - prefix: str = "" - ) -> None: - """ - Reads a csv file & randomly samples a fraction of rows, - and writes the result to a .csv file. - - Args: - transcripts: unziped transcripts.csv from xenium bundle - sampled_transcripts: randomly subsampled transcripts.csv file - sample_fraction: Fraction of rows to sample - random_state: Seed for reproducibility - prefix: Output directory prefix - """ - - random.seed(random_state) - output_path = f"{prefix}/{sampled_transcripts}" - os.makedirs(os.path.dirname(output_path), exist_ok=True) - with open(transcripts, mode='rt', newline='') as infile, \ - open(output_path, mode='wt', newline='') as outfile: - - reader = csv.reader(infile) - writer = csv.writer(outfile) - - # get the header line - header = next(reader) - writer.writerow(header) - - # randomize csv rows to write - for row in reader: - if random.random() < float(sample_fraction): - writer.writerow(row) - - return None - - -def main() -> None: - """ - Run create dataset as nf module - """ - parser = argparse.ArgumentParser( - description="Create sampled dataset for Baysor preview" - ) - parser.add_argument( - "--transcripts", required=True, - help="Path to transcripts CSV file" - ) - parser.add_argument( - "--sample-fraction", required=True, type=float, - help="Fraction of rows to sample" - ) - parser.add_argument( - "--prefix", required=True, - help="Output directory prefix" - ) - args = parser.parse_args() - - sampled_transcripts = "sampled_transcripts.csv" - - # generate dataset - BaysorPreview.generate_dataset( - transcripts=args.transcripts, - sampled_transcripts=sampled_transcripts, - sample_fraction=args.sample_fraction, - prefix=args.prefix - ) - - return None - - -if __name__ == "__main__": - main() diff --git a/modules/local/baysor/preprocess/resources/usr/bin/preprocess_transcripts.py b/modules/local/baysor/preprocess/resources/usr/bin/preprocess_transcripts.py deleted file mode 100755 index 2662f83c..00000000 --- a/modules/local/baysor/preprocess/resources/usr/bin/preprocess_transcripts.py +++ /dev/null @@ -1,126 +0,0 @@ -#!/usr/bin/env python3 -""" -Preprocess Xenium transcripts for Baysor segmentation. - -Filters transcripts based on quality score and spatial coordinate thresholds, -removes negative control probes, and outputs filtered CSV for Baysor compatibility. 
-""" - -import argparse -import os - -import pandas as pd - - -def filter_transcripts( - transcripts: str, - min_qv: float = 20.0, - min_x: float = 0.0, - max_x: float = 24000.0, - min_y: float = 0.0, - max_y: float = 24000.0, - prefix: str = "", -) -> None: - """ - Filter transcripts based on the specified thresholds. - - Args: - transcripts: Path to transcripts parquet file - min_qv: Minimum Q-Score to pass filtering - min_x: Minimum x-coordinate threshold - max_x: Maximum x-coordinate threshold - min_y: Minimum y-coordinate threshold - max_y: Maximum y-coordinate threshold - prefix: Output directory prefix - """ - df = pd.read_parquet(transcripts, engine="pyarrow") - - # filter transcripts df with thresholds, ignore negative controls - filtered_df = df[ - (df["qv"] >= min_qv) - & (df["x_location"] >= min_x) - & (df["x_location"] <= max_x) - & (df["y_location"] >= min_y) - & (df["y_location"] <= max_y) - & (~df["feature_name"].str.startswith("NegControlProbe_")) - & (~df["feature_name"].str.startswith("antisense_")) - & (~df["feature_name"].str.startswith("NegControlCodeword_")) - & (~df["feature_name"].str.startswith("BLANK_")) - ] - - # change cell_id of cell-free transcripts to "0" (Baysor's no-cell sentinel). - # Modern Xenium stores cell_id as a string ("UNASSIGNED" for cell-free transcripts); - # legacy Xenium used integer -1. Normalize to string and handle both cases — pandas 3 - # rejects mixing int values into a string-dtype column. - filtered_df["cell_id"] = filtered_df["cell_id"].astype(str) - neg_cell_row = filtered_df["cell_id"].isin(["-1", "UNASSIGNED"]) - filtered_df.loc[neg_cell_row, "cell_id"] = "0" - - # Output filtered transcripts as CSV for Baysor 0.7.1 compatibility. - # Baysor's Julia Parquet.jl cannot read modern pyarrow Parquet files - # (pyarrow 15+ writes size_statistics Thrift field 16 unconditionally, - # which Baysor's old Thrift deserializer doesn't recognize). - os.makedirs(prefix, exist_ok=True) - filtered_df.to_csv(f"{prefix}/filtered_transcripts.csv", index=False) - - return None - - -def main() -> None: - """ - Run preprocess transcripts as nf module. 
- """ - parser = argparse.ArgumentParser( - description="Preprocess Xenium transcripts for Baysor" - ) - parser.add_argument( - "--transcripts", required=True, help="Path to transcripts parquet file" - ) - parser.add_argument("--prefix", required=True, help="Output directory prefix") - parser.add_argument( - "--min-qv", - type=float, - default=20.0, - help="Minimum Q-Score threshold (default: 20.0)", - ) - parser.add_argument( - "--min-x", - type=float, - default=0.0, - help="Minimum x-coordinate threshold (default: 0.0)", - ) - parser.add_argument( - "--max-x", - type=float, - default=24000.0, - help="Maximum x-coordinate threshold (default: 24000.0)", - ) - parser.add_argument( - "--min-y", - type=float, - default=0.0, - help="Minimum y-coordinate threshold (default: 0.0)", - ) - parser.add_argument( - "--max-y", - type=float, - default=24000.0, - help="Maximum y-coordinate threshold (default: 24000.0)", - ) - args = parser.parse_args() - - filter_transcripts( - transcripts=args.transcripts, - min_qv=args.min_qv, - min_x=args.min_x, - max_x=args.max_x, - min_y=args.min_y, - max_y=args.max_y, - prefix=args.prefix, - ) - - return None - - -if __name__ == "__main__": - main() diff --git a/modules/local/ficture/preprocess/resources/usr/bin/ficture_preprocess.py b/modules/local/ficture/preprocess/resources/usr/bin/ficture_preprocess.py deleted file mode 100755 index 2e0c687c..00000000 --- a/modules/local/ficture/preprocess/resources/usr/bin/ficture_preprocess.py +++ /dev/null @@ -1,101 +0,0 @@ -#!/usr/bin/env python3 -"""Preprocess Xenium transcripts for FICTURE analysis.""" - -import argparse -import gzip -import logging -import os -import re -import sys - -import pandas as pd - - -def parse_args(): - """Parse command-line arguments.""" - parser = argparse.ArgumentParser( - description="Preprocess Xenium transcripts for FICTURE" - ) - parser.add_argument( - "--transcripts", required=True, help="Path to transcripts file (CSV)" - ) - parser.add_argument( - "--features", default="", help="Path to features file (optional)" - ) - parser.add_argument( - "--negative-control-regex", default="", help="Regex for negative control probes" - ) - return parser.parse_args() - - -def main(): - """Run FICTURE preprocessing.""" - args = parse_args() - print("[START]") - - negctrl_regex = "BLANK|NegCon" - if args.negative_control_regex: - negctrl_regex = args.negative_control_regex - - unit_info = ["X", "Y", "gene", "cell_id", "overlaps_nucleus"] - oheader = unit_info + ["Count"] - - feature = pd.DataFrame() - xmin = sys.maxsize - xmax = 0 - ymin = sys.maxsize - ymax = 0 - - output = "processed_transcripts.tsv.gz" - feature_file = "feature.clean.tsv.gz" - min_phred_score = 15 - - with gzip.open(output, "wt") as wf: - wf.write("\t".join(oheader) + "\n") - - for chunk in pd.read_csv(args.transcripts, header=0, chunksize=500000): - chunk = chunk.loc[(chunk.qv > min_phred_score)] - chunk.rename(columns={"feature_name": "gene"}, inplace=True) - if negctrl_regex != "": - chunk = chunk[ - ~chunk.gene.str.contains(negctrl_regex, flags=re.IGNORECASE, regex=True) - ] - chunk.rename(columns={"x_location": "X", "y_location": "Y"}, inplace=True) - chunk["Count"] = 1 - chunk[oheader].to_csv( - output, sep="\t", mode="a", index=False, header=False, float_format="%.2f" - ) - logging.info(f"{chunk.shape[0]}") - feature = pd.concat( - [feature, chunk.groupby(by="gene").agg({"Count": "sum"}).reset_index()] - ) - x0 = chunk.X.min() - x1 = chunk.X.max() - y0 = chunk.Y.min() - y1 = chunk.Y.max() - xmin = min(int(xmin), int(x0)) - 
xmax = max(int(xmax), int(x1)) - ymin = min(int(ymin), int(y0)) - ymax = max(int(ymax), int(y1)) - - if os.path.exists(args.features): - feature_list = [] - with open(args.features, "r") as ff: - for line in ff: - feature_list.append(line.strip("\n")) - feature = feature.groupby(by="gene").agg({"Count": "sum"}).reset_index() - feature = feature[[x in feature_list for x in feature["gene"]]] - feature.to_csv(feature_file, sep="\t", index=False) - - f = os.path.join(os.path.dirname(output), "coordinate_minmax.tsv") - with open(f, "w") as wf: - wf.write(f"xmin\t{xmin}\n") - wf.write(f"xmax\t{xmax}\n") - wf.write(f"ymin\t{ymin}\n") - wf.write(f"ymax\t{ymax}\n") - - print("[FINISH]") - - -if __name__ == "__main__": - main() diff --git a/modules/local/segger/create_dataset/resources/usr/bin/run_create_dataset.py b/modules/local/segger/create_dataset/resources/usr/bin/run_create_dataset.py deleted file mode 100755 index c73ab006..00000000 --- a/modules/local/segger/create_dataset/resources/usr/bin/run_create_dataset.py +++ /dev/null @@ -1,253 +0,0 @@ -#!/usr/bin/env python3 -""" -Run segger create_dataset with spatialxe-specific preprocessing and workarounds. - -Wraps segger's create_dataset_fast.py with: - - bundle_local symlink prep (handles read-only S3/Fusion mounts) - - parquet column statistics (segger needs these) - - WORKAROUND: filter trainable tiles from test_tiles when segger commit 0787167 mis-splits - - WORKAROUND: replace NaN bd.x with zeros after get_polygon_props produces NaN - -Each WORKAROUND should be removable when the upstream segger bug is fixed. -""" - -import argparse -import os -import shutil -import subprocess -import sys -from pathlib import Path - -# imports for actual work (used in functions below) -import pyarrow.parquet as pq -import pyarrow.compute as pc -import torch - - -SEGGER_CLI = "/workspace/segger_dev/src/segger/cli/create_dataset_fast.py" - - -def parse_args(): - p = argparse.ArgumentParser() - p.add_argument("--bundle-dir", required=True) - p.add_argument("--output-dir", required=True) - p.add_argument("--sample-type", required=True, choices=["xenium"]) - p.add_argument("--tile-width", type=int, required=True) - p.add_argument("--tile-height", type=int, required=True) - p.add_argument("--n-workers", type=int, required=True) - # remaining args forwarded to segger CLI - args, extra = p.parse_known_args() - return args, extra - - -def prepare_bundle(bundle_dir): - """Create local bundle dir with absolute symlinks (S3/Fusion read-only-safe).""" - Path("bundle_local").mkdir(exist_ok=True) - for item in Path(bundle_dir).iterdir(): - try: - abs_path = item.resolve() - except Exception: - abs_path = item - target = Path("bundle_local") / item.name - if target.exists() or target.is_symlink(): - target.unlink() - target.symlink_to(abs_path) - - # Segger expects nucleus_boundaries.parquet but Xenium bundles have cell_boundaries.parquet - nb = Path("bundle_local/nucleus_boundaries.parquet") - cb = Path("bundle_local/cell_boundaries.parquet") - if not nb.exists() and cb.exists(): - print( - "Creating nucleus_boundaries.parquet symlink from cell_boundaries.parquet" - ) - nb.symlink_to(cb.resolve()) - - print("Bundle contents:") - for item in sorted(Path("bundle_local").iterdir()): - print(f" {item.name}") - - -def add_parquet_stats(): - """Rewrite key parquet files with column statistics (segger requires them).""" - Path("bundle_stats").mkdir(exist_ok=True) - for fname in ["transcripts.parquet", "nucleus_boundaries.parquet"]: - src = Path("bundle_local") / fname - dst = 
Path("bundle_stats") / fname - if not src.exists(): - print(f" Skip {src}") - continue - t = pq.read_table(str(src)) - pq.write_table(t, str(dst), write_statistics=True, compression="snappy") - print(f" Done {fname} ({len(t)} rows)") - - # Symlink everything else from bundle_local into bundle_stats - for item in Path("bundle_local").iterdir(): - dst = Path("bundle_stats") / item.name - if not dst.exists(): - dst.symlink_to(item.resolve()) - - # Debug: check overlaps_nucleus column in transcripts - print("\n=== Debugging overlaps_nucleus data ===") - tx = pq.read_table("bundle_stats/transcripts.parquet") - bd = pq.read_table("bundle_stats/nucleus_boundaries.parquet") - if "overlaps_nucleus" in tx.column_names: - col = tx.column("overlaps_nucleus") - print(f"overlaps_nucleus dtype: {col.type}") - unique_vals = pc.unique(col) - print(f"overlaps_nucleus unique values: {unique_vals.to_pylist()[:10]}") - val_counts = pc.value_counts(col) - print(f"overlaps_nucleus value_counts: {val_counts.to_pylist()}") - else: - print("WARNING: overlaps_nucleus column NOT FOUND in transcripts.parquet") - - if "cell_id" in tx.column_names and "cell_id" in bd.column_names: - tx_cells = set(pc.unique(tx.column("cell_id")).to_pylist()) - bd_cells = set(pc.unique(bd.column("cell_id")).to_pylist()) - overlap = tx_cells & bd_cells - print(f"Transcripts unique cell_ids: {len(tx_cells)}") - print(f"Boundaries unique cell_ids: {len(bd_cells)}") - print(f"Overlapping cell_ids: {len(overlap)}") - print("=== End Debug ===\n") - - -def run_segger_cli(args, extra): - cmd = [ - "python3", - SEGGER_CLI, - "--base_dir", - "bundle_stats", - "--data_dir", - args.output_dir, - "--sample_type", - args.sample_type, - "--tile_width", - str(args.tile_width), - "--tile_height", - str(args.tile_height), - "--n_workers", - str(args.n_workers), - *extra, - ] - print(f"Running: {' '.join(cmd)}") - result = subprocess.run(cmd) - if result.returncode != 0: - sys.exit(result.returncode) - - -def filter_trainable_tiles_if_needed(prefix): - """ - WORKAROUND: segger commit 0787167 has a bug where all tiles end up in test_tiles - regardless of test_prob/val_prob settings. Move ONLY trainable tiles (those with - edge_label_index) from test_tiles to train_tiles. - - Remove this function once segger >= 0.1.x is bumped with the upstream fix. 
- """ - train_dir = Path(prefix) / "train_tiles" / "processed" - test_dir = Path(prefix) / "test_tiles" / "processed" - val_dir = Path(prefix) / "val_tiles" / "processed" - - train_count = len(list(train_dir.iterdir())) if train_dir.exists() else 0 - test_count = len(list(test_dir.iterdir())) if test_dir.exists() else 0 - val_count = len(list(val_dir.iterdir())) if val_dir.exists() else 0 - print( - f"Dataset split (before fix): train={train_count} val={val_count} test={test_count}" - ) - - if train_count == 0 and test_count > 0: - print( - "Applying workaround: filtering trainable tiles from test_tiles (segger split bug)" - ) - moved = 0 - skipped = 0 - for tile_path in list(test_dir.iterdir()): - if not tile_path.name.endswith(".pt"): - continue - try: - tile = torch.load(str(tile_path), weights_only=False) - edge_store = tile["tx", "belongs", "bd"] - if ( - hasattr(edge_store, "edge_label_index") - and edge_store.edge_label_index.numel() > 0 - ): - shutil.move(str(tile_path), str(train_dir / tile_path.name)) - moved += 1 - else: - skipped += 1 - except Exception as e: - print(f"Warning: Could not process {tile_path.name}: {e}") - skipped += 1 - print(f"Moved {moved} trainable tiles to train_tiles") - print(f"Skipped {skipped} test-only tiles (no edge_label_index)") - - train_count = len(list(train_dir.iterdir())) if train_dir.exists() else 0 - test_count = len(list(test_dir.iterdir())) if test_dir.exists() else 0 - val_count = len(list(val_dir.iterdir())) if val_dir.exists() else 0 - print( - f"Dataset split (after fix): train={train_count} val={val_count} test={test_count}" - ) - - if train_count == 0: - print(f"ERROR: No trainable tiles were created in {train_dir}", file=sys.stderr) - print( - "This usually means no transcripts overlap with nucleus boundaries in the dataset.", - file=sys.stderr, - ) - print( - "Check if the Xenium bundle contains valid overlaps_nucleus data in transcripts.parquet.", - file=sys.stderr, - ) - sys.exit(1) - print(f"Successfully created {train_count} trainable tiles") - - -def fix_bd_x_nan(prefix): - """ - WORKAROUND: segger's get_polygon_props() produces NaN boundary features (bd.x) - when polygon geometries have zero area or index misalignment during GeoDataFrame - construction. Replace NaN bd.x with zeros so BCEWithLogitsLoss doesn't propagate NaN. - - Remove this function once segger >= 0.1.x is bumped with the upstream fix. 
- """ - fixed = 0 - total = 0 - for split in ["train_tiles", "test_tiles", "val_tiles"]: - tile_dir = Path(prefix) / split / "processed" - if not tile_dir.is_dir(): - continue - for tile_path in tile_dir.iterdir(): - if not tile_path.name.endswith(".pt"): - continue - total += 1 - tile = torch.load(str(tile_path), weights_only=False) - bd_x = tile["bd"].x - if bd_x.isnan().any(): - tile["bd"].x = torch.nan_to_num(bd_x, nan=0.0) - torch.save(tile, str(tile_path)) - fixed += 1 - print(f"Fixed NaN bd.x in {fixed}/{total} tiles") - - -def main(): - args, extra = parse_args() - - # Ensure numba cache dir is writable (env var should be set by caller, but belt-and-suspenders) - os.environ.setdefault("NUMBA_CACHE_DIR", os.path.join(os.getcwd(), ".numba_cache")) - os.makedirs(os.environ["NUMBA_CACHE_DIR"], exist_ok=True) - - prepare_bundle(args.bundle_dir) - print("Adding statistics to parquet files...") - add_parquet_stats() - - # Sanity-check bundle_stats - print("bundle_stats contents:") - for item in sorted(Path("bundle_stats").iterdir()): - print(f" {item.name}") - - run_segger_cli(args, extra) - - filter_trainable_tiles_if_needed(args.output_dir) - fix_bd_x_nan(args.output_dir) - - -if __name__ == "__main__": - main() diff --git a/modules/local/segger/predict/resources/usr/bin/run_predict.py b/modules/local/segger/predict/resources/usr/bin/run_predict.py deleted file mode 100755 index 56a77ffc..00000000 --- a/modules/local/segger/predict/resources/usr/bin/run_predict.py +++ /dev/null @@ -1,137 +0,0 @@ -#!/usr/bin/env python3 -""" -Run segger predict with spatialxe-specific preprocessing. - -Wraps segger's predict_fast.py with: - - GPU enumeration (replaces inline python3 -c torch check) - - WORKAROUND: patch predict_parquet.py at runtime to add torch.no_grad() for ~30-50% VRAM savings - - WORKAROUND: seed random.choice for deterministic GPU assignment (avoids stochastic OOM) - -Both WORKAROUNDs should be removable once the patches are upstreamed to segger. -""" - -import argparse -import os -import subprocess -import sys - - -SEGGER_CLI = "/workspace/segger_dev/src/segger/cli/predict_fast.py" - - -def parse_args(): - p = argparse.ArgumentParser() - p.add_argument("--models-dir", required=True) - p.add_argument("--segger-data-dir", required=True) - p.add_argument("--transcripts-file", required=True) - p.add_argument("--benchmarks-dir", required=True) - p.add_argument("--batch-size", type=int, required=True) - p.add_argument("--use-cc", required=True) - p.add_argument("--knn-method", required=True) - p.add_argument("--num-workers", type=int, required=True) - args, extra = p.parse_known_args() - return args, extra - - -def detect_gpus(): - """Return comma-separated list of available CUDA device ids (or "0" if none).""" - import torch - - print("=== GPU Detection (SEGGER_PREDICT) ===") - print(f"PyTorch CUDA available: {torch.cuda.is_available()}") - n = torch.cuda.device_count() - print(f"CUDA device count: {n}") - print("======================================") - if n > 0: - return ",".join(str(i) for i in range(n)) - return "0" - - -def patch_predict_parquet(): - """ - WORKAROUND: patch segger.prediction.predict_parquet at runtime. - - Avoids rebuilding the segger Docker image. Two patches: - 1. Add torch.no_grad() to disable gradient graphs during inference (~30-50% VRAM savings). - 2. Seed random for deterministic GPU assignment (avoids stochastic OOM). - - Remove this function once the patches are upstreamed to segger. 
- """ - import segger.prediction.predict_parquet as m - - pred_py = m.__file__ - print(f"Patching {pred_py}: torch.no_grad() + round-robin GPU assignment") - # Use sed via subprocess for in-place edit (matches the original behavior exactly) - subprocess.run( - [ - "sed", - "-i", - "s/with cp.cuda.Device(gpu_id):/with cp.cuda.Device(gpu_id), torch.no_grad():/", - pred_py, - ], - check=True, - ) - subprocess.run( - [ - "sed", - "-i", - "s/gpu_id = random.choice(gpu_ids)/random.seed(0); gpu_id = random.choice(gpu_ids)/", - pred_py, - ], - check=True, - ) - - -def run_segger_cli(args, extra, gpu_ids): - cmd = [ - "python3", - SEGGER_CLI, - "--models_dir", - args.models_dir, - "--segger_data_dir", - args.segger_data_dir, - "--transcripts_file", - args.transcripts_file, - "--benchmarks_dir", - args.benchmarks_dir, - "--batch_size", - str(args.batch_size), - "--use_cc", - str(args.use_cc), - "--knn_method", - args.knn_method, - "--num_workers", - str(args.num_workers), - "--gpu_ids", - gpu_ids, - *extra, - ] - print(f"Running: {' '.join(cmd)}") - result = subprocess.run(cmd) - if result.returncode != 0: - sys.exit(result.returncode) - - -def main(): - args, extra = parse_args() - - # Limit cupy GPU memory to 80% so PyTorch has headroom for graph attention ops - os.environ.setdefault("CUPY_GPU_MEMORY_LIMIT", "80%") - # Belt-and-suspenders: ensure PyTorch uses expandable segments - os.environ.setdefault( - "PYTORCH_CUDA_ALLOC_CONF", "expandable_segments:True,max_split_size_mb:512" - ) - # Numba cache directory - os.environ.setdefault("NUMBA_CACHE_DIR", os.path.join(os.getcwd(), ".numba_cache")) - os.makedirs(os.environ["NUMBA_CACHE_DIR"], exist_ok=True) - - gpu_ids = detect_gpus() - print(f"Using GPUs: {gpu_ids}") - - patch_predict_parquet() - - run_segger_cli(args, extra, gpu_ids) - - -if __name__ == "__main__": - main() diff --git a/modules/local/spatialdata/merge/resources/usr/bin/spatialdata_merge.py b/modules/local/spatialdata/merge/resources/usr/bin/spatialdata_merge.py deleted file mode 100755 index 409d8c00..00000000 --- a/modules/local/spatialdata/merge/resources/usr/bin/spatialdata_merge.py +++ /dev/null @@ -1,82 +0,0 @@ -#!/usr/bin/env python3 -"""Merge two spatialdata bundles to create a layered spatialdata object.""" - -import argparse -import json -import os -import shutil - -import spatialdata - - -def parse_args(): - """Parse command-line arguments.""" - parser = argparse.ArgumentParser(description="Merge two spatialdata bundles") - parser.add_argument("--raw-bundle", required=True, help="Path to raw spatialdata bundle") - parser.add_argument("--redefined-bundle", required=True, help="Path to redefined spatialdata bundle") - parser.add_argument("--prefix", required=True, help="Output prefix (sample ID)") - parser.add_argument("--output-folder", required=True, help="Output folder name") - return parser.parse_args() - - -def main(): - """Run spatialdata merge.""" - args = parse_args() - print("[START]") - - output_dir = f"spatialdata/{args.prefix}/{args.output_folder}" - - # Ensure the output folder exists - if os.path.exists(output_dir): - shutil.rmtree(output_dir) - os.makedirs(output_dir) - - # Copy the entire reference bundle as is - for root, _, files in os.walk(args.raw_bundle): - rel_path = os.path.relpath(root, args.raw_bundle) - target_path = os.path.join(output_dir, rel_path) - os.makedirs(target_path, exist_ok=True) - for file in files: - shutil.copy(os.path.join(root, file), os.path.join(target_path, file)) - - # Rename folders in Points, Shapes, and Tables to raw_* - for 
category in ["points", "shapes", "tables"]: - category_path = os.path.join(output_dir, category) - if os.path.exists(category_path): - for folder in next(os.walk(category_path))[1]: - old_path = os.path.join(category_path, folder) - print(folder) - new_path = os.path.join(category_path, f"raw_{folder}") - os.rename(old_path, new_path) - - # Copy folders from redefined_bundle and rename them as redefined_* - for category in ["points", "shapes", "tables"]: - add_category_path = os.path.join(args.redefined_bundle, category) - output_category_path = os.path.join(output_dir, category) - os.makedirs(output_category_path, exist_ok=True) - - if os.path.exists(add_category_path): - for folder in next(os.walk(add_category_path))[1]: - src_folder = os.path.join(add_category_path, folder) - dest_folder = os.path.join(output_category_path, f"redefined_{folder}") - shutil.copytree(src_folder, dest_folder) - - # Invalidate consolidated metadata in zarr.json -- the directory renames above - # made the element paths in the metadata stale (e.g., 'points/transcripts' -> - # 'points/raw_transcripts'). Without consolidated metadata, sd.read_zarr() - # discovers elements by scanning the filesystem directly. - zarr_json = os.path.join(output_dir, "zarr.json") - if os.path.exists(zarr_json): - with open(zarr_json) as f: - meta = json.load(f) - if "consolidated_metadata" in meta: - del meta["consolidated_metadata"] - with open(zarr_json, "w") as f: - json.dump(meta, f) - print("[NOTE] Removed stale consolidated metadata from zarr.json") - - print("[FINISH]") - - -if __name__ == "__main__": - main() diff --git a/modules/local/spatialdata/meta/resources/usr/bin/spatialdata_meta.py b/modules/local/spatialdata/meta/resources/usr/bin/spatialdata_meta.py deleted file mode 100755 index 935f39b2..00000000 --- a/modules/local/spatialdata/meta/resources/usr/bin/spatialdata_meta.py +++ /dev/null @@ -1,126 +0,0 @@ -#!/usr/bin/env python3 -"""Add metadata to SpatialData bundle.""" - -import argparse -import json -import sys - -import pandas as pd -import spatialdata as sd -import zarr - -# Fix zarr v3 + anndata + numcodecs incompatibility: -# anndata's string writer passes numcodecs.VLenUTF8 to zarr.Group.create_array, -# but zarr v3 only accepts ArrayArrayCodec types. OME-Zarr 0.5 requires zarr v3 -# for images, so we can't downgrade the store format. Instead, we intercept -# create_array to strip numcodecs codecs and let zarr v3 handle strings natively. 
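A minimal sanity check of the merge output described above — not part of the pipeline, with a hypothetical store path and illustrative element names — could look like this:

    import spatialdata as sd

    # Hypothetical path; the real location is spatialdata/{prefix}/{output_folder}.
    merged = sd.read_zarr("spatialdata/sample1/merged")

    # With the stale consolidated metadata stripped from zarr.json, read_zarr falls
    # back to scanning the store directly, so the renamed raw_* and redefined_*
    # elements should both be discovered.
    print(sorted(merged.points.keys()))   # e.g. ['raw_transcripts', 'redefined_transcripts']
    print(sorted(merged.tables.keys()))   # e.g. ['raw_table', 'redefined_table']

Re-consolidating the metadata after the renames would also work; deleting it is simply the lighter-weight option, since read_zarr can discover elements without it.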
-import numcodecs -import zarr.core.group as _zarr_group - -_orig_create_array = _zarr_group.Group.create_array - - -def _v3_compat_create_array(self, *args, **kwargs): - """Strip numcodecs VLenUTF8 from codec params for zarr v3 compatibility.""" - for param in ("filters", "compressor", "object_codec"): - val = kwargs.get(param) - if val is None: - continue - if isinstance(val, numcodecs.vlen.VLenUTF8): - del kwargs[param] - elif isinstance(val, (list, tuple)): - cleaned = [v for v in val if not isinstance(v, numcodecs.vlen.VLenUTF8)] - if len(cleaned) != len(val): - if cleaned: - kwargs[param] = cleaned - else: - del kwargs[param] - return _orig_create_array(self, *args, **kwargs) - - -_zarr_group.Group.create_array = _v3_compat_create_array - - -def _is_arrow_backed(dtype): - """Check if a pandas dtype is backed by PyArrow.""" - return isinstance(dtype, pd.ArrowDtype) or ( - hasattr(dtype, "storage") and getattr(dtype, "storage", None) == "pyarrow" - ) or "pyarrow" in str(dtype) - - -def _convert_df_arrow_to_numpy(df): - """Convert Arrow-backed dtypes in a DataFrame to numpy object dtype.""" - for col in df.columns: - dtype = df[col].dtype - if _is_arrow_backed(dtype): - df[col] = df[col].astype("object") - elif isinstance(dtype, pd.CategoricalDtype): - cats = dtype.categories - if cats is not None and _is_arrow_backed(cats.dtype): - df[col] = df[col].cat.rename_categories(cats.astype("object")) - if _is_arrow_backed(df.index.dtype): - df.index = pd.Index(df.index.astype("object")) - - -def convert_arrow_to_numpy(sdata): - """Convert Arrow-backed dtypes to numpy for anndata zarr write compatibility.""" - for table_key in list(sdata.tables.keys()): - adata = sdata.tables[table_key] - _convert_df_arrow_to_numpy(adata.obs) - _convert_df_arrow_to_numpy(adata.var) - - -def parse_args(): - """Parse command-line arguments.""" - parser = argparse.ArgumentParser(description="Add metadata to SpatialData bundle") - parser.add_argument("--spatialdata-bundle", required=True, help="Path to spatialdata bundle") - parser.add_argument("--xenium-bundle", required=True, help="Path to xenium bundle") - parser.add_argument("--prefix", required=True, help="Output prefix (sample ID)") - parser.add_argument("--metadata", required=True, help="Metadata string from Nextflow meta map") - parser.add_argument("--output-folder", required=True, help="Output folder name") - return parser.parse_args() - - -def main(): - """Run spatialdata metadata addition.""" - args = parse_args() - print("[START]") - - sdata = sd.read_zarr(args.spatialdata_bundle) - - # Convert metadata into dict - print("[NOTE] Read in provenance ...") - metadata = args.metadata.strip("[]") # Remove square brackets - pairs = metadata.split(", ") # Split by comma and space - metadata = {k: v for k, v in (pair.split(":") for pair in pairs)} # Create dictionary - - for key in metadata: - if key not in sdata['raw_table'].uns['spatialdata_attrs']: - sdata['raw_table'].uns['spatialdata_attrs'][key] = metadata[key] - else: - print(f'[ERROR] {key} already exist in sdata[raw_table].uns[spatialdata_attrs].', file=sys.stderr) - - # Add experimental metadata - print("[NOTE] Read in experiment metadata ...") - sdata['raw_table'].uns['experiment_xenium'] = '' - metadata_experiment = f'{args.xenium_bundle}/experiment.xenium' - with open(metadata_experiment, "r") as f: - metadata_experiment = json.load(f) - sdata['raw_table'].uns['experiment_xenium'] = json.dumps(metadata_experiment) - - # Add gene panel metadata - print("[NOTE] Read in gene panel metadata ...") - 
sdata['raw_table'].uns['gene_panel'] = '' - metadata_gene_panel = f'{args.xenium_bundle}/gene_panel.json' - with open(metadata_gene_panel, "r") as f: - metadata_gene_panel = json.load(f) - sdata['raw_table'].uns['gene_panel'] = json.dumps(metadata_gene_panel) - - convert_arrow_to_numpy(sdata) - sdata.write(f"spatialdata/{args.prefix}/{args.output_folder}", overwrite=True, consolidate_metadata=True, sdata_formats=None) - - print("[FINISH]") - - -if __name__ == "__main__": - main() diff --git a/modules/local/spatialdata/write/resources/usr/bin/spatialdata_write.py b/modules/local/spatialdata/write/resources/usr/bin/spatialdata_write.py deleted file mode 100755 index 421e830f..00000000 --- a/modules/local/spatialdata/write/resources/usr/bin/spatialdata_write.py +++ /dev/null @@ -1,156 +0,0 @@ -#!/usr/bin/env python3 -"""Write spatialdata object from segmentation format.""" - -import argparse -import sys - -import pandas as pd -import spatialdata -from spatialdata_io import xenium - -# Fix zarr v3 + anndata + numcodecs incompatibility: -# anndata's string writer passes numcodecs.VLenUTF8 to zarr.Group.create_array, -# but zarr v3 only accepts ArrayArrayCodec types. OME-Zarr 0.5 requires zarr v3 -# for images, so we can't downgrade the store format. Instead, we intercept -# create_array to strip numcodecs codecs and let zarr v3 handle strings natively. -import numcodecs -import zarr.core.group as _zarr_group - -_orig_create_array = _zarr_group.Group.create_array - - -def _v3_compat_create_array(self, *args, **kwargs): - """Strip numcodecs VLenUTF8 from codec params for zarr v3 compatibility.""" - for param in ("filters", "compressor", "object_codec"): - val = kwargs.get(param) - if val is None: - continue - if isinstance(val, numcodecs.vlen.VLenUTF8): - del kwargs[param] - elif isinstance(val, (list, tuple)): - cleaned = [v for v in val if not isinstance(v, numcodecs.vlen.VLenUTF8)] - if len(cleaned) != len(val): - if cleaned: - kwargs[param] = cleaned - else: - del kwargs[param] - return _orig_create_array(self, *args, **kwargs) - - -_zarr_group.Group.create_array = _v3_compat_create_array - - -def _is_arrow_backed(dtype): - """Check if a pandas dtype is backed by PyArrow.""" - return ( - isinstance(dtype, pd.ArrowDtype) - or (hasattr(dtype, "storage") and getattr(dtype, "storage", None) == "pyarrow") - or "pyarrow" in str(dtype) - ) - - -def _convert_df_arrow_to_numpy(df): - """Convert Arrow-backed dtypes in a DataFrame to numpy object dtype. - - Handles three cases: - 1. Regular columns with Arrow-backed dtypes - 2. Categorical columns whose categories are Arrow-backed - 3. 
Index with Arrow-backed dtype - """ - for col in df.columns: - dtype = df[col].dtype - if _is_arrow_backed(dtype): - df[col] = df[col].astype("object") - elif isinstance(dtype, pd.CategoricalDtype): - cats = dtype.categories - if cats is not None and _is_arrow_backed(cats.dtype): - df[col] = df[col].cat.rename_categories(cats.astype("object")) - if _is_arrow_backed(df.index.dtype): - df.index = pd.Index(df.index.astype("object")) - - -def convert_arrow_to_numpy(sdata): - """Convert Arrow-backed dtypes to numpy for anndata zarr write compatibility.""" - for table_key in list(sdata.tables.keys()): - adata = sdata.tables[table_key] - _convert_df_arrow_to_numpy(adata.obs) - _convert_df_arrow_to_numpy(adata.var) - - -def parse_args(): - """Parse command-line arguments.""" - parser = argparse.ArgumentParser(description="Write spatialdata object from segmentation format") - parser.add_argument("--bundle", required=True, help="Path to input bundle") - parser.add_argument("--prefix", required=True, help="Output prefix (sample ID)") - parser.add_argument("--output-folder", required=True, help="Output folder name") - parser.add_argument("--segmented-object", required=True, help="Segmented object type (cells, nuclei, cells_and_nuclei)") - parser.add_argument("--coordinate-space", required=True, help="Coordinate space (pixels, microns)") - parser.add_argument("--format", required=True, help="Input format (xenium)") - return parser.parse_args() - - -def main(): - """Run spatialdata write.""" - args = parse_args() - print("[START]") - - cells_as_circles = False - cells_boundaries = False - nucleus_boundaries = False - cells_labels = False - nucleus_labels = False - - if args.segmented_object == "cells": - cells_boundaries = True - cells_labels = True - elif args.segmented_object == "nuclei": - nucleus_boundaries = True - nucleus_labels = True - elif args.segmented_object == "cells_and_nuclei": - cells_boundaries = True - nucleus_boundaries = True - cells_labels = True - nucleus_labels = True - else: - cells_as_circles = False - - # set sd variables based on the coordinate space - if args.coordinate_space == "pixels": - cells_labels = True - nucleus_labels = True - # Labels are sufficient in pixel space; boundaries can contain - # degenerate polygons (< 4 vertices) from XeniumRanger that - # crash spatialdata_io's shapely LinearRing parser. 
- cells_boundaries = False - nucleus_boundaries = False - - if args.coordinate_space == "microns": - cells_labels = False - cells_boundaries = True - nucleus_boundaries = False - nucleus_labels = False - cells_as_circles = False - - if args.format == "xenium": - sd_xenium_obj = xenium( - args.bundle, - cells_as_circles=cells_as_circles, - cells_boundaries=cells_boundaries, - nucleus_boundaries=nucleus_boundaries, - cells_labels=cells_labels, - nucleus_labels=nucleus_labels, - transcripts=True, - morphology_mip=True, - morphology_focus=True, - ) - print(sd_xenium_obj) - convert_arrow_to_numpy(sd_xenium_obj) - sd_xenium_obj.write(f"spatialdata/{args.prefix}/{args.output_folder}") - else: - sys.exit("[ERROR] Format not found") - - print("[FINISH]") - - -if __name__ == "__main__": - main() diff --git a/modules/local/utility/extract_preview_data/main.nf b/modules/local/utility/extract_preview_data/main.nf index 1240ddbf..821effc5 100644 --- a/modules/local/utility/extract_preview_data/main.nf +++ b/modules/local/utility/extract_preview_data/main.nf @@ -26,7 +26,7 @@ process EXTRACT_PREVIEW_DATA { prefix = task.ext.prefix ?: "${meta.id}" """ - utility_extract_data.py \\ + utility_extract_preview_data.py \\ --preview-html ${preview_html} \\ --prefix ${prefix} """ diff --git a/modules/local/utility/extract_preview_data/resources/usr/bin/extract_data.py b/modules/local/utility/extract_preview_data/resources/usr/bin/extract_data.py deleted file mode 100755 index 0ea737c2..00000000 --- a/modules/local/utility/extract_preview_data/resources/usr/bin/extract_data.py +++ /dev/null @@ -1,208 +0,0 @@ -#!/usr/bin/env python3 -""" -Extract preview data from Baysor preview HTML reports. - -Parses embedded Vega-Lite spec variables and base64 PNG images from the -Baysor preview.html file, writing MultiQC-compatible TSV and PNG files. -""" - -import argparse -import base64 -import html -import json -import re -import sys -from pathlib import Path -from typing import Dict, List, Optional, Tuple - -import pandas as pd -from bs4 import BeautifulSoup - - -def get_png_files(soup: BeautifulSoup, outdir: Path) -> None: - """Get png base64 images following specific h1 tags in preview.html""" - target_ids = ["Transcript_Plots", "Noise_Level"] - outdir.mkdir(parents=True, exist_ok=True) - - for h1_id in target_ids: - h1_tag = soup.find("h1", id=h1_id) - if not h1_tag: - print(f"[WARN] No