diff --git a/tests/calibration/test_calibration_features.py b/tests/calibration/test_calibration_features.py index 7818c14..b0ab67d 100644 --- a/tests/calibration/test_calibration_features.py +++ b/tests/calibration/test_calibration_features.py @@ -9,6 +9,7 @@ FeatureDependency, MassErrorFeature, BeamFeatures, + DiffusionBeamFeatures, RetentionTimeFeature, PrositFeatures, ChimericFeatures, @@ -449,6 +450,273 @@ def test_beam_features_edge_case_equal_probabilities(self, beam_features): ) +class TestDiffusionBeamFeatures: + """Test the DiffusionBeamFeatures class.""" + + @pytest.fixture() + def diffusion_beam_features(self): + """Create a DiffusionBeamFeatures instance for testing.""" + return DiffusionBeamFeatures() + + @pytest.fixture() + def sample_dataset_with_diffusion_beams(self): + """Create a sample dataset with diffusion beam predictions in metadata.""" + metadata = pd.DataFrame( + { + "confidence": [0.9, 0.8, 0.7], + "spectrum_id": [1, 2, 3], + # String-formatted predictions (as they appear in real data) + "diffusion_predictions_beam_0": [ + "['A', 'G']", + "['V', 'T']", + "['K']", + ], + "diffusion_log_probabilities_beam_0": [ + np.log(0.8), + np.log(0.9), + np.log(0.95), + ], + "diffusion_predictions_beam_1": [ + "['G', 'A']", + "['T', 'V']", + "['K', 'K']", + ], + "diffusion_log_probabilities_beam_1": [ + np.log(0.6), + np.log(0.7), + np.log(0.75), + ], + "diffusion_predictions_beam_2": [ + "['S', 'P']", + "['V', 'V']", + "['K', 'R']", + ], + "diffusion_log_probabilities_beam_2": [ + np.log(0.4), + np.log(0.5), + np.log(0.55), + ], + } + ) + return CalibrationDataset(metadata=metadata, predictions=[None, None, None]) + + def test_properties(self, diffusion_beam_features): + """Test DiffusionBeamFeatures properties.""" + assert diffusion_beam_features.name == "Diffusion Beam Features" + assert diffusion_beam_features.columns == [ + "diffusion_margin", + "diffusion_median_margin", + "diffusion_entropy", + "diffusion_z-score", + ] + assert diffusion_beam_features.dependencies == [] + + def test_compute_diffusion_beam_features( + self, diffusion_beam_features, sample_dataset_with_diffusion_beams + ): + """Test diffusion beam features computation with valid data.""" + diffusion_beam_features.compute(sample_dataset_with_diffusion_beams) + + # Check that all expected columns were added + expected_columns = [ + "diffusion_margin", + "diffusion_median_margin", + "diffusion_entropy", + "diffusion_z-score", + ] + for col in expected_columns: + assert col in sample_dataset_with_diffusion_beams.metadata.columns + + # Check that we have the right number of rows + assert len(sample_dataset_with_diffusion_beams.metadata) == 3 + + def test_compute_with_insufficient_sequences_warning(self, diffusion_beam_features): + """Test that warning is issued for beam results with < 2 sequences.""" + metadata = pd.DataFrame( + { + "confidence": [0.9, 0.8], + "diffusion_predictions_beam_0": ["['A']", "['G']"], + "diffusion_log_probabilities_beam_0": [np.log(0.8), np.log(0.9)], + # No beam_1, so only 1 sequence per spectrum + } + ) + dataset = CalibrationDataset(metadata=metadata, predictions=[None, None]) + + diffusion_beam_features.prepare(dataset) + + with pytest.warns( + UserWarning, + match="2 diffusion beam search results have fewer than two sequences", + ): + diffusion_beam_features.compute(dataset) + + def test_compute_margin_calculation(self, diffusion_beam_features): + """Test specific margin calculation for diffusion beams.""" + metadata = pd.DataFrame( + { + "confidence": [0.9], + "diffusion_predictions_beam_0": ["['A']"], + "diffusion_log_probabilities_beam_0": [np.log(0.8)], + "diffusion_predictions_beam_1": ["['G']"], + "diffusion_log_probabilities_beam_1": [np.log(0.6)], + } + ) + dataset = CalibrationDataset(metadata=metadata, predictions=[None]) + + diffusion_beam_features.compute(dataset) + + expected_margin = 0.8 - 0.6 # top_prob - second_prob + assert dataset.metadata.iloc[0]["diffusion_margin"] == pytest.approx( + expected_margin, rel=1e-10, abs=1e-10 + ) + + def test_compute_features_with_two_sequences(self, diffusion_beam_features): + """Test diffusion beam feature calculations with two sequences.""" + metadata = pd.DataFrame( + { + "confidence": [0.9], + "diffusion_predictions_beam_0": ["['A']"], + "diffusion_log_probabilities_beam_0": [np.log(0.8)], + "diffusion_predictions_beam_1": ["['G']"], + "diffusion_log_probabilities_beam_1": [np.log(0.6)], + } + ) + dataset = CalibrationDataset(metadata=metadata, predictions=[None]) + + diffusion_beam_features.compute(dataset) + + # margin = 0.8 - 0.6 = 0.2 + assert dataset.metadata.iloc[0]["diffusion_margin"] == pytest.approx( + 0.2, rel=1e-10, abs=1e-10 + ) + + # median_margin = 0.8 - 0.6 = 0.2 (only one runner-up) + assert dataset.metadata.iloc[0]["diffusion_median_margin"] == pytest.approx( + 0.2, rel=1e-10, abs=1e-10 + ) + + # entropy([1.0]) = 0 (only one runner-up, so normalized prob = [1.0]) + assert dataset.metadata.iloc[0]["diffusion_entropy"] == pytest.approx( + 0.0, rel=1e-10, abs=1e-10 + ) + + # z-score with two values: (0.8 + 0.6)/2 = 0.7, std = 0.1, z = (0.8-0.7)/0.1 = 1.0 + assert dataset.metadata.iloc[0]["diffusion_z-score"] == pytest.approx( + 1.0, rel=1e-10, abs=1e-10 + ) + + def test_compute_features_with_three_sequences(self, diffusion_beam_features): + """Test diffusion beam feature calculations with three sequences.""" + metadata = pd.DataFrame( + { + "confidence": [0.9], + "diffusion_predictions_beam_0": ["['A']"], + "diffusion_log_probabilities_beam_0": [np.log(0.7)], + "diffusion_predictions_beam_1": ["['G']"], + "diffusion_log_probabilities_beam_1": [np.log(0.2)], + "diffusion_predictions_beam_2": ["['S']"], + "diffusion_log_probabilities_beam_2": [np.log(0.1)], + } + ) + dataset = CalibrationDataset(metadata=metadata, predictions=[None]) + + diffusion_beam_features.compute(dataset) + + # margin = 0.7 - 0.2 = 0.5 + assert dataset.metadata.iloc[0]["diffusion_margin"] == pytest.approx( + 0.5, rel=1e-10, abs=1e-10 + ) + + # median_margin = 0.7 - median([0.2, 0.1]) = 0.7 - 0.15 = 0.55 + assert dataset.metadata.iloc[0]["diffusion_median_margin"] == pytest.approx( + 0.55, rel=1e-10, abs=1e-10 + ) + + # entropy([2/3, 1/3]) = 0.6365141682948128 + assert dataset.metadata.iloc[0]["diffusion_entropy"] == pytest.approx( + 0.6365141682948128, rel=1e-10, abs=1e-10 + ) + + # z-score: mean = 1/3, std = 0.262466929133727, z = (0.7-1/3)/0.262466929133727 + z_score = dataset.metadata.iloc[0]["diffusion_z-score"] + assert z_score == pytest.approx(1.3970013970020956, rel=1e-10, abs=1e-10) + + def test_parse_sequence_string(self, diffusion_beam_features): + """Test the _parse_sequence_string helper method.""" + # Valid string + result = diffusion_beam_features._parse_sequence_string("['A', 'G', 'S']") + assert result == ["A", "G", "S"] + + # Empty list + result = diffusion_beam_features._parse_sequence_string("[]") + assert result == [] + + # Invalid string (should return empty list) + result = diffusion_beam_features._parse_sequence_string("not a list") + assert result == [] + + # Malformed string (should return empty list) + result = diffusion_beam_features._parse_sequence_string("['A', 'G'") + assert result == [] + + def test_compute_features_equal_probabilities(self, diffusion_beam_features): + """Test diffusion features when all sequences have equal probabilities.""" + metadata = pd.DataFrame( + { + "confidence": [0.9], + "diffusion_predictions_beam_0": ["['A']"], + "diffusion_log_probabilities_beam_0": [np.log(1 / 3)], + "diffusion_predictions_beam_1": ["['G']"], + "diffusion_log_probabilities_beam_1": [np.log(1 / 3)], + "diffusion_predictions_beam_2": ["['S']"], + "diffusion_log_probabilities_beam_2": [np.log(1 / 3)], + } + ) + dataset = CalibrationDataset(metadata=metadata, predictions=[None]) + + diffusion_beam_features.compute(dataset) + + # margin = 1/3 - 1/3 = 0.0 + assert dataset.metadata.iloc[0]["diffusion_margin"] == 0.0 + + # median_margin = 1/3 - median([1/3, 1/3]) = 0.0 + assert dataset.metadata.iloc[0]["diffusion_median_margin"] == 0.0 + + # entropy of [0.5, 0.5] (normalized runner-ups) = 0.6931471805599453 + assert dataset.metadata.iloc[0]["diffusion_entropy"] == pytest.approx( + 0.6931471805599453, rel=1e-10, abs=1e-10 + ) + + # z-score should be 0 (all values equal, so std = 0) + assert dataset.metadata.iloc[0]["diffusion_z-score"] == pytest.approx( + 0.0, rel=1e-10, abs=1e-10 + ) + + def test_compute_with_no_diffusion_columns(self, diffusion_beam_features): + """Test that compute raises error when no diffusion beam columns are present.""" + metadata = pd.DataFrame({"confidence": [0.9]}) + dataset = CalibrationDataset(metadata=metadata, predictions=[None]) + + # Should raise error when diffusion predictions are completely missing + with pytest.raises(ValueError, match="No diffusion beam columns found"): + diffusion_beam_features.compute(dataset) + + def test_compute_with_only_prediction_columns(self, diffusion_beam_features): + """Test that compute raises error when log probability columns are missing.""" + metadata = pd.DataFrame( + { + "confidence": [0.9], + "diffusion_predictions_beam_0": ["['A', 'G']"], + # Missing log probability columns + } + ) + dataset = CalibrationDataset(metadata=metadata, predictions=[None]) + + # Should raise error when diffusion log probabilities are missing + with pytest.raises(ValueError, match="No diffusion beam columns found"): + diffusion_beam_features.compute(dataset) + + class TestCalibrationFeaturesInterface: """Test the abstract CalibrationFeatures interface.""" diff --git a/winnow/calibration/calibration_features.py b/winnow/calibration/calibration_features.py index bc052b9..ff15ce8 100644 --- a/winnow/calibration/calibration_features.py +++ b/winnow/calibration/calibration_features.py @@ -9,7 +9,7 @@ from numpy import median from scipy.stats import entropy from sklearn.neural_network import MLPRegressor - +from instanovo.inference.beam_search import ScoredSequence import koinapy from winnow.datasets.calibration_dataset import CalibrationDataset @@ -603,6 +603,284 @@ def row_beam_z_score(row): dataset.metadata["z-score"] = z_score +class DiffusionBeamFeatures(CalibrationFeatures): + """Calculates the margin, median margin and entropy of diffusion beam runners-up. + + This feature class processes beam search results from peptide diffusion refinement, + computing the same metrics as BeamFeatures but for diffusion-refined predictions. + The diffusion beam data should be stored in the metadata columns with names: + diffusion_predictions_beam_0, diffusion_log_probabilities_beam_0, etc. + """ + + @property + def dependencies(self) -> List[FeatureDependency]: + """Returns a list of dependencies required before computing the feature. + + Since this feature does not depend on other features, it returns an empty list. + + Returns: + List[FeatureDependency]: An empty list. + """ + return [] + + @property + def name(self) -> str: + """Returns the name of the feature. + + This method provides the natural language identifier used as the key for the feature. + + Returns: + str: The feature identifier. + """ + return "Diffusion Beam Features" + + @property + def columns(self) -> List[str]: + """Defines the column names for the computed features. + + Returns: + List[str]: A list of column names for diffusion beam features. + """ + return [ + "diffusion_margin", + "diffusion_median_margin", + "diffusion_entropy", + "diffusion_z-score", + ] + + def prepare(self, dataset: CalibrationDataset) -> None: + """Prepares the dataset before feature computation. + + Args: + dataset (CalibrationDataset): The dataset to prepare. + """ + return + + def compute(self, dataset: CalibrationDataset) -> None: + """Computes margin, median margin and entropy for diffusion beam search runners-up. + + - Diffusion Margin: Difference between the highest probability sequence and the second-best sequence. + - Diffusion Median Margin: Difference between the highest probability sequence and the median probability of the runner-ups. + - Diffusion Entropy: Shannon entropy of the normalised probabilities of the runner-up sequences. + - Diffusion Z-score: Distance between the top beam score and the population mean over all beam results for that spectra in units of the standard deviation. + + These metrics help assess the confidence of the top diffusion-refined prediction relative to lower-ranked candidates. + + Args: + dataset (CalibrationDataset): The dataset containing diffusion beam search predictions in metadata columns. + """ + # Parse diffusion predictions from metadata on-the-fly + diffusion_predictions = self._parse_diffusion_predictions(dataset) + + # Check if predictions were successfully parsed + if not diffusion_predictions or all( + pred is None for pred in diffusion_predictions + ): + raise ValueError( + "No diffusion beam columns found in metadata. " + "Expected columns like 'diffusion_predictions_beam_0' and 'diffusion_log_probabilities_beam_0'. " + "Please ensure diffusion beam search results are included in your dataset." + ) + + # Count beams with fewer than 2 sequences + count = sum( + 1 + for prediction in diffusion_predictions + if prediction is None or len(prediction) < 2 + ) + if count > 0: + warnings.warn( + f"{count} diffusion beam search results have fewer than two sequences. " + "This may affect the efficacy of computed diffusion beam features." + ) + + # Compute features using extracted helper methods + top_probs, second_probs = self._compute_probabilities(diffusion_predictions) + second_margin = [ + top_prob - second_prob + for top_prob, second_prob in zip(top_probs, second_probs) + ] + + runner_up_entropy, median_margin = self._compute_runner_up_features( + top_probs, diffusion_predictions + ) + z_score = self._compute_z_scores(diffusion_predictions) + + # Store computed features in dataset metadata + dataset.metadata["diffusion_margin"] = second_margin + dataset.metadata["diffusion_median_margin"] = median_margin + dataset.metadata["diffusion_entropy"] = runner_up_entropy + dataset.metadata["diffusion_z-score"] = z_score + + def _parse_diffusion_predictions( + self, dataset: CalibrationDataset + ) -> List[Optional[List]]: + """Parse diffusion beam predictions from dataset metadata. + + Args: + dataset (CalibrationDataset): The dataset containing diffusion metadata columns. + + Returns: + List[Optional[List]]: Parsed diffusion predictions as ScoredSequence objects. + """ + # Find all diffusion beam columns + diffusion_pred_cols = [ + col + for col in dataset.metadata.columns + if col.startswith("diffusion_predictions_beam_") + ] + diffusion_logprob_cols = [ + col + for col in dataset.metadata.columns + if col.startswith("diffusion_log_probabilities_beam_") + ] + + if not diffusion_pred_cols or not diffusion_logprob_cols: + return [None] * len(dataset.metadata) + + # Sort columns by beam index + diffusion_pred_cols = sorted( + diffusion_pred_cols, key=lambda x: int(x.split("_")[-1]) + ) + diffusion_logprob_cols = sorted( + diffusion_logprob_cols, key=lambda x: int(x.split("_")[-1]) + ) + + # Parse each row's diffusion beams into ScoredSequence objects + diffusion_predictions = [] + for _, row in dataset.metadata.iterrows(): + beam_list = [] + for pred_col, logprob_col in zip( + diffusion_pred_cols, diffusion_logprob_cols + ): + pred_str = row[pred_col] + logprob = row[logprob_col] + + # Skip if either value is missing + if pd.isna(pred_str) or pd.isna(logprob): + break + + # Parse sequence string and create ScoredSequence + sequence = self._parse_sequence_string(pred_str) + if sequence: + scored_seq = ScoredSequence( + sequence=sequence, + mass_error=None, + sequence_log_probability=logprob, + token_log_probabilities=None, + ) + beam_list.append(scored_seq) + + # Add the beam list (or None if empty) + diffusion_predictions.append(beam_list if beam_list else None) + + return diffusion_predictions + + def _parse_sequence_string(self, seq_str: str) -> List[str]: + """Parse a string-formatted sequence into a list of residues. + + Args: + seq_str (str): String representation of sequence, e.g., "['L', 'T', 'D', 'K']" + + Returns: + List[str]: List of residues + """ + import ast + + try: + return ast.literal_eval(seq_str) + except (ValueError, SyntaxError): + return [] + + def _compute_probabilities( + self, diffusion_predictions: List[Optional[List]] + ) -> Tuple[List[float], List[float]]: + """Extract top and second probabilities from diffusion predictions. + + Args: + diffusion_predictions (List[Optional[List]]): Parsed diffusion predictions. + + Returns: + Tuple[List[float], List[float]]: Top probabilities and second probabilities. + """ + top_probs = [ + exp(prediction[0].sequence_log_probability) + if prediction and len(prediction) >= 1 + else 0.0 + for prediction in diffusion_predictions + ] + second_probs = [ + exp(prediction[1].sequence_log_probability) + if prediction and len(prediction) >= 2 + else 0.0 + for prediction in diffusion_predictions + ] + return top_probs, second_probs + + def _compute_runner_up_features( + self, top_probs: List[float], diffusion_predictions: List[Optional[List]] + ) -> Tuple[List[float], List[float]]: + """Compute entropy and median margin from runner-up probabilities. + + Args: + top_probs (List[float]): List of top beam probabilities. + diffusion_predictions (List[Optional[List]]): Parsed diffusion predictions. + + Returns: + Tuple[List[float], List[float]]: Runner-up entropy and median margin values. + """ + runner_up_probs = [ + [exp(item.sequence_log_probability) for item in prediction[1:]] + if prediction and len(prediction) >= 2 + else [0.0] + for prediction in diffusion_predictions + ] + + normalised_runner_up_probs = [ + [probability / sum(probabilities) for probability in probabilities] + if sum(probabilities) != 0 + else 0.0 + for probabilities in runner_up_probs + ] + + runner_up_entropy = [ + entropy(probs) if probs != 0 else 0.0 + for probs in normalised_runner_up_probs + ] + + runner_up_median = [median(probs) for probs in runner_up_probs] + median_margin = [ + top_prob - median_prob + for top_prob, median_prob in zip(top_probs, runner_up_median) + ] + + return runner_up_entropy, median_margin + + def _compute_z_scores( + self, diffusion_predictions: List[Optional[List]] + ) -> List[float]: + """Compute z-scores for each diffusion beam prediction. + + Args: + diffusion_predictions (List[Optional[List]]): Parsed diffusion predictions. + + Returns: + List[float]: Z-score values for each prediction. + """ + + def row_beam_z_score(prediction): + if not prediction or len(prediction) < 1: + return 0.0 + probabilities = [exp(beam.sequence_log_probability) for beam in prediction] + mean_prob = np.mean(probabilities) + std_prob = np.std(probabilities) + if std_prob == 0: + return 0.0 + return (probabilities[0] - mean_prob) / std_prob + + return [row_beam_z_score(prediction) for prediction in diffusion_predictions] + + class RetentionTimeFeature(CalibrationFeatures): """Computes Prosit iRT features and calibrates an iRT predictor. diff --git a/winnow/datasets/data_loaders.py b/winnow/datasets/data_loaders.py index 34c3d4c..ae2acca 100644 --- a/winnow/datasets/data_loaders.py +++ b/winnow/datasets/data_loaders.py @@ -41,8 +41,16 @@ def _load_beam_preds( """ df = pl.read_csv(predictions_path) # Use polars column selectors to split dataframe - beam_df = df.select(cs.contains("_beam_")) - preds_df = df.select(~cs.contains(["_beam_", "_log_probs_"])) + beam_df = df.select( + cs.contains( + "instanovo_predictions_beam", "instanovo_log_probabilities_beam" + ) + ) + preds_df = df.select( + ~cs.contains( + ["instanovo_predictions_beam", "instanovo_log_probabilities_beam"] + ) + ) return preds_df, beam_df @staticmethod @@ -64,9 +72,9 @@ def convert_row_to_scored_sequences( for beam in range(num_beams): seq_col, log_prob_col, token_log_prob_col = ( - f"preds_beam_{beam}", - f"log_probs_beam_{beam}", - f"token_log_probs_{beam}", + f"instanovo_predictions_beam_{beam}", + f"instanovo_log_probabilities_beam_{beam}", + f"token_log_probabilities_beam_{beam}", ) sequence, log_prob, token_log_prob = ( row.get(seq_col), @@ -91,7 +99,7 @@ def convert_row_to_scored_sequences( [ pl.col(col).str.replace_all("L", "I") for col in beam_df.columns - if "preds_beam" in col + if "instanovo_predictions_beam" in col ] ) @@ -114,8 +122,8 @@ def _process_predictions(dataset: pd.DataFrame, has_labels: bool) -> pd.DataFram pd.DataFrame: The processed dataframe. """ rename_dict = { - "preds": "prediction_untokenised", - "preds_tokenised": "prediction", + "predictions": "prediction_untokenised", + "predictions_tokenised": "prediction", "log_probs": "confidence", } if has_labels: diff --git a/winnow/scripts/main.py b/winnow/scripts/main.py index 9123b96..a32b348 100644 --- a/winnow/scripts/main.py +++ b/winnow/scripts/main.py @@ -5,6 +5,7 @@ RetentionTimeFeature, ChimericFeatures, BeamFeatures, + DiffusionBeamFeatures, ) from winnow.calibration.calibrator import ProbabilityCalibrator from winnow.datasets.calibration_dataset import CalibrationDataset @@ -231,6 +232,7 @@ def initialise_calibrator() -> ProbabilityCalibrator: ) calibrator.add_feature(ChimericFeatures(mz_tolerance=MZ_TOLERANCE)) calibrator.add_feature(BeamFeatures()) + calibrator.add_feature(DiffusionBeamFeatures()) return calibrator