From e721da065a92f32edd7cd90222ff1994cf88a983 Mon Sep 17 00:00:00 2001 From: an-altosian Date: Thu, 28 May 2026 20:41:43 +0000 Subject: [PATCH 1/5] fix(workflows): guard gene_panel.json read on do_relabel MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Empirical evidence from 10x Atera (Xenium Gen2) Cell Pellet preview data: all non-XR pipeline modes (qc, preview, segfree-baysor, segfree-ficture) failed at workflow init in ~90s with errorMessage '/gene_panel.json' even though gene_panel.json is only read by XENIUMRANGER_RELABEL_RESEGMENT. Root cause: the .map { ... file(<bundle>/gene_panel.json, checkIfExists: true) } closure in the else-branch at workflows/spatialaxe.nf:343-352 is evaluated eagerly by Nextflow as soon as ch_input emits, regardless of whether ch_gene_panel is downstream-consumed. For any bundle that doesn't ship gene_panel.json — 10x Atera (Xenium Gen2 preview data) is one such case — workflow init fails for every mode, including qc/preview/segfree which never need a gene panel. Fix: wrap the entire ch_gene_panel construction in if (do_relabel) { ... }. When false, ch_gene_panel keeps its channel.empty() initialisation from line 112. No behaviour change for image/coordinate modes (do_relabel can still be true via --gene_panel or --relabel_genes). --- workflows/spatialaxe.nf | 36 +++++++++++++++++++++++------------- 1 file changed, 23 insertions(+), 13 deletions(-) diff --git a/workflows/spatialaxe.nf b/workflows/spatialaxe.nf index 8e11f05..6f10e85 100644 --- a/workflows/spatialaxe.nf +++ b/workflows/spatialaxe.nf @@ -333,24 +333,34 @@ workflow SPATIALAXE { // get gene_panel.json if provided with --gene_panel, sets relabel_genes to true def do_relabel = gene_panel ? 
true : relabel_genes - if (gene_panel) { - def gene_panel_file = file(gene_panel, checkIfExists: true) - ch_gene_panel = ch_input.map { meta, _bundle, _image -> - return [meta, gene_panel_file] + // Only construct ch_gene_panel when relabel will actually run. + // The .map { file(..., checkIfExists: true) } closure is evaluated eagerly + // by Nextflow as soon as ch_input emits — even if ch_gene_panel is never + // consumed downstream — so leaving this block unconditional breaks every + // bundle that ships without gene_panel.json (e.g. 10x Atera / Xenium Gen2), + // even in modes that never invoke XENIUMRANGER_RELABEL_RESEGMENT. + if (do_relabel) { + if (gene_panel) { + + def gene_panel_file = file(gene_panel, checkIfExists: true) + ch_gene_panel = ch_input.map { meta, _bundle, _image -> + return [meta, gene_panel_file] + } } - } - else { + else { - // gene panel to use if only --relabel_genes is provided - ch_gene_panel = ch_input.map { meta, bundle, _image -> - def gene_panel_file = file( - bundle.toString().replaceFirst(/\/$/, '') + "/gene_panel.json", - checkIfExists: true - ) - return [meta, gene_panel_file] + // gene panel to use if only --relabel_genes is provided + ch_gene_panel = ch_input.map { meta, bundle, _image -> + def gene_panel_file = file( + bundle.toString().replaceFirst(/\/$/, '') + "/gene_panel.json", + checkIfExists: true + ) + return [meta, gene_panel_file] + } } } + // else: ch_gene_panel keeps its initial channel.empty() value from line 112 /* ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ From 64eeab417ff92ac3932b675f11e23d5fa1ca2168 Mon Sep 17 00:00:00 2001 From: an-altosian Date: Thu, 28 May 2026 20:41:44 +0000 Subject: [PATCH 2/5] perf(parquet_to_csv): stream-convert instead of eager-load PARQUET_TO_CSV is a pure format transformation: every input row maps to one output row. The pandas-based implementation eagerly loaded the entire parquet into a DataFrame before writing CSV. 
For the 10x Atera WTA panel transcripts.parquet (236M rows x 13 cols, 2.9 GB compressed, ~30+ GB uncompressed), this OOMed twice at process_low defaults (12 GB -> 24 GB) before succeeding on attempt 3 (36 GB, 34 GB peak rss) in a Tower run on Atera Cell Pellet. Replace with pyarrow iter_batches() + pa_csv.CSVWriter. Memory is bounded by --batch-size (default 200000 rows ~= 130 MB) instead of the full row count. Same I/O, same CPU, ~100x less peak memory. Verified locally on a 1000-row sample of real Atera transcripts.parquet: output is well-formed RFC-4180 CSV with same column order and same data. Cosmetic differences from pandas (string quoting, '23' vs '23.0', 'true' vs 'True') are universally tolerated by downstream consumers (Baysor Julia CSV.jl, Ficture pd.read_csv) and the only affected bool column (is_gene) isn't read by either consumer. --- bin/utility_parquet_to_csv.py | 58 +++++++++++++++++++++-------------- 1 file changed, 35 insertions(+), 23 deletions(-) diff --git a/bin/utility_parquet_to_csv.py b/bin/utility_parquet_to_csv.py index bfa19c4..785b97c 100755 --- a/bin/utility_parquet_to_csv.py +++ b/bin/utility_parquet_to_csv.py @@ -1,47 +1,52 @@ #!/usr/bin/env python3 """ -Convert a Parquet file to CSV format. +Stream-convert a Parquet file to CSV format using pyarrow batched I/O. -Reads a Parquet file and writes it as CSV, optionally gzip-compressed. +Pure format transformation: every input row maps to one output row. +Memory usage is bounded by --batch-size (default 200_000 rows) rather +than the full row count, so this works on large bundles (e.g. the 10x +Atera WTA panel: 236M rows x 13 cols) within ~200 MB of RAM rather +than the ~30 GB the previous pandas-based eager loader required. 
""" import argparse +import gzip from pathlib import Path -import pandas as pd +import pyarrow.csv as pa_csv +import pyarrow.parquet as pq -def convert_parquet( +def stream_parquet_to_csv( transcripts: str, - extension: str = ".csv", - prefix: str = "", + extension: str, + prefix: str, + batch_size: int, ) -> None: - """ - Convert a Parquet file to CSV or CSV.GZ format. - - Args: - transcripts: Filename of the input parquet file - extension: Output extension ('.csv' or '.gz' for gzip) - prefix: Output directory prefix - """ - df = pd.read_parquet(transcripts, engine="pyarrow") - + """Stream a Parquet file to CSV (optionally gzip-compressed).""" Path(prefix).mkdir(parents=True, exist_ok=True) + pf = pq.ParquetFile(transcripts) if extension == ".gz": - output = transcripts.replace(".parquet", ".csv.gz") - df.to_csv(f"{prefix}/{output}", compression="gzip", index=False) + out_path = f"{prefix}/" + transcripts.replace(".parquet", ".csv.gz") + # pyarrow's CSVWriter writes bytes; wrap a gzip stream. + sink = gzip.open(out_path, "wb") else: - output = transcripts.replace(".parquet", ".csv") - df.to_csv(f"{prefix}/{output}", index=False) + out_path = f"{prefix}/" + transcripts.replace(".parquet", ".csv") + sink = open(out_path, "wb") - return None + try: + with pa_csv.CSVWriter(sink, pf.schema_arrow) as writer: + for batch in pf.iter_batches(batch_size=batch_size): + writer.write_batch(batch) + finally: + sink.close() def parse_args() -> argparse.Namespace: """Parse command-line arguments.""" parser = argparse.ArgumentParser( - description="Convert a Parquet file to CSV format." + description="Stream-convert a Parquet file to CSV format." ) parser.add_argument( "--transcripts", @@ -58,13 +63,20 @@ def parse_args() -> argparse.Namespace: required=True, help="Output directory prefix (sample ID)", ) + parser.add_argument( + "--batch-size", + type=int, + default=200_000, + help="Rows per batch (default 200000). 
Memory ~= batch_size * row_size.", + ) return parser.parse_args() if __name__ == "__main__": args = parse_args() - convert_parquet( + stream_parquet_to_csv( transcripts=args.transcripts, extension=args.extension, prefix=args.prefix, + batch_size=args.batch_size, ) From c03c97b51c94bf4af107df24ef042157ffc180b4 Mon Sep 17 00:00:00 2001 From: an-altosian Date: Thu, 28 May 2026 20:48:23 +0000 Subject: [PATCH 3/5] ci(pre-commit): ban python3 ${moduleDir}/templates/ shell-call pattern MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Preventive guardrail against the regression we hit in PR #154 (May 2026). The python3 ${moduleDir}/templates/