diff --git a/common b/common
index 79b884b4..67da19a3 160000
--- a/common
+++ b/common
@@ -1 +1 @@
-Subproject commit 79b884b4c7fed300972d83a6ca025abb6116cbdc
+Subproject commit 67da19a36ae56ea068804d15ccadec88a06da920
diff --git a/scripts/run_benchmark/run_full_seqeracloud.sh b/scripts/run_benchmark/run_full_seqeracloud.sh
index 979440c9..baa19b65 100755
--- a/scripts/run_benchmark/run_full_seqeracloud.sh
+++ b/scripts/run_benchmark/run_full_seqeracloud.sh
@@ -17,16 +17,15 @@ cat > /tmp/params.yaml << HERE
 input_states: s3://openproblems-data/resources/task_cyto_batch_integration/datasets/**/state.yaml
 rename_keys: 'input_censored_split1:output_censored_split1;input_censored_split2:output_censored_split2;input_unintegrated:output_unintegrated'
 output_state: "state.yaml"
-settings: '{"metrics_exclude": ["cms"], "methods_include": ["mnnpy", "cytovi"]}'
 publish_dir: "$publish_dir"
 HERE
 
 tw launch https://github.com/openproblems-bio/task_cyto_batch_integration.git \
-  --revision build/fix_failed_stuff \
+  --revision build/main \
   --pull-latest \
   --main-script target/nextflow/workflows/run_benchmark/main.nf \
   --workspace 53907369739130 \
   --params-file /tmp/params.yaml \
   --entry-name auto \
   --config common/nextflow_helpers/labels_tw.config \
-  --labels task_cyto_batch_integration,mnnnpy
+  --labels task_cyto_batch_integration,test_subset
diff --git a/src/methods/harmonypy/script.py b/src/methods/harmonypy/script.py
index 8fc0cccc..5ac69a5d 100644
--- a/src/methods/harmonypy/script.py
+++ b/src/methods/harmonypy/script.py
@@ -4,8 +4,8 @@
 
 ## VIASH START
 par = {
-    "input": "resources_test/task_cyto_batch_integration/mouse_spleen_flow_cytometry_subset/censored_split2.h5ad",
-    "output": "resources_test/task_cyto_batch_integration/mouse_spleen_flow_cytometry_subset/output_harmony_split2.h5ad",
+    "input": "/Users/putri.g/Documents/cytobenchmark/debug_general/_viash_par/input_1/censored_split1.h5ad",
+    "output": "/Users/putri.g/Documents/cytobenchmark/debug_general/_viash_par/output_1/output_harmony_split1.h5ad",
 }
 meta = {"name": "harmonypy"}
 ## VIASH END
@@ -13,6 +13,7 @@
 print("Reading and preparing input files", flush=True)
 
 adata = ad.read_h5ad(par["input"])
+# harmony can't handle integer batch labels
 adata.obs["batch_str"] = adata.obs["batch"].astype(str)
 
 markers_to_correct = adata.var[adata.var["to_correct"]].index.to_numpy()
@@ -21,10 +22,13 @@
 adata_to_correct = adata[:, markers_to_correct].copy()
 
 print("Run harmony", flush=True)
-# harmony can't handle integer batch labels
+
+# TODO: numerical instability in kmeans causes problems with harmony,
+# so add a very small value to all entries to make sure there are no zeros
+epsilon = 1e-20
 out = harmonypy.run_harmony(
-    data_mat=adata_to_correct.layers["preprocessed"],
+    data_mat=adata_to_correct.layers["preprocessed"] + epsilon,
     meta_data=adata_to_correct.obs,
     vars_use="batch_str",
 )
diff --git a/src/metrics/bras/config.vsh.yaml b/src/metrics/bras/config.vsh.yaml
index fed70ef3..a99b43f5 100644
--- a/src/metrics/bras/config.vsh.yaml
+++ b/src/metrics/bras/config.vsh.yaml
@@ -1,16 +1,9 @@
-# The API specifies which type of component this is.
-# It contains specifications for:
-#   - The input/output files
-#   - Common parameters
-#   - A unit test
 __merge__: ../../api/comp_metric.yaml
 
 # A unique identifier for your component (required).
 # Can contain only lowercase letters or underscores.
 name: bras
-
-
-
+status: disabled
 # Metadata for your component
 info:
   metrics:
@@ -56,13 +49,6 @@ info:
     # Whether a higher value represents a 'better' solution (required)
     maximize: true
 
-# Component-specific parameters (optional)
-# arguments:
-#   - name: "--n_neighbors"
-#     type: "integer"
-#     default: 5
-#     description: Number of neighbors to use.
-
 # Resources required to run the component
 resources:
   # The script of your component (required)
@@ -73,6 +59,14 @@
 
 engines:
   # Specifications for the Docker image for this component.
+  # testing gpu jax version
+  # - type: docker
+  #   image: openproblems/base_pytorch_nvidia:1.1
+  #   setup:
+  #     - type: python
+  #       packages:
+  #         - jax[cuda_12_pip]
+  #         - scib-metrics~=0.5.6
   - type: docker
     image: python:3.11
     setup:
diff --git a/src/metrics/bras/script.py b/src/metrics/bras/script.py
index 98229ad0..db3428a1 100644
--- a/src/metrics/bras/script.py
+++ b/src/metrics/bras/script.py
@@ -57,6 +57,7 @@
     labels=ct_labels_s1,
     batch=batch_labels_s1,
     metric="euclidean",
+    chunk_size=512,
 )
 
 batch_labels_s2 = integrated_s2.obs["batch"].values
@@ -67,6 +68,7 @@
     labels=ct_labels_s2,
     batch=batch_labels_s2,
     metric="euclidean",
+    chunk_size=512,
 )
 
 bras_score = np.mean([bras_s1, bras_s2])
diff --git a/src/metrics/n_inconsistent_peaks/config.vsh.yaml b/src/metrics/n_inconsistent_peaks/config.vsh.yaml
index 569be185..9c8bae0e 100644
--- a/src/metrics/n_inconsistent_peaks/config.vsh.yaml
+++ b/src/metrics/n_inconsistent_peaks/config.vsh.yaml
@@ -2,7 +2,7 @@
 __merge__: ../../api/comp_metric.yaml
 
 name: n_inconsistent_peaks
-# status: disabled
+status: disabled
 
 info:
   metrics:
diff --git a/src/metrics/ratio_inconsistent_peaks/config.vsh.yaml b/src/metrics/ratio_inconsistent_peaks/config.vsh.yaml
new file mode 100644
index 00000000..933c9aad
--- /dev/null
+++ b/src/metrics/ratio_inconsistent_peaks/config.vsh.yaml
@@ -0,0 +1,72 @@
+# The API specifies which type of component this is.
+# It contains specifications for:
+#   - The input/output files
+#   - Common parameters
+#   - A unit test
+__merge__: ../../api/comp_metric.yaml
+
+# A unique identifier for your component (required).
+# Can contain only lowercase letters or underscores.
+name: ratio_inconsistent_peaks
+
+# Metadata for your component
+info:
+  metrics:
+    # A unique identifier for your metric (required).
+    # Can contain only lowercase letters or underscores.
+    - name: ratio_inconsistent_peaks
+      label: Ratio of inconsistent peaks
+      summary: "Ratio of cases where the number of cell type marker expression peaks is inconsistent between the two data splits after batch normalization."
+      description: |
+        The metric compares the number of cell type specific marker expression peaks between unintegrated and batch-normalized data.
+        The (cell type) marker expression profiles are first smoothed using kernel density estimation (KDE) (`scipy.stats.gaussian_kde`),
+        and peaks are then identified using the `scipy.signal.find_peaks` function.
+        For peak calling, the `prominence` parameter is set to 0.01 and the `height` parameter is set to 0.1.
+        The ratio of inconsistent peaks is defined as the number of cases where the number of peaks differs between the two splits in the batch
+        normalized data, divided by the total number of cases.
+        Cases where the number of peaks differs between the two splits in the unintegrated data are excluded from the denominator.
+        A lower score indicates better performance, meaning there are fewer cases with inconsistent peaks after batch correction.
+        An alternative peak counting method using persistent homology is also implemented for comparison, because peak calling
+        is sensitive to noise and parameter choices.
+
+      references:
+        doi:
+          - 10.1038/s41592-019-0686-2
+      links:
+        # URL to the documentation for this metric (required).
+        documentation: https://docs.scipy.org/doc/scipy/reference/generated/scipy.signal.find_peaks.html#scipy.signal.find_peaks
+        # URL to the code repository for this metric (required).
+        repository: https://github.com/scipy/scipy/blob/v1.15.2/scipy/signal/_peak_finding.py#L0-L1
+      # The minimum possible value for this metric (required)
+      min: 0
+      # The maximum possible value for this metric (required)
+      max: 1
+      # Whether a higher value represents a 'better' solution (required)
+      maximize: false
+
+# Resources required to run the component
+resources:
+  # The script of your component (required)
+  - type: python_script
+    path: script.py
+  - path: helper.py
+  - path: /src/utils/helper_functions.py
+
+engines:
+  # Specifications for the Docker image for this component.
+  - type: docker
+    image: openproblems/base_python:1
+    setup:
+      - type: python
+        packages:
+          - scikit-tda
+
+runners:
+  # This platform allows running the component natively
+  - type: executable
+  # Allows turning the component into a Nextflow module / pipeline.
+  - type: nextflow
+    directives:
+      label: [midtime,midmem,midcpu]
diff --git a/src/metrics/ratio_inconsistent_peaks/helper.py b/src/metrics/ratio_inconsistent_peaks/helper.py
new file mode 100644
index 00000000..461f6915
--- /dev/null
+++ b/src/metrics/ratio_inconsistent_peaks/helper.py
@@ -0,0 +1,137 @@
+import matplotlib.pyplot as plt
+import numpy as np
+import seaborn as sns
+from ripser import ripser
+from scipy.signal import find_peaks
+from scipy.stats import gaussian_kde
+
+
+def standardise_marker_expression(dist_1, dist_2):
+    """
+    Standardises the marker expression values from two distributions.
+
+    Inputs:
+        dist_1: array of values (1D) representing the marker expression from distribution 1
+        dist_2: array of values (1D) representing the marker expression from distribution 2
+
+    Outputs:
+        std_dist_1: array of standardised values for distribution 1
+        std_dist_2: array of standardised values for distribution 2
+    """
+
+    pooled = np.concatenate([dist_1, dist_2])
+    mu, sd = pooled.mean(), pooled.std()
+    std_dist_1 = (dist_1 - mu) / sd
+    std_dist_2 = (dist_2 - mu) / sd
+
+    return std_dist_1, std_dist_2
+
+
+def get_kde_density(expression_array, return_xgrid=False, plot=False):
+    """
+    Returns the density of the array using a gaussian kernel density estimation.
+
+    Inputs:
+        expression_array: array of values (1D) representing the marker expression
+        return_xgrid: boolean, if True, also return the x_grid values used for density estimation
+        plot: boolean, if True, plot the density estimation
+
+    Outputs:
+        density: array of values representing the density of marker expression
+        x_grid (optional): array of x values where the density is evaluated
+    """
+
+    min_val = expression_array.min()
+    max_val = expression_array.max()
+    marker_values = np.reshape(expression_array, (1, -1))  # Reshape array for KDE
+    kde = gaussian_kde(marker_values, bw_method="scott")
+    x_grid = np.linspace(min_val, max_val, 100)
+    density = kde(x_grid)
+
+    # If the highest value is at the first bin, shift bins by one and adjust x_grid
+    if np.argmax(density) == 0 and density.size > 1:
+        print("Shifting KDE bins by one as the highest density is at the first bin.")
+        # rescale the grid so we only have 99 bins and shift everything by one to the right
+        x_grid = np.linspace(min_val, max_val, 99)
+        density = kde(x_grid)
+
+        # Prepend a zero at the beginning; the density was re-evaluated on 99 bins,
+        # so prepending keeps the output at 100 bins instead of adding an extra one.
+        density = np.concatenate([[0.0], density])
+        # Have to use the actual grid spacing to keep uniform spacing in x_grid.
+        # Can't just blindly subtract 1.
+        step = x_grid[1] - x_grid[0] if len(x_grid) > 1 else 0.0
+        x_grid = np.concatenate([[min_val - step], x_grid])
+
+    if plot:
+        fig, ax = plt.subplots()
+        sns.scatterplot(x=x_grid, y=density, ax=ax)
+        ax.set_title("KDE Density Estimation")
+        ax.set_xlabel("Marker Expression")
+        ax.set_ylabel("Density")
+        fig.tight_layout()
+        fig.show()
+
+    if return_xgrid:
+        # handy for plotting later on and maybe even save in the AnnData object
+        return density, x_grid
+    else:
+        return density
+
+
+def call_peaks(density):
+    """
+    Returns the number of peaks in the density using scipy.signal.find_peaks.
+
+    Inputs:
+        density: array of values representing the density of marker expression
+
+    Outputs:
+        num_peaks: the number of peaks detected in the density
+    """
+
+    height_trsh = 0.1
+    prom_trsh = 0.01
+
+    peaks, _ = find_peaks(density, prominence=prom_trsh, height=height_trsh)
+    num_peaks = len(peaks)
+
+    return num_peaks
+
+
+def persistent_peak_count(ys, persistence_cutoff=0.08):
+    """
+    Counts robust peaks in a 1D dataset using persistent homology.
+
+    Args:
+        ys (np.ndarray): KDE of a marker expression (1D array)
+        persistence_cutoff (float): a threshold that decides which peaks are "significant enough" to count.
+            A large-persistence peak survives over many levels of smoothing (i.e. a strong, real peak).
+            A small-persistence peak quickly merges into a neighbor and is likely noise.
+            0.01: very low threshold, counts even weak bumps as peaks
+            0.05: moderate, counts clearly separated peaks
+            0.1–0.2: high threshold, counts only strong, dominant peaks
+            Defaults to 0.08 to bias towards strong peaks, but not overly so.
+
+    Returns:
+        int: number of significant peaks
+    """
+
+    y = np.asarray(ys)
+    if y.size == 0:
+        return 0
+
+    # Shift if max is at the first bin
+    if y.size > 1 and np.argmax(y) == 0:
+        y = np.concatenate([[0.0], y[:-1]])
+
+    # Invert to turn peaks into "holes" for 0D persistence
+    # (use the shifted copy y, not the raw input ys)
+    Y = -y.reshape(-1, 1)
+    diagram = ripser(Y, maxdim=0)["dgms"][0]
+    persistence = diagram[:, 1] - diagram[:, 0]
+
+    # Define significance threshold relative to data range
+    threshold = persistence_cutoff * np.ptp(y)
+    n_peaks = np.sum(persistence > threshold)
+    return n_peaks
diff --git a/src/metrics/ratio_inconsistent_peaks/script.py b/src/metrics/ratio_inconsistent_peaks/script.py
new file mode 100644
index 00000000..2102bec4
--- /dev/null
+++ b/src/metrics/ratio_inconsistent_peaks/script.py
@@ -0,0 +1,292 @@
+import sys
+from collections import defaultdict
+
+import anndata as ad
+import numpy as np
+import pandas as pd
+
+## VIASH START
+# The following code has been auto-generated by Viash.
+par = {
+    "input_unintegrated": "resources_test/task_cyto_batch_integration/mouse_spleen_flow_cytometry_subset/unintegrated.h5ad",
+    "input_integrated_split1": "resources_test/task_cyto_batch_integration/mouse_spleen_flow_cytometry_subset/integrated_split1.h5ad",
+    "input_integrated_split2": "resources_test/task_cyto_batch_integration/mouse_spleen_flow_cytometry_subset/integrated_split2.h5ad",
+    "output": "resources_test/task_cyto_batch_integration/mouse_spleen_flow_cytometry_subset/score.h5ad",
+}
+meta = {
+    "name": "ratio_inconsistent_peaks",
+}
+
+# for local testing only
+# import src.metrics.ratio_inconsistent_peaks.helper as metric_helper
+# from src.utils.helper_functions import (
+#     get_obs_var_for_integrated,
+#     remove_unlabelled,
+#     subset_markers_tocorrect,
+#     subset_nocontrols,
+# )
+
+## VIASH END
+
+sys.path.append(meta["resources_dir"])
+
+import helper as metric_helper
+from helper_functions import (
+    get_obs_var_for_integrated,
+    remove_unlabelled,
+    subset_markers_tocorrect,
+    subset_nocontrols,
+)
+
+print("Reading input files", flush=True)
+integrated_s1 = ad.read_h5ad(par["input_integrated_split1"])
+integrated_s2 = ad.read_h5ad(par["input_integrated_split2"])
+unintegrated = ad.read_h5ad(par["input_unintegrated"])
+
+print("Formatting input files", flush=True)
+integrated_s1, integrated_s2 = get_obs_var_for_integrated(
+    integrated_s1, integrated_s2, unintegrated
+)
+
+integrated_s1 = subset_nocontrols(integrated_s1)
+integrated_s1 = subset_markers_tocorrect(integrated_s1)
+integrated_s1 = remove_unlabelled(integrated_s1)
+
+integrated_s2 = subset_nocontrols(integrated_s2)
+integrated_s2 = subset_markers_tocorrect(integrated_s2)
+integrated_s2 = remove_unlabelled(integrated_s2)
+
+donor_list = integrated_s1.obs["donor"].unique()
+
+print("Compute metric (per cell type)", flush=True)
+
+# case 1 = consistent peaks in unintegrated and also in integrated
+# case 3 = consistent peaks in unintegrated but inconsistent in integrated
+# not recording case 2 or 4 where unintegrated is inconsistent
+n_case1 = 0
+n_case3 = 0
+
+# so we can see where each case comes from
+case_details = defaultdict(list)
+
+# for comparison only
+persistent_peaks_res = []
+
+
+for donor in donor_list:
+    print("Processing donor", donor, flush=True)
+
+    u_view = unintegrated[unintegrated.obs["donor"] == donor]
+
+    # process per split
+    s1_view = integrated_s1[integrated_s1.obs["donor"] == donor]
+    s2_view = integrated_s2[integrated_s2.obs["donor"] == donor]
+
+    celltype_list = s1_view.obs["cell_type"].unique()
+
+    for celltype in celltype_list:
+        print(f"Processing celltype {celltype}", flush=True)
+
+        u_view_ct = u_view[u_view.obs["cell_type"] == celltype]
+        s1_view_ct = s1_view[s1_view.obs["cell_type"] == celltype]
+        s2_view_ct = s2_view[s2_view.obs["cell_type"] == celltype]
+
+        if s1_view_ct.shape[0] < 100 or s2_view_ct.shape[0] < 100:
+            print(f"Skipping celltype {celltype} and donor {donor}.", flush=True)
+            if s1_view_ct.shape[0] < 100:
+                print(
+                    f"Because n_cells in s1 is {s1_view_ct.shape[0]}, less than 100",
+                    flush=True,
+                )
+            else:
+                print(
+                    f"Because n_cells in s2 is {s2_view_ct.shape[0]}, less than 100",
+                    flush=True,
+                )
+            continue
+
+        # unintegrated views for split 1 and split 2
+        u_view_ct_s1 = u_view_ct[u_view_ct.obs["split"] == 1]
+        u_view_ct_s2 = u_view_ct[u_view_ct.obs["split"] == 2]
+
+        for marker in s1_view_ct.var.index:
+            print(f"Processing marker {marker} for celltype {celltype}", flush=True)
+
+            print("--------------------------------", flush=True)
+            print("Computing peaks for unintegrated", flush=True)
+
+            print("Standardising marker expression", flush=True)
+            # standardise marker expression based on pooled mean and sd of
+            # unscaled marker expression for unintegrated data for split 1 and 2
+            u_s1_unscaled = np.array(u_view_ct_s1[:, marker].layers["preprocessed"])
+            u_s2_unscaled = np.array(u_view_ct_s2[:, marker].layers["preprocessed"])
+
+            u_s1_scaled, u_s2_scaled = metric_helper.standardise_marker_expression(
+                u_s1_unscaled,
+                u_s2_unscaled,
+            )
+            print("Computing KDE density", flush=True)
+            density_dist_u_s1 = metric_helper.get_kde_density(
+                expression_array=u_s1_scaled
+            )
+            density_dist_u_s2 = metric_helper.get_kde_density(
+                expression_array=u_s2_scaled
+            )
+
+            print("Calling peaks", flush=True)
+
+            peaks_u_s1 = metric_helper.call_peaks(density_dist_u_s1)
+            peaks_u_s2 = metric_helper.call_peaks(density_dist_u_s2)
+
+            # use persistent peak count only if the peak calling method is too sensitive...
+            persistent_peak_count_u_s1 = metric_helper.persistent_peak_count(
+                density_dist_u_s1
+            )
+            persistent_peak_count_u_s2 = metric_helper.persistent_peak_count(
+                density_dist_u_s2
+            )
+
+            print("--------------------------------", flush=True)
+
+            print("\n", flush=True)
+
+            print("--------------------------------", flush=True)
+            print("Computing peaks for integrated", flush=True)
+
+            print("Standardising marker expression", flush=True)
+            # standardise marker expression based on pooled mean and sd of
+            # unscaled marker expression for integrated data for split 1 and 2
+            s1_unscaled = np.array(s1_view_ct[:, marker].layers["integrated"])
+            s2_unscaled = np.array(s2_view_ct[:, marker].layers["integrated"])
+
+            s1_scaled, s2_scaled = metric_helper.standardise_marker_expression(
+                s1_unscaled,
+                s2_unscaled,
+            )
+            print("Computing KDE density", flush=True)
+            density_dist_s1 = metric_helper.get_kde_density(s1_scaled)
+            density_dist_s2 = metric_helper.get_kde_density(s2_scaled)
+
+            print("Calling peaks", flush=True)
+
+            peaks_s1 = metric_helper.call_peaks(density_dist_s1)
+            peaks_s2 = metric_helper.call_peaks(density_dist_s2)
+
+            # use persistent peak count only if the peak calling method is too sensitive...
+            persistent_peak_count_s1 = metric_helper.persistent_peak_count(
+                density_dist_s1
+            )
+            persistent_peak_count_s2 = metric_helper.persistent_peak_count(
+                density_dist_s2
+            )
+
+            print("--------------------------------", flush=True)
+
+            print("\n", flush=True)
+
+            print(
+                f"Comparing peaks between unintegrated and integrated for {donor}, {celltype}, {marker}",
+                flush=True,
+            )
+
+            # case 1 or 3, where we have consistent peaks in unintegrated
+            if peaks_u_s1 == peaks_u_s2:
+                if peaks_s1 != peaks_s2:
+                    n_case3 += 1
+                    case_details["case3"].append((donor, celltype, marker))
+                else:
+                    n_case1 += 1
+                    case_details["case1"].append((donor, celltype, marker))
+            else:
+                print(
+                    "WARNING! Inconsistent peaks detected in unintegrated data (case 2 or 4). Skipping calculation",
+                    flush=True,
+                )
+                print(
+                    f"Number of peaks in unintegrated split 1: {peaks_u_s1}, split 2: {peaks_u_s2}",
+                    flush=True,
+                )
+                case_details["case2or4"].append((donor, celltype, marker))
+
+            # for comparison only
+            persistent_peaks_res.append(
+                [
+                    donor,
+                    celltype,
+                    marker,
+                    peaks_u_s1,
+                    peaks_u_s2,
+                    persistent_peak_count_u_s1,
+                    persistent_peak_count_u_s2,
+                    peaks_s1,
+                    peaks_s2,
+                    persistent_peak_count_s1,
+                    persistent_peak_count_s2,
+                ]
+            )
+            print("Done comparing peaks.", flush=True)
+            print("\n", flush=True)
+
+print("Done processing all celltypes and donors", flush=True)
+print("Calculating ratio", flush=True)
+
+if n_case1 + n_case3 == 0:
+    print(
+        "Only cases 2 or 4 were found! Cannot calculate metric.",
+        flush=True,
+    )
+    metric_val = np.nan
+else:
+    metric_val = n_case3 / (n_case1 + n_case3)
+
+persistent_peaks_res = pd.DataFrame(
+    persistent_peaks_res,
+    columns=[
+        "donor",
+        "celltype",
+        "marker",
+        "peaks_u_s1",
+        "peaks_u_s2",
+        "persistent_peaks_u_s1",
+        "persistent_peaks_u_s2",
+        "peaks_s1",
+        "peaks_s2",
+        "persistent_peaks_s1",
+        "persistent_peaks_s2",
+    ],
+)
+
+print("Write output AnnData to file", flush=True)
+output = ad.AnnData(
+    uns={
+        "dataset_id": integrated_s1.uns["dataset_id"],
+        "method_id": integrated_s1.uns["method_id"],
+        "metric_ids": [meta["name"]],
+        "metric_values": [metric_val],
+        "n_cases": {
+            "case1": n_case1,
+            "case3": n_case3,
+            "case2or4": len(case_details["case2or4"]),
+        },
+        "case_details": dict(case_details),
+        "peak_calling_results_comparison": persistent_peaks_res,
+    }
+)
+output.write_h5ad(par["output"], compression="gzip")
diff --git a/src/workflows/run_benchmark/config.vsh.yaml b/src/workflows/run_benchmark/config.vsh.yaml
index c424489d..de188e22 100644
--- a/src/workflows/run_benchmark/config.vsh.yaml
+++ b/src/workflows/run_benchmark/config.vsh.yaml
@@ -109,11 +109,13 @@ dependencies:
   - name: methods/rpca_to_mid
   - name: methods/cytovi
   - name: metrics/emd
-  - name: metrics/n_inconsistent_peaks
+  # - name: metrics/bras
+  # - name: metrics/n_inconsistent_peaks
+  - name: metrics/ratio_inconsistent_peaks
   - name: metrics/average_batch_r2
   - name: metrics/flowsom_mapping_similarity
   - name: metrics/lisi
-  - name: metrics/bras
+
 
 runners:
   - type: nextflow
diff --git a/src/workflows/run_benchmark/main.nf b/src/workflows/run_benchmark/main.nf
index be484ce2..1bb72733 100644
--- a/src/workflows/run_benchmark/main.nf
+++ b/src/workflows/run_benchmark/main.nf
@@ -40,11 +40,12 @@ methods = [
 // construct list of metrics
 metrics = [
   emd,
-  n_inconsistent_peaks,
+  // bras,
+  // n_inconsistent_peaks,
+  ratio_inconsistent_peaks,
   average_batch_r2,
   flowsom_mapping_similarity,
-  lisi,
-  bras
+  lisi
 ]
 
 workflow run_wf {
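
Reviewer note on the peak-calling path introduced above: the snippet below is a minimal, self-contained sketch (not part of the diff) of how ratio_inconsistent_peaks counts peaks for a single marker distribution. The thresholds mirror call_peaks in helper.py (prominence=0.01, height=0.1); the synthetic data and variable names are illustrative only.

import numpy as np
from scipy.signal import find_peaks
from scipy.stats import gaussian_kde

rng = np.random.default_rng(0)
# hypothetical bimodal marker: two subpopulations at different intensities
expr = np.concatenate([rng.normal(-1.5, 0.4, 500), rng.normal(1.0, 0.5, 500)])
expr = (expr - expr.mean()) / expr.std()  # pooled standardisation, as in the helper

# smooth the expression profile with a Gaussian KDE on a 100-point grid
kde = gaussian_kde(expr.reshape(1, -1), bw_method="scott")
x_grid = np.linspace(expr.min(), expr.max(), 100)
density = kde(x_grid)

# same thresholds as call_peaks in helper.py
peaks, _ = find_peaks(density, prominence=0.01, height=0.1)
print("number of peaks:", len(peaks))  # expect 2 for this bimodal sample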
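To make the scoring rule concrete, here is a small sketch of the case bookkeeping that produces the final score, with hypothetical peak counts standing in for the per-(donor, celltype, marker) results computed in script.py:

# (peaks_u_s1, peaks_u_s2, peaks_s1, peaks_s2) per case; values are made up
observations = [(2, 2, 2, 2), (2, 2, 1, 2), (1, 2, 2, 2)]

n_case1 = n_case3 = 0
for peaks_u_s1, peaks_u_s2, peaks_s1, peaks_s2 in observations:
    if peaks_u_s1 != peaks_u_s2:
        continue  # case 2 or 4: unintegrated already inconsistent, excluded from the denominator
    if peaks_s1 != peaks_s2:
        n_case3 += 1  # inconsistent after integration
    else:
        n_case1 += 1  # still consistent after integration

ratio = n_case3 / (n_case1 + n_case3)  # 1 / 2 = 0.5 here; lower is better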