diff --git a/protzilla/constants/colors.py b/protzilla/constants/colors.py
index 0c6600f7..9a92e93c 100644
--- a/protzilla/constants/colors.py
+++ b/protzilla/constants/colors.py
@@ -11,4 +11,37 @@
"""First color in list. Conventionally used for visualizing outliers."""
PLOT_SECONDARY_COLOR = PLOT_COLOR_SEQUENCE[1]
-"""Second color in list."""
\ No newline at end of file
+"""Second color in list."""
+
+
def interpolate_color(color_a, color_b, t):
    """
    Linearly blend two hex RGB colors.

    Args:
        color_a (str): Start color, formatted "#RRGGBB".
        color_b (str): End color, formatted "#RRGGBB".
        t (float): Blend factor in [0, 1]; 0 yields color_a, 1 yields color_b.

    Returns:
        str: The blended color as an RGB string in the format "#RRGGBB".
    """
    if not (0 <= t <= 1):
        raise ValueError("Interpolation factor t must be between 0 and 1")

    def _channels(hex_color):
        # Parse "#RRGGBB" into the three integer channel values.
        stripped = hex_color.lstrip("#")
        return [int(stripped[pos : pos + 2], 16) for pos in (0, 2, 4)]

    # Blend each channel independently, truncating to an integer value.
    blended = [
        int(start + (end - start) * t)
        for start, end in zip(_channels(color_a), _channels(color_b))
    ]
    # Reassemble the three channels into a "#RRGGBB" string.
    return "#" + "".join(f"{channel:02x}" for channel in blended)
diff --git a/protzilla/data_analysis/protein_coverage.py b/protzilla/data_analysis/protein_coverage.py
new file mode 100644
index 00000000..bd77d783
--- /dev/null
+++ b/protzilla/data_analysis/protein_coverage.py
@@ -0,0 +1,514 @@
from dataclasses import dataclass
from enum import StrEnum

import pandas as pd
import plotly.graph_objects as go
from numpy import log2
from plotly.subplots import make_subplots
from tqdm import tqdm

# NOTE(review): the next two imports appear unused — `color_scale` is shadowed
# by a local variable in add_intensity_legend and `title` is never referenced;
# they pull in unrelated packages and should be removed.
from colour import color_scale
from docutils.nodes import title

from protzilla.constants.colors import (
    PLOT_COLOR_SEQUENCE,
    PLOT_PRIMARY_COLOR,
    interpolate_color,
)
from protzilla.constants.paths import EXTERNAL_DATA_PATH
from protzilla.constants.protzilla_logging import logger
from protzilla.data_analysis.differential_expression_mann_whitney import (
    mann_whitney_test_on_intensity_data,
)
from protzilla.disk_operator import PickleOperator
+
# Endpoints of the peptide fill-color gradient: white for the lowest
# normalized intensity, the fourth palette color for the highest.
INTENSITY_COLORS = ["#FFFFFF", PLOT_COLOR_SEQUENCE[3]]
# Vertical gap between the sequencing-depth bars and the peptide boxes.
SEQUENCE_DEPTH_PEPTIDE_SPACING = 1
+
+
class IntensityNormalization(StrEnum):
    # How peptide intensities are scaled before being mapped to colors.
    min_max_scaling = "min_max_scaling"  # scale intensities into [0, 1]
    none = "none"  # use raw intensity values
+
+
class AggregationMethod(StrEnum):
    # Strategy for collapsing duplicate peptide intensity measurements
    # into a single value (see extract_peptide_from_slice).
    median = "median"
    mean = "mean"
+
+
@dataclass(unsafe_hash=True)
class PeptideMatch:
    # A peptide located on a protein sequence, enriched with plot metadata.
    peptide_sequence: str  # amino-acid sequence of the matched peptide
    start_location_on_protein: int  # 0-based inclusive start index on the protein
    end_location_on_protein: int  # exclusive end index on the protein
    intensity: float = 0.0  # log2-transformed intensity (set in plot_protein_coverage)
    metadata_group: str = ""  # metadata group the measurement belongs to
+
+
@dataclass(unsafe_hash=True)
class ProteinHit:
    # Location of one peptide occurrence within a single protein sequence.
    protein_id: str  # ID of the protein the peptide was matched against
    start_location_on_protein: int  # 0-based inclusive start index of the match
    end_location_on_protein: int  # exclusive end index of the match
+
+
# NOTE(review): exact duplicate of the IntensityNormalization enum defined
# earlier in this module — this redefinition shadows the first one and one of
# the two should be removed.
class IntensityNormalization(StrEnum):
    min_max_scaling = "min_max_scaling"
    none = "none"
+
+
def build_kmer_dictionary(
    protein_dictionary: dict[str, str], k: int = 5
) -> dict[str, list[tuple[str, int]]]:
    """
    Build a dictionary of all k-mers occurring in the given protein sequences.

    The k-mers are generated by sliding a window of size k along each protein
    sequence. The resulting dictionary maps a k-mer to a list of
    (protein ID, start index) tuples locating every occurrence of that k-mer.
    The dictionary is cached on disk so subsequent calls can skip the scan.

    :param protein_dictionary: mapping from protein ID to protein sequence
    :param k: window size of the k-mers
    :return: mapping from k-mer to all its (protein ID, index) occurrences
    """
    # Reuse a previously built dictionary if one was cached on disk.
    kmer_dict_path = EXTERNAL_DATA_PATH / f"kmer_dict_{k}.pkl"
    if kmer_dict_path.exists():
        return PickleOperator.read(kmer_dict_path)
    kmer_dict: dict[str, list[tuple[str, int]]] = {}
    for protein_id, protein_sequence in tqdm(
        protein_dictionary.items(),
        desc="Building kmer dictionary",
        unit_scale=True,
        unit="protein",
    ):
        for i in range(len(protein_sequence) - k + 1):
            kmer = protein_sequence[i : i + k]
            # setdefault replaces the separate membership test per k-mer
            kmer_dict.setdefault(kmer, []).append((protein_id, i))
    PickleOperator.write(kmer_dict_path, kmer_dict)
    return kmer_dict
+
+
def match_peptide_to_protein_ids(
    peptide_sequence: str,
    protein_kmer_dictionary: dict[str, list[tuple[str, int]]],
    protein_dictionary: dict[str, str],
) -> list[ProteinHit]:
    """
    Match a peptide sequence against the k-mer index of the protein sequences.

    A candidate occurrence is accepted when the peptide's first and last
    k-mers are found on the same protein at positions consistent with the
    peptide length, and the full protein substring equals the peptide.

    :param peptide_sequence: the peptide to locate
    :param protein_kmer_dictionary: index built by build_kmer_dictionary
    :param protein_dictionary: mapping from protein ID to protein sequence
    :return: one ProteinHit per distinct occurrence of the peptide
    :raises ValueError: if the peptide sequence is empty
    """
    if len(peptide_sequence) == 0:
        raise ValueError("Peptide sequence is empty.")
    k = 5  # must match the k used by build_kmer_dictionary
    if len(peptide_sequence) < k:
        # no k-mer can be formed, so the index cannot resolve this peptide
        # (the original crashed with an IndexError on peptide_kmers[0] here)
        return []
    first_kmer = peptide_sequence[:k]
    last_kmer = peptide_sequence[-k:]
    first_kmer_matches = protein_kmer_dictionary.get(first_kmer, [])
    last_kmer_matches = protein_kmer_dictionary.get(last_kmer, [])
    if not first_kmer_matches or not last_kmer_matches:
        return []

    hits = [
        ProteinHit(
            protein_id, start_first_kmer, start_first_kmer + len(peptide_sequence)
        )
        for protein_id, start_first_kmer in first_kmer_matches
        for last_protein_id, start_last_kmer in last_kmer_matches
        if protein_id == last_protein_id
        # the last k-mer must sit exactly at the peptide's tail ...
        and start_last_kmer == start_first_kmer + len(peptide_sequence) - k
        # ... and the full substring must equal the peptide, guarding against
        # coincidental k-mer placements
        and protein_dictionary[protein_id][
            start_first_kmer : start_first_kmer + len(peptide_sequence)
        ]
        == peptide_sequence
    ]
    hits = list(set(hits))  # remove duplicates

    # Sanity checks: every reported hit must exactly reproduce the peptide.
    for hit in hits:
        subsequence = protein_dictionary[hit.protein_id][
            hit.start_location_on_protein : hit.end_location_on_protein
        ]
        # a same-length substring check collapses to equality, so assert that
        # directly (the original used "in", which was weaker)
        assert subsequence == peptide_sequence, (
            f"Subsequence does not match peptide sequence:\nA: {subsequence}\nB: "
            f"{peptide_sequence}"
        )
        # fixed: the original referenced protein_id here, which is not bound in
        # this scope (comprehension variables do not leak) -> NameError
        assert peptide_sequence in protein_dictionary[hit.protein_id], (
            f"Peptide not in protein sequence: {peptide_sequence} not in "
            f"{protein_dictionary[hit.protein_id]}"
        )
    return hits
+
+
def plot_protein_coverage(
    fasta_df: pd.DataFrame,
    peptide_df: pd.DataFrame,
    metadata_df: pd.DataFrame,
    protein_id: str,
    grouping: str,
    selected_groups: list[str] = None,
    aggregation_method: AggregationMethod = AggregationMethod.median,
) -> dict:
    """
    Plot the coverage of a protein sequence by the measured peptides.

    :param fasta_df: dataframe with "Protein ID" and "Protein Sequence" columns
    :param peptide_df: peptide intensities per sample
    :param metadata_df: sample metadata, joined onto peptide_df via "Sample"
    :param protein_id: the protein whose coverage is plotted
    :param grouping: metadata column used to split the samples into groups
    :param selected_groups: metadata groups to include in the plot
    :param aggregation_method: how duplicate peptide intensities are combined
    :return: dict with a "plots" list containing the coverage figure
    :raises ValueError: if no groups are given, no peptides remain after
        filtering, the protein ID is unknown, or no peptide matches the protein
    """
    if not selected_groups:
        raise ValueError("No samples provided.")

    # add group information to the peptide dataframe
    peptide_df = peptide_df.merge(metadata_df, on="Sample")

    # keep only measured peptides of the selected groups; the intensity
    # comparison must be parenthesized because "&" binds tighter than ">"
    # (the original mask was evaluated as (isin(...) & Intensity) > 0)
    reduced_peptide_df = peptide_df[
        peptide_df[grouping].isin(selected_groups) & (peptide_df["Intensity"] > 0)
    ]
    if len(reduced_peptide_df) == 0:
        raise ValueError("No peptides found for the samples provided.")

    protein_sequence_dict = dict(
        zip(fasta_df["Protein ID"], fasta_df["Protein Sequence"])
    )
    sequence_kmer_dict = build_kmer_dictionary(protein_sequence_dict, k=5)

    if protein_id not in protein_sequence_dict:
        raise ValueError(f"Protein ID {protein_id} not found in protein dictionary.")

    protein_sequence = protein_sequence_dict[protein_id]
    peptide_matches = []
    coverage_by_group_name = {
        group_name: [0] * len(protein_sequence) for group_name in selected_groups
    }

    for (group_name, peptide_sequence), peptide_slice in tqdm(
        reduced_peptide_df[[grouping, "Sequence", "Intensity"]]
        .drop_duplicates()
        .groupby([grouping, "Sequence"]),
        desc="Matching peptides to protein",
        unit_scale=True,
        unit="peptide",
    ):
        aggregated = extract_peptide_from_slice(peptide_slice, aggregation_method)
        if aggregated.empty:
            continue
        intensity = aggregated["Intensity"].values[0]
        protein_hits = match_peptide_to_protein_ids(
            peptide_sequence=peptide_sequence,
            protein_kmer_dictionary=sequence_kmer_dict,
            protein_dictionary=protein_sequence_dict,
        )
        # we only care about the hits pertaining to the argument-supplied protein id
        for protein_hit in (
            hit for hit in protein_hits if hit.protein_id == protein_id
        ):
            # record the peptide's location together with its log2 intensity
            peptide_matches.append(
                PeptideMatch(
                    peptide_sequence=peptide_sequence,
                    start_location_on_protein=protein_hit.start_location_on_protein,
                    end_location_on_protein=protein_hit.end_location_on_protein,
                    intensity=log2(intensity),
                    metadata_group=group_name,
                )
            )
            # update the per-residue coverage of this group
            increment_coverage(coverage_by_group_name[group_name], protein_hit)

    if len(peptide_matches) == 0:
        raise ValueError(f"No peptides matched for protein {protein_id}")

    # now that all the matches have been determined with their start and end on
    # the protein sequence, distribute their rectangles to rows such that none
    # overlap while minimizing the required vertical space
    rows_by_group_name = distribute_to_rows(peptide_matches)

    fig = build_coverage_plot(
        coverages_by_group_name=coverage_by_group_name,
        peptide_matches=peptide_matches,
        protein_id=protein_id,
        protein_sequence=protein_sequence,
        rows_by_group_name=rows_by_group_name,
        grouping=grouping,
        aggregation_method=aggregation_method,
    )

    return dict(plots=[fig])
+
+
def build_coverage_plot(
    coverages_by_group_name: dict[str, list[int]],
    peptide_matches: list[PeptideMatch],
    protein_id: str,
    protein_sequence: str,
    rows_by_group_name: dict[str, list[list[PeptideMatch]]],
    intensity_normalization: IntensityNormalization = IntensityNormalization.min_max_scaling,
    grouping: str = "",
    aggregation_method: AggregationMethod = AggregationMethod.median,
) -> go.Figure:
    """
    Assemble the coverage figure: one subplot per metadata group, each showing
    the per-residue sequencing depth as bars with the peptide boxes stacked
    above them, colored by min-max normalized intensity.

    :param coverages_by_group_name: per group, per-residue coverage counters
    :param peptide_matches: all peptide matches. NOTE(review): not read in this
        function body — the matches are consumed via rows_by_group_name.
    :param protein_id: id of the plotted protein. NOTE(review): also unused here.
    :param protein_sequence: amino-acid sequence of the plotted protein
    :param rows_by_group_name: per group, peptides distributed to
        non-overlapping rows (see distribute_to_rows)
    :param intensity_normalization: NOTE(review): accepted but never read —
        min-max scaling is always applied below; confirm intended behavior.
    :param grouping: metadata column name, forwarded for hover labels
    :param aggregation_method: shown in the intensity legend title
    :return: the assembled plotly figure
    """
    number_of_subplots = len(rows_by_group_name.keys())
    # one x-axis label per residue, e.g. "M - 0 "
    protein_sequence_labels = [
        f"{amino_acid} - {amino_acid_index} "
        for amino_acid_index, amino_acid in enumerate(protein_sequence)
    ]
    subplot_titles = [
        f"Peptides of {group_name}" for group_name in rows_by_group_name.keys()
    ]
    max_coverage_value = get_max_coverage(coverages_by_group_name)

    fig = make_subplots(
        rows=number_of_subplots,
        cols=1,
        shared_xaxes=True,
        vertical_spacing=0.1,
        subplot_titles=subplot_titles,
        specs=[[{"secondary_y": True}] for _ in range(number_of_subplots)],
    )
    # Peptides
    for group_index, group_name in enumerate(rows_by_group_name.keys(), start=1):
        peptide_rows = rows_by_group_name[group_name]
        coverage = coverages_by_group_name[group_name]

        # Calculate the desired heights
        max_bar_height = max_coverage_value
        desired_peptide_height = max_bar_height * 2  # 1:2 ratio

        # Calculate the height scaling factor for peptides
        num_peptide_rows = len(peptide_rows)
        if num_peptide_rows > 0:
            # Include spacing in the calculation
            total_spacing = SEQUENCE_DEPTH_PEPTIDE_SPACING
            peptide_box_height = (
                desired_peptide_height - total_spacing
            ) / num_peptide_rows
        else:
            peptide_box_height = 1.0

        add_sequence_depth_to_plot(
            fig=fig,
            coverage=coverage,
            group_subplot_index=group_index,
            protein_sequence_labels=protein_sequence_labels,
        )

        # Peptide plot
        # NOTE(review): assumes every group has at least one peptide row;
        # max()/min() below raise on an empty list — confirm upstream filtering.
        current_group_intensities = [
            peptide_match.intensity for row in peptide_rows for peptide_match in row
        ]
        max_intensity, min_intensity = max(current_group_intensities), min(
            current_group_intensities
        )
        # min-max scale an intensity into [0, 1]; constant groups map to 0
        scale_intensity = lambda intensity: (
            (intensity - min_intensity) / (max_intensity - min_intensity)
            if max_intensity != min_intensity
            else 0
        )
        for row_index, row in enumerate(peptide_rows):
            for peptide_match in row:
                add_peptide_to_plot(
                    fig=fig,
                    normalized_intensity=scale_intensity(peptide_match.intensity),
                    peptide_match=peptide_match,
                    group_subplot_index=group_index,
                    protein_sequence_labels=protein_sequence_labels,
                    row_index=row_index,
                    # peptide boxes start above the depth bars plus the gap
                    offset=max_coverage_value + SEQUENCE_DEPTH_PEPTIDE_SPACING,
                    box_height=peptide_box_height,
                    color_a=INTENSITY_COLORS[0],
                    color_b=INTENSITY_COLORS[1],
                    grouping=grouping,
                )

        fig.update_layout(hovermode="closest")
        # Update y-axis range to accommodate both the bar plot and peptide sections
        total_height = (
            max_coverage_value + SEQUENCE_DEPTH_PEPTIDE_SPACING + desired_peptide_height
        )
        fig.update_yaxes(
            range=[0, total_height],
            showticklabels=False,
            showgrid=False,
            row=group_index,
            col=1,
        )
        fig.update_xaxes(showticklabels=False, showgrid=False, row=group_index, col=1)
    # only the bottom-most subplot keeps its residue tick labels
    fig.update_xaxes(showticklabels=True, row=number_of_subplots, col=1)

    add_intensity_legend(fig, aggregation_method)

    return fig
+
+
def add_intensity_legend(
    fig, aggregation_method: AggregationMethod = AggregationMethod.median
):
    """
    Add a colorbar legend that maps the peptide fill colors to intensities.

    An invisible one-point scatter trace carries the colorbar, since shapes
    cannot provide one themselves.

    :param fig: the figure the legend is added to
    :param aggregation_method: aggregation used for intensities, shown in the
        colorbar title
    """
    color_scale = [[0, INTENSITY_COLORS[0]], [1, INTENSITY_COLORS[1]]]
    color_legend_trace = go.Scatter(
        x=[None],
        y=[None],
        mode="markers",
        marker=dict(
            colorscale=color_scale,
            showscale=True,
            cmin=0,
            cmax=1,
            colorbar=dict(
                # plotly uses "<br>" for line breaks inside labels; the
                # original contained a raw newline that broke the string literal
                title=f"Intensity of peptide<br>(aggregated via {aggregation_method})",
                x=1.0,  # Move further outside the plot
                len=0.9,  # Increase length of colorbar (90% of subplot height)
                thickness=30,  # Increase thickness of colorbar
                titleside="right",  # Move title to the right side of the bar
            ),
            size=10,
        ),
        showlegend=False,
    )
    fig.add_trace(color_legend_trace, row=1, col=1)
+
+
def add_sequence_depth_to_plot(
    fig: go.Figure,
    coverage: list[int],
    group_subplot_index: int,
    protein_sequence_labels: list[str],
) -> None:
    """
    Add a bar chart of per-residue sequencing depth to one group's subplot.

    :param fig: the figure the bars are added to
    :param coverage: per-residue coverage counters of the group
    :param group_subplot_index: 1-based subplot row of the group
    :param protein_sequence_labels: x-axis labels, one per residue
    """
    depth_bars = go.Bar(
        x=protein_sequence_labels,
        y=coverage,
        showlegend=False,
        marker=dict(color=PLOT_PRIMARY_COLOR),
        # show the depth value when hovering a bar
        hoverinfo="y",
        hovertemplate="Sequencing depth at %{x}: %{y}",
    )
    fig.add_trace(depth_bars, row=group_subplot_index, col=1)
+
+
def add_peptide_to_plot(
    fig: go.Figure,
    normalized_intensity: float,
    peptide_match: PeptideMatch,
    group_subplot_index: int,
    protein_sequence_labels: list[str],
    row_index: int,
    offset: int,
    box_height: float,
    color_a: str = "#FFFFFF",
    color_b: str = PLOT_COLOR_SEQUENCE[3],
    grouping: str = "",
) -> None:
    """
    Draw one peptide as a colored rectangle in the given subplot.

    The fill color encodes the normalized intensity by interpolating between
    color_a and color_b; an invisible scatter trace is laid over the rectangle
    so hovering shows the peptide details.

    :param fig: figure the peptide is drawn into
    :param normalized_intensity: intensity scaled into [0, 1]
    :param peptide_match: the peptide and its location on the protein
    :param group_subplot_index: 1-based subplot row of the peptide's group
    :param protein_sequence_labels: x-axis labels, one per residue
    :param row_index: stacking row of the peptide within its group
    :param offset: y offset below the first peptide row (depth bars + spacing)
    :param box_height: height of one peptide rectangle
    :param color_a: gradient color for intensity 0
    :param color_b: gradient color for intensity 1
    :param grouping: metadata column name, shown in the hover text
    """
    x0, x1 = (
        peptide_match.start_location_on_protein,
        peptide_match.end_location_on_protein,
    )
    y0, y1 = row_index * box_height + offset, (row_index + 1) * box_height + offset
    # interpolate between the two colors according to the intensity
    color = interpolate_color(color_a, color_b, normalized_intensity)
    peptide_shape = dict(
        type="rect",
        x0=x0 - 0.5,
        y0=y0,
        x1=x1 - 0.5,
        y1=y1,
        fillcolor=color,
        line_color=color,
        layer="above",
    )
    fig.add_shape(**peptide_shape, row=group_subplot_index, col=1)
    # hover text; plotly uses "<br>" for line breaks (the original contained
    # raw newlines that broke the string literal)
    hover_text = (
        f"{grouping}: {peptide_match.metadata_group}<br>"
        f"Peptide: {peptide_match.peptide_sequence}<br>"
        f"({peptide_match.start_location_on_protein}-"
        f"{peptide_match.end_location_on_protein})<br>"
        f"Intensity: {peptide_match.intensity}"
    )
    # invisible markers over the rectangle provide the hover behavior, one per
    # residue of the peptide; the slice end is exclusive so x and y have equal
    # lengths (the original sliced one label too far)
    fig.add_trace(
        go.Scatter(
            x=protein_sequence_labels[
                peptide_match.start_location_on_protein : peptide_match.end_location_on_protein
            ],
            y=[(y0 + y1) / 2] * len(peptide_match.peptide_sequence),
            text=[hover_text] * len(peptide_match.peptide_sequence),
            mode="markers",
            marker=dict(opacity=0, size=15),  # make marker invisible
            hoverinfo="text",
            hovertemplate="%{text}",
            showlegend=False,
        ),
        row=group_subplot_index,
        col=1,
    )
+
+
def increment_coverage(coverage: list[int], protein_hit: ProteinHit) -> None:
    """
    Increment, in place, the per-residue coverage counters spanned by a hit.

    :param coverage: per-residue counter list of one group (mutated in place)
    :param protein_hit: the peptide occurrence whose span is incremented
    """
    start = protein_hit.start_location_on_protein
    end = protein_hit.end_location_on_protein
    # the original comprehension shadowed the "coverage" parameter name
    coverage[start:end] = [depth + 1 for depth in coverage[start:end]]
+
+
def extract_peptide_from_slice(
    slice_df: pd.DataFrame,
    aggregation_strategy: AggregationMethod = AggregationMethod.median,
) -> pd.DataFrame:
    """
    Collapse multiple datapoints of one peptide into a single datapoint.

    Given multiple peptide datapoints, return a single row whose intensity is
    the chosen aggregate over the slice.

    :param slice_df: datapoints of one (group, sequence) combination
    :param aggregation_strategy: how the intensity values are combined
    :return: a one-row dataframe; only the "Intensity" column is guaranteed
    :raises ValueError: if the aggregation strategy is unknown
    """
    if len(slice_df) == 1:
        return slice_df
    # numeric_only=True keeps the aggregation from failing on the non-numeric
    # metadata columns (e.g. the grouping column) in modern pandas versions
    if aggregation_strategy == AggregationMethod.median:
        return slice_df.groupby("Sequence").median(numeric_only=True)
    if aggregation_strategy == AggregationMethod.mean:
        return slice_df.groupby("Sequence").mean(numeric_only=True)
    raise ValueError(f"Unknown strategy: {aggregation_strategy}")
+
+
def distribute_to_rows(
    peptide_matches: list[PeptideMatch],
) -> dict[str, list[list[PeptideMatch]]]:
    """
    Distribute peptide matches to rows such that no two peptides in the same
    row overlap, using a greedy first-fit strategy (see the interval
    scheduling problem).

    :param peptide_matches: all matches of the protein, in any order
    :return: per metadata group, a list of non-overlapping rows of matches
    :raises ValueError: if no matches are given
    """
    if not peptide_matches:
        raise ValueError(
            "Attempted to distribute empty list of peptide matches to rows in plot."
        )
    # sorted() leaves the caller's list untouched (the original sorted in place)
    ordered_matches = sorted(
        peptide_matches, key=lambda match: match.start_location_on_protein
    )
    rows: dict[str, list[list[PeptideMatch]]] = {
        match.metadata_group: [] for match in ordered_matches
    }
    for match in ordered_matches:
        # place the match in the first row of its group it does not overlap with
        for row in rows[match.metadata_group]:
            if row[-1].end_location_on_protein < match.start_location_on_protein:
                row.append(match)
                break
        else:
            rows[match.metadata_group].append([match])
    return rows
+
+
def get_max_coverage(coverage: dict[str, list[int]]) -> int:
    """
    Return the highest per-residue coverage value across all groups.

    :param coverage: per group, the list of per-residue coverage counters
    :return: the maximum counter value over all groups
    :raises ValueError: if no coverage data is given
    """
    if len(coverage) == 0:
        raise ValueError(
            "Cannot calculate maximum coverage value: No coverage data provided."
        )
    # generator avoids the intermediate list; inner names no longer shadow the
    # parameter as they did in the original comprehension
    return max(max(counters) for counters in coverage.values())
diff --git a/protzilla/disk_operator.py b/protzilla/disk_operator.py
index ce7f07ad..dfee538c 100644
--- a/protzilla/disk_operator.py
+++ b/protzilla/disk_operator.py
@@ -6,6 +6,7 @@
import pandas as pd
import yaml
+import pickle
from plotly.io import read_json, write_json
import protzilla.utilities as utilities
@@ -36,6 +37,25 @@ def __exit__(self, exc_type, exc_val, exc_tb):
return False
return True
class PickleOperator:
    """Read and write arbitrary Python objects as pickle files."""

    @staticmethod
    def read(file_path: Path):
        """
        Load and return the pickled object stored at file_path.

        NOTE: pickle deserialization is only safe for trusted, locally
        written files — never feed it user-supplied data.
        """
        with ErrorHandler():
            with open(file_path, "rb") as file:
                logger.info(f"Reading pickle from {file_path}")
                return pickle.load(file)

    @staticmethod
    def write(file_path: Path, data):
        """Pickle data to file_path, creating parent directories as needed."""
        with ErrorHandler():
            if not file_path.parent.exists():
                # exist_ok guards against a concurrent creation of the
                # directory; the original logged "was created" before creating
                # and raised if another process won the race
                file_path.parent.mkdir(parents=True, exist_ok=True)
                logger.info(
                    f"Parent directory {file_path.parent} did not exist and was created"
                )
            with open(file_path, "wb") as file:
                pickle.dump(data, file)
class YamlOperator:
@staticmethod
diff --git a/protzilla/importing/fasta_import.py b/protzilla/importing/fasta_import.py
new file mode 100644
index 00000000..bbdb9373
--- /dev/null
+++ b/protzilla/importing/fasta_import.py
@@ -0,0 +1,36 @@
+"""
+This module contains the code to parse a fasta file containing protein sequences and their ids.
+"""
+import logging
+
+import pandas as pd
+from Bio import SeqIO
+
+
def parse_fasta_id(fasta_id: str) -> str:
    """
    Extract the protein accession from a UniProt-style fasta header id.

    Expects ids of the form "db|ACCESSION|ENTRY_NAME" and returns the
    ACCESSION part. Returns "" (and logs a warning) for malformed ids.

    :param fasta_id: the id string of a fasta record
    :return: the accession, or "" if the id could not be parsed
    """
    parts = fasta_id.split("|")
    if len(parts) < 2:
        # id does not follow the "db|ACCESSION|..." layout; the original
        # indexed [1] unconditionally and raised an IndexError here
        logging.warning(f"Could not parse fasta id: {fasta_id}")
        return ""
    metadata = parts[1]
    if len(metadata) < 2:
        logging.warning(f"Metadata too short: {metadata}")
        return ""
    return metadata
+
+
def fasta_import(file_path: str) -> dict:
    """
    Import a fasta file and return the protein ids and sequences it contains.

    :param file_path: path to the fasta file
    :return: dict with key "fasta_df" holding a DataFrame with "Protein ID"
        and "Protein Sequence" columns
    """
    protein_ids = []
    protein_sequences = []
    # the context manager closes the file handle (the original leaked the
    # handle opened inline in the SeqIO.parse call)
    with open(file_path) as fasta_file:
        for fasta_record in SeqIO.parse(fasta_file, "fasta"):
            # "id" is a builtin, so the parsed accession gets a distinct name
            protein_ids.append(parse_fasta_id(fasta_record.id))
            protein_sequences.append(str(fasta_record.seq))

    fasta_sequences = pd.DataFrame(
        {"Protein ID": protein_ids, "Protein Sequence": protein_sequences}
    )
    return {"fasta_df": fasta_sequences}
diff --git a/protzilla/methods/data_analysis.py b/protzilla/methods/data_analysis.py
index 511629f5..05a0d0fc 100644
--- a/protzilla/methods/data_analysis.py
+++ b/protzilla/methods/data_analysis.py
@@ -7,15 +7,17 @@
k_means,
)
from protzilla.data_analysis.differential_expression_anova import anova
-from protzilla.data_analysis.differential_expression_kruskal_wallis import kruskal_wallis_test_on_ptm_data, \
- kruskal_wallis_test_on_intensity_data
+from protzilla.data_analysis.differential_expression_kruskal_wallis import (
+ kruskal_wallis_test_on_intensity_data,
+ kruskal_wallis_test_on_ptm_data,
+)
from protzilla.data_analysis.differential_expression_linear_model import linear_model
from protzilla.data_analysis.differential_expression_mann_whitney import (
- mann_whitney_test_on_intensity_data, mann_whitney_test_on_ptm_data)
+ mann_whitney_test_on_intensity_data,
+ mann_whitney_test_on_ptm_data,
+)
from protzilla.data_analysis.differential_expression_t_test import t_test
from protzilla.data_analysis.dimension_reduction import t_sne, umap
-from protzilla.data_analysis.ptm_analysis import ptms_per_sample, \
- ptms_per_protein_and_sample, select_peptides_of_protein
from protzilla.data_analysis.model_evaluation import evaluate_classification_model
from protzilla.data_analysis.plots import (
clustergram_plot,
@@ -23,11 +25,12 @@
prot_quant_plot,
scatter_plot,
)
+from protzilla.data_analysis.protein_coverage import plot_protein_coverage
from protzilla.data_analysis.protein_graphs import peptides_to_isoform, variation_graph
from protzilla.data_analysis.ptm_analysis import (
- select_peptides_of_protein,
ptms_per_protein_and_sample,
ptms_per_sample,
+ select_peptides_of_protein,
)
from protzilla.data_analysis.ptm_quantification import flexiquant_lf
from protzilla.methods.data_preprocessing import TransformationLog
@@ -164,8 +167,10 @@ def plot(self, inputs):
class DifferentialExpressionMannWhitneyOnIntensity(DataAnalysisStep):
display_name = "Mann-Whitney Test"
operation = "differential_expression"
- method_description = ("A function to conduct a Mann-Whitney U test between groups defined in the clinical data."
- "The p-values are corrected for multiple testing.")
+ method_description = (
+ "A function to conduct a Mann-Whitney U test between groups defined in the clinical data."
+ "The p-values are corrected for multiple testing."
+ )
input_keys = [
"protein_df",
@@ -191,7 +196,9 @@ def method(self, inputs: dict) -> dict:
def insert_dataframes(self, steps: StepManager, inputs) -> dict:
if steps.get_step_output(Step, "protein_df", inputs["protein_df"]) is not None:
- inputs["protein_df"] = steps.get_step_output(Step, "protein_df", inputs["protein_df"])
+ inputs["protein_df"] = steps.get_step_output(
+ Step, "protein_df", inputs["protein_df"]
+ )
inputs["metadata_df"] = steps.metadata_df
inputs["log_base"] = steps.get_step_input(TransformationLog, "log_base")
return inputs
@@ -200,8 +207,10 @@ def insert_dataframes(self, steps: StepManager, inputs) -> dict:
class DifferentialExpressionMannWhitneyOnPTM(DataAnalysisStep):
display_name = "Mann-Whitney Test"
operation = "Peptide analysis"
- method_description = ("A function to conduct a Mann-Whitney U test between groups defined in the clinical data."
- "The p-values are corrected for multiple testing.")
+ method_description = (
+ "A function to conduct a Mann-Whitney U test between groups defined in the clinical data."
+ "The p-values are corrected for multiple testing."
+ )
input_keys = [
"ptm_df",
@@ -234,8 +243,10 @@ def insert_dataframes(self, steps: StepManager, inputs) -> dict:
class DifferentialExpressionKruskalWallisOnIntensity(DataAnalysisStep):
display_name = "Kruskal-Wallis Test"
operation = "differential_expression"
- method_description = ("A function to conduct a Kruskal-Wallis test between groups defined in the clinical data."
- "The p-values are corrected for multiple testing.")
+ method_description = (
+ "A function to conduct a Kruskal-Wallis test between groups defined in the clinical data."
+ "The p-values are corrected for multiple testing."
+ )
input_keys = [
"protein_df",
@@ -265,8 +276,10 @@ def insert_dataframes(self, steps: StepManager, inputs) -> dict:
class DifferentialExpressionKruskalWallisOnIntensity(DataAnalysisStep):
display_name = "Kruskal-Wallis Test"
operation = "differential_expression"
- method_description = ("A function to conduct a Kruskal-Wallis test between groups defined in the clinical data."
- "The p-values are corrected for multiple testing.")
+ method_description = (
+ "A function to conduct a Kruskal-Wallis test between groups defined in the clinical data."
+ "The p-values are corrected for multiple testing."
+ )
input_keys = [
"protein_df",
@@ -288,7 +301,9 @@ def method(self, inputs: dict) -> dict:
return kruskal_wallis_test_on_intensity_data(**inputs)
def insert_dataframes(self, steps: StepManager, inputs) -> dict:
- inputs["protein_df"] = steps.get_step_output(Step, "protein_df", inputs["protein_df"])
+ inputs["protein_df"] = steps.get_step_output(
+ Step, "protein_df", inputs["protein_df"]
+ )
inputs["metadata_df"] = steps.metadata_df
inputs["log_base"] = steps.get_step_input(TransformationLog, "log_base")
return inputs
@@ -297,8 +312,10 @@ def insert_dataframes(self, steps: StepManager, inputs) -> dict:
class DifferentialExpressionKruskalWallisOnPTM(DataAnalysisStep):
display_name = "Kruskal-Wallis Test"
operation = "Peptide analysis"
- method_description = ("A function to conduct a Kruskal-Wallis test between groups defined in the clinical data."
- "The p-values are corrected for multiple testing.")
+ method_description = (
+ "A function to conduct a Kruskal-Wallis test between groups defined in the clinical data."
+ "The p-values are corrected for multiple testing."
+ )
input_keys = [
"ptm_df",
@@ -327,9 +344,11 @@ def insert_dataframes(self, steps: StepManager, inputs) -> dict:
class PlotVolcano(PlotStep):
display_name = "Volcano Plot"
operation = "plot"
- method_description = ("Plots the results of a differential expression analysis in a volcano plot. The x-axis shows "
- "the log2 fold change and the y-axis shows the -log10 of the corrected p-values. The user "
- "can define a fold change threshold and an alpha level to highlight significant items.")
+ method_description = (
+ "Plots the results of a differential expression analysis in a volcano plot. The x-axis shows "
+ "the log2 fold change and the y-axis shows the -log10 of the corrected p-values. The user "
+ "can define a fold change threshold and an alpha level to highlight significant items."
+ )
input_keys = [
"p_values",
"fc_threshold",
@@ -762,6 +781,38 @@ def insert_dataframes(self, steps: StepManager, inputs) -> dict:
return inputs
class PlotProteinCoverage(PlotStep):
    """Plot step that renders the coverage of one protein by peptide data."""

    display_name = "Protein Coverage Plot"
    operation = "plot"
    method_description = (
        "Create a protein coverage plot from a protein graph and peptide data"
    )

    input_keys = [
        "protein_id",
        "fasta_df",
        "metadata_df",
        "peptide_df",
        "grouping",
        "selected_groups",
        "aggregation_method",
    ]
    output_keys = []

    def method(self, inputs: dict) -> dict:
        return plot_protein_coverage(**inputs)

    def insert_dataframes(self, steps: StepManager, inputs) -> dict:
        # resolve the dataframes produced by earlier steps; the instance
        # identifiers are the user-facing inputs declared in input_keys
        # (the original read "fasta_df_instance"/"peptide_df_instance", keys
        # that are never declared and therefore absent)
        inputs["fasta_df"] = steps.get_step_output(
            Step, "fasta_df", inputs["fasta_df"]
        )
        inputs["peptide_df"] = steps.get_step_output(
            Step, "peptide_df", inputs["peptide_df"]
        )
        inputs["metadata_df"] = steps.metadata_df
        # the updated inputs must be returned — the original returned None,
        # which discarded every inserted dataframe
        return inputs
+
+
class ProteinGraphVariationGraph(DataAnalysisStep):
display_name = "Protein Variation Graph"
operation = "protein_graph"
@@ -841,17 +892,23 @@ def insert_dataframes(self, steps: StepManager, inputs) -> dict:
inputs["metadata_df"] = steps.metadata_df
if inputs["auto_select"]:
- significant_proteins = (
- steps.get_step_output(DataAnalysisStep, "significant_proteins_df", inputs["protein_list"]))
- index_of_most_significant_protein = significant_proteins['corrected_p_value'].idxmin()
- most_significant_protein = significant_proteins.loc[index_of_most_significant_protein]
+ significant_proteins = steps.get_step_output(
+ DataAnalysisStep, "significant_proteins_df", inputs["protein_list"]
+ )
+ index_of_most_significant_protein = significant_proteins[
+ "corrected_p_value"
+ ].idxmin()
+ most_significant_protein = significant_proteins.loc[
+ index_of_most_significant_protein
+ ]
inputs["protein_id"] = [most_significant_protein["Protein ID"]]
- self.messages.append({
- "level": logging.INFO,
- "msg":
- f"Selected the most significant Protein: {most_significant_protein['Protein ID']}, "
- f"from {inputs['protein_list']}"
- })
+ self.messages.append(
+ {
+ "level": logging.INFO,
+ "msg": f"Selected the most significant Protein: {most_significant_protein['Protein ID']}, "
+ f"from {inputs['protein_list']}",
+ }
+ )
return inputs
@@ -859,8 +916,10 @@ def insert_dataframes(self, steps: StepManager, inputs) -> dict:
class PTMsPerSample(DataAnalysisStep):
display_name = "PTMs per Sample"
operation = "Peptide analysis"
- method_description = ("Analyze the post-translational modifications (PTMs) of a single protein of interest. "
- "This function requires a peptide dataframe with PTM information.")
+ method_description = (
+ "Analyze the post-translational modifications (PTMs) of a single protein of interest. "
+ "This function requires a peptide dataframe with PTM information."
+ )
input_keys = [
"peptide_df",
@@ -882,8 +941,10 @@ def insert_dataframes(self, steps: StepManager, inputs) -> dict:
class PTMsProteinAndPerSample(DataAnalysisStep):
display_name = "PTMs per Sample and Protein"
operation = "Peptide analysis"
- method_description = ("Analyze the post-translational modifications (PTMs) of all Proteins. "
- "This function requires a peptide dataframe with PTM information.")
+ method_description = (
+ "Analyze the post-translational modifications (PTMs) of all Proteins. "
+ "This function requires a peptide dataframe with PTM information."
+ )
input_keys = [
"peptide_df",
@@ -899,4 +960,4 @@ def insert_dataframes(self, steps: StepManager, inputs) -> dict:
inputs["peptide_df"] = steps.get_step_output(
Step, "peptide_df", inputs["peptide_df"]
)
- return inputs
\ No newline at end of file
+ return inputs
diff --git a/protzilla/methods/importing.py b/protzilla/methods/importing.py
index 6a2f6835..effcd5f1 100644
--- a/protzilla/methods/importing.py
+++ b/protzilla/methods/importing.py
@@ -1,5 +1,6 @@
from __future__ import annotations
+from protzilla.importing.fasta_import import fasta_import
from protzilla.importing.metadata_import import (
metadata_column_assignment,
metadata_import_method,
@@ -10,7 +11,7 @@
max_quant_import,
ms_fragger_import,
)
-from protzilla.importing.peptide_import import peptide_import, evidence_import
+from protzilla.importing.peptide_import import evidence_import, peptide_import
from protzilla.steps import Step, StepManager
@@ -51,7 +52,9 @@ def method(self, inputs):
class MsFraggerImport(ImportingStep):
display_name = "MS Fragger Combined Protein Import"
operation = "Protein Data Import"
- method_description = "Import the combined_protein.tsv file form output of MS Fragger"
+ method_description = (
+ "Import the combined_protein.tsv file form output of MS Fragger"
+ )
input_keys = ["file_path", "intensity_name", "map_to_uniprot", "aggregation_method"]
output_keys = ["protein_df"]
@@ -139,4 +142,16 @@ class EvidenceImport(ImportingStep):
output_keys = ["peptide_df"]
def method(self, inputs):
- return evidence_import(**inputs)
\ No newline at end of file
+ return evidence_import(**inputs)
+
+
class FastaImport(ImportingStep):
    # Importing step that wraps protzilla.importing.fasta_import.fasta_import.
    display_name = "Fasta Protein Sequence Import"
    operation = "fasta_import"
    method_description = "Import a fasta file containing protein sequences."

    input_keys = ["file_path"]
    output_keys = ["fasta_df"]

    def method(self, inputs):
        # Returns {"fasta_df": DataFrame of protein ids and sequences}.
        return fasta_import(**inputs)
diff --git a/protzilla/steps.py b/protzilla/steps.py
index eecf0d04..eac35b65 100644
--- a/protzilla/steps.py
+++ b/protzilla/steps.py
@@ -7,6 +7,7 @@
from enum import Enum
from io import BytesIO
from pathlib import Path
+from protzilla.constants.protzilla_logging import logger
import pandas as pd
import plotly
@@ -455,6 +456,9 @@ def check_instance_identifier(step):
and input_key in step.inputs
):
return step.inputs[input_key]
+ logger.warning(
+ f"No input {input_key} found for step type {step_type} and instance identifier {instance_identifier}"
+ )
return None
def all_steps_in_section(self, section: str) -> list[Step]:
@@ -506,7 +510,6 @@ def metadata_df(self) -> pd.DataFrame | None:
from protzilla.methods.importing import ImportingStep
return self.get_step_output(ImportingStep, "metadata_df")
- logging.warning("No metadata_df found in steps")
@property
def preprocessed_output(self) -> Output:
diff --git a/tests/protzilla/data_analysis/test_protein_coverage.py b/tests/protzilla/data_analysis/test_protein_coverage.py
new file mode 100644
index 00000000..2339d840
--- /dev/null
+++ b/tests/protzilla/data_analysis/test_protein_coverage.py
@@ -0,0 +1,285 @@
+import pytest
+from protzilla.data_analysis.protein_coverage import (
+ distribute_to_rows,
+ get_max_coverage,
+ PeptideMatch,
+)
+
+import pytest
+import pandas as pd
+from protzilla.data_analysis.protein_coverage import (
+ extract_peptide_from_slice,
+ AggregationMethod,
+)
+
+import pytest
+from protzilla.data_analysis.protein_coverage import increment_coverage, ProteinHit
+
+import pytest
+from protzilla.data_analysis.protein_coverage import (
+ match_peptide_to_protein_ids,
+ ProteinHit,
+)
+
+
+def test_match_peptide_to_protein_ids_empty_peptide():
+ with pytest.raises(ValueError, match="Peptide sequence is empty."):
+ match_peptide_to_protein_ids("", {}, {})
+
+
+def test_match_peptide_to_protein_ids_no_matches():
+ peptide_sequence = "ABCDE"
+ protein_kmer_dictionary = {"ABCDE": [("protein1", 0)]}
+ protein_dictionary = {"protein1": "XYZ"}
+ result = match_peptide_to_protein_ids(
+ peptide_sequence, protein_kmer_dictionary, protein_dictionary
+ )
+ assert result == []
+
+
+def test_match_peptide_to_protein_ids_single_match():
+ peptide_sequence = "ABCDE"
+ protein_kmer_dictionary = {"ABCDE": [("protein1", 0)]}
+ protein_dictionary = {"protein1": "ABCDE"}
+ result = match_peptide_to_protein_ids(
+ peptide_sequence, protein_kmer_dictionary, protein_dictionary
+ )
+ expected = [
+ ProteinHit(
+ protein_id="protein1",
+ start_location_on_protein=0,
+ end_location_on_protein=5,
+ )
+ ]
+ assert result == expected
+
+
+def test_match_peptide_to_protein_ids_multiple_matches():
+ peptide_sequence = "ABCDE"
+ protein_kmer_dictionary = {"ABCDE": [("protein1", 0), ("protein2", 0)]}
+ protein_dictionary = {"protein1": "ABCDE", "protein2": "ABCDE"}
+ result = match_peptide_to_protein_ids(
+ peptide_sequence, protein_kmer_dictionary, protein_dictionary
+ )
+ expected = list(
+ set(
+ [
+ ProteinHit(
+ protein_id="protein1",
+ start_location_on_protein=0,
+ end_location_on_protein=5,
+ ),
+ ProteinHit(
+ protein_id="protein2",
+ start_location_on_protein=0,
+ end_location_on_protein=5,
+ ),
+ ]
+ )
+ )
+ assert result == expected
+
+
+def test_match_peptide_to_protein_ids_partial_match():
+ peptide_sequence = "ABCDE"
+ protein_kmer_dictionary = {"ABCDE": [("protein1", 0)]}
+ protein_dictionary = {"protein1": "ABCDEXYZ"}
+ result = match_peptide_to_protein_ids(
+ peptide_sequence, protein_kmer_dictionary, protein_dictionary
+ )
+ expected = [
+ ProteinHit(
+ protein_id="protein1",
+ start_location_on_protein=0,
+ end_location_on_protein=5,
+ )
+ ]
+ assert result == expected
+
+
+def test_increment_coverage():
+ coverage = [0, 0, 0, 0, 0]
+ protein_hit = ProteinHit(
+ protein_id="P12345", start_location_on_protein=1, end_location_on_protein=4
+ )
+ increment_coverage(coverage, protein_hit)
+ assert coverage == [0, 1, 1, 1, 0]
+
+
+def test_increment_coverage_multiple_hits():
+ coverage = [0, 0, 0, 0, 0]
+ protein_hit1 = ProteinHit(
+ protein_id="P12345", start_location_on_protein=1, end_location_on_protein=3
+ )
+ protein_hit2 = ProteinHit(
+ protein_id="P12345", start_location_on_protein=2, end_location_on_protein=4
+ )
+ increment_coverage(coverage, protein_hit1)
+ increment_coverage(coverage, protein_hit2)
+ assert coverage == [0, 1, 2, 1, 0]
+
+
+def test_increment_coverage_no_overlap():
+ coverage = [0, 0, 0, 0, 0]
+ protein_hit1 = ProteinHit(
+ protein_id="P12345", start_location_on_protein=0, end_location_on_protein=2
+ )
+ protein_hit2 = ProteinHit(
+ protein_id="P12345", start_location_on_protein=3, end_location_on_protein=5
+ )
+ increment_coverage(coverage, protein_hit1)
+ increment_coverage(coverage, protein_hit2)
+ assert coverage == [1, 1, 0, 1, 1]
+
+
+def test_extract_peptide_from_slice_single_entry():
+ data = {"Sequence": ["PEPTIDE"], "Intensity": [100]}
+ df = pd.DataFrame(data)
+ result = extract_peptide_from_slice(df)
+ assert result.equals(df)
+
+
+def test_extract_peptide_from_slice_multiple_entries():
+ data = {"Sequence": ["PEPTIDE", "PEPTIDE", "PEPTIDE"], "Intensity": [100, 130, 200]}
+ df = pd.DataFrame(data)
+ result = extract_peptide_from_slice(df)
+ expected = pd.DataFrame({"Sequence": "PEPTIDE", "Intensity": [130.0]})
+ expected.set_index("Sequence", inplace=True)
+ assert result.equals(expected)
+
+
+def test_extract_peptide_from_slice_median():
+ data = {"Sequence": ["PEPTIDE", "PEPTIDE"], "Intensity": [100, 200]}
+ df = pd.DataFrame(data)
+ result = extract_peptide_from_slice(df, AggregationMethod.median)
+ expected = pd.DataFrame({"Sequence": "PEPTIDE", "Intensity": [150.0]})
+ expected.set_index("Sequence", inplace=True)
+ assert result.equals(expected)
+
+
+def test_extract_peptide_from_slice_mean():
+ data = {"Sequence": ["PEPTIDE", "PEPTIDE"], "Intensity": [100, 200]}
+ df = pd.DataFrame(data)
+ result = extract_peptide_from_slice(df, AggregationMethod.mean)
+ expected = pd.DataFrame({"Sequence": "PEPTIDE", "Intensity": [150.0]})
+ expected.set_index("Sequence", inplace=True)
+ assert result.equals(expected)
+
+
+def test_extract_peptide_from_slice_unknown_strategy():
+ data = {"Sequence": ["PEPTIDE", "PEPTIDE"], "Intensity": [100, 200]}
+ df = pd.DataFrame(data)
+ with pytest.raises(ValueError, match="Unknown strategy: unknown"):
+ extract_peptide_from_slice(df, "unknown")
+
+
+import pytest
+from protzilla.data_analysis.protein_coverage import distribute_to_rows, PeptideMatch
+
+
+def test_distribute_to_rows_empty():
+ with pytest.raises(
+ ValueError,
+ match="Attempted to distribute empty list of peptide matches to rows in plot.",
+ ):
+ distribute_to_rows([])
+
+
+def test_distribute_to_rows_single_peptide():
+ peptide_matches = [
+ PeptideMatch(
+ peptide_sequence="AAA",
+ start_location_on_protein=0,
+ end_location_on_protein=3,
+ metadata_group="group1",
+ )
+ ]
+ expected = {"group1": [[peptide_matches[0]]]}
+ assert distribute_to_rows(peptide_matches) == expected
+
+
+def test_distribute_to_rows_non_overlapping():
+ peptide_matches = [
+ PeptideMatch(
+ peptide_sequence="AAA",
+ start_location_on_protein=0,
+ end_location_on_protein=3,
+ metadata_group="group1",
+ ),
+ PeptideMatch(
+ peptide_sequence="BBB",
+ start_location_on_protein=4,
+ end_location_on_protein=7,
+ metadata_group="group1",
+ ),
+ ]
+ expected = {"group1": [[peptide_matches[0], peptide_matches[1]]]}
+ assert distribute_to_rows(peptide_matches) == expected
+
+
+def test_distribute_to_rows_overlapping():
+ peptide_matches = [
+ PeptideMatch(
+ peptide_sequence="AAA",
+ start_location_on_protein=0,
+ end_location_on_protein=3,
+ metadata_group="group1",
+ ),
+ PeptideMatch(
+ peptide_sequence="BBB",
+ start_location_on_protein=2,
+ end_location_on_protein=5,
+ metadata_group="group1",
+ ),
+ ]
+ expected = {"group1": [[peptide_matches[0]], [peptide_matches[1]]]}
+ assert distribute_to_rows(peptide_matches) == expected
+
+
+def test_distribute_to_rows_multiple_groups():
+ peptide_matches = [
+ PeptideMatch(
+ peptide_sequence="AAA",
+ start_location_on_protein=0,
+ end_location_on_protein=3,
+ metadata_group="group1",
+ ),
+ PeptideMatch(
+ peptide_sequence="BBB",
+ start_location_on_protein=4,
+ end_location_on_protein=7,
+ metadata_group="group2",
+ ),
+ ]
+ expected = {"group1": [[peptide_matches[0]]], "group2": [[peptide_matches[1]]]}
+ assert distribute_to_rows(peptide_matches) == expected
+
+
+def test_distribute_to_rows():
+ peptide_matches = [
+ PeptideMatch("PEPTIDE1", 0, 7, 1.0, "group1"),
+ PeptideMatch("PEPTIDE2", 8, 15, 1.0, "group1"),
+ PeptideMatch("PEPTIDE3", 16, 23, 1.0, "group1"),
+ PeptideMatch("PEPTIDE4", 0, 7, 1.0, "group2"),
+ PeptideMatch("PEPTIDE5", 8, 15, 1.0, "group2"),
+ ]
+ rows = distribute_to_rows(peptide_matches)
+ assert len(rows["group1"]) == 1
+ assert len(rows["group2"]) == 1
+
+
+def test_get_max_coverage():
+ coverage = {
+ "group1": [1, 2, 3, 4, 5],
+ "group2": [2, 3, 4, 5, 6],
+ }
+ max_coverage = get_max_coverage(coverage)
+ assert max_coverage == 6
+
+
+def test_get_max_coverage_empty():
+ with pytest.raises(
+ ValueError,
+ match="Cannot calculate maximum coverage value: No coverage data provided.",
+ ):
+ get_max_coverage({})
diff --git a/ui/runs/form_mapping.py b/ui/runs/form_mapping.py
index 7221bba6..4d6fae1c 100644
--- a/ui/runs/form_mapping.py
+++ b/ui/runs/form_mapping.py
@@ -22,6 +22,7 @@
importing.MetadataColumnAssignment: importing_forms.MetadataColumnAssignmentForm,
importing.PeptideImport: importing_forms.PeptideImportForm,
importing.EvidenceImport: importing_forms.EvidenceImportForm,
+ importing.FastaImport: importing_forms.FastaImportForm,
data_preprocessing.FilterProteinsBySamplesMissing: data_preprocessing_forms.FilterProteinsBySamplesMissingForm,
data_preprocessing.FilterByProteinsCount: data_preprocessing_forms.FilterByProteinsCountForm,
data_preprocessing.FilterSamplesByProteinsMissing: data_preprocessing_forms.FilterSamplesByProteinsMissingForm,
@@ -52,6 +53,7 @@
data_analysis.PlotClustergram: data_analysis_forms.PlotClustergramForm,
data_analysis.PlotProtQuant: data_analysis_forms.PlotProtQuantForm,
data_analysis.PlotPrecisionRecallCurve: data_analysis_forms.PlotPrecisionRecallCurveForm,
+ data_analysis.PlotProteinCoverage: data_analysis_forms.PlotProteinCoverageForm,
data_analysis.PlotROC: data_analysis_forms.PlotROCCurveForm,
data_analysis.ClusteringKMeans: data_analysis_forms.ClusteringKMeansForm,
data_analysis.ClusteringExpectationMaximisation: data_analysis_forms.ClusteringExpectationMaximizationForm,
diff --git a/ui/runs/forms/data_analysis.py b/ui/runs/forms/data_analysis.py
index 317fd473..1bee889b 100644
--- a/ui/runs/forms/data_analysis.py
+++ b/ui/runs/forms/data_analysis.py
@@ -1,16 +1,10 @@
-import logging
from enum import Enum, StrEnum
-from protzilla.methods.data_preprocessing import DataPreprocessingStep
from protzilla.methods.data_analysis import (
DataAnalysisStep,
- DifferentialExpressionLinearModel,
- DifferentialExpressionTTest,
DimensionReductionUMAP,
- DataAnalysisStep,
PTMsPerSample,
SelectPeptidesForProtein,
- DifferentialExpressionMannWhitneyOnPTM,
)
from protzilla.methods.data_preprocessing import DataPreprocessingStep
from protzilla.run import Run
@@ -25,7 +19,6 @@
CustomFloatField,
CustomMultipleChoiceField,
CustomNumberField,
- CustomBooleanField,
)
@@ -169,7 +162,11 @@ class DifferentialExpressionANOVAForm(MethodForm):
initial=MultipleTestingCorrectionMethod.benjamini_hochberg,
)
alpha = CustomFloatField(
- label="Error rate (alpha)", min_value=0, max_value=1, step_size=0.01, initial=0.05
+ label="Error rate (alpha)",
+ min_value=0,
+ max_value=1,
+ step_size=0.01,
+ initial=0.05,
)
grouping = CustomChoiceField(choices=[], label="Grouping from metadata")
@@ -178,12 +175,12 @@ class DifferentialExpressionANOVAForm(MethodForm):
)
def fill_form(self, run: Run) -> None:
- self.fields[
- "protein_df"
- ].choices = fill_helper.get_choices_for_protein_df_steps(run)
- self.fields[
- "grouping"
- ].choices = fill_helper.get_choices_for_metadata_non_sample_columns(run)
+ self.fields["protein_df"].choices = (
+ fill_helper.get_choices_for_protein_df_steps(run)
+ )
+ self.fields["grouping"].choices = (
+ fill_helper.get_choices_for_metadata_non_sample_columns(run)
+ )
grouping = self.data.get("grouping", self.fields["grouping"].choices[0][0])
self.fields["selected_groups"].choices = fill_helper.to_choices(
run.steps.metadata_df[grouping].unique()
@@ -219,12 +216,12 @@ class DifferentialExpressionTTestForm(MethodForm):
group2 = CustomChoiceField(choices=[], label="Group 2")
def fill_form(self, run: Run) -> None:
- self.fields[
- "protein_df"
- ].choices = fill_helper.get_choices_for_protein_df_steps(run)
- self.fields[
- "grouping"
- ].choices = fill_helper.get_choices_for_metadata_non_sample_columns(run)
+ self.fields["protein_df"].choices = (
+ fill_helper.get_choices_for_protein_df_steps(run)
+ )
+ self.fields["grouping"].choices = (
+ fill_helper.get_choices_for_metadata_non_sample_columns(run)
+ )
grouping = self.data.get("grouping", self.fields["grouping"].choices[0][0])
@@ -256,16 +253,20 @@ class DifferentialExpressionLinearModelForm(MethodForm):
initial=MultipleTestingCorrectionMethod.benjamini_hochberg,
)
alpha = CustomFloatField(
- label="Error rate (alpha)", min_value=0, max_value=1, step_size=0.01, initial=0.05
+ label="Error rate (alpha)",
+ min_value=0,
+ max_value=1,
+ step_size=0.01,
+ initial=0.05,
)
grouping = CustomChoiceField(choices=[], label="Grouping from metadata")
group1 = CustomChoiceField(choices=[], label="Group 1")
group2 = CustomChoiceField(choices=[], label="Group 2")
def fill_form(self, run: Run) -> None:
- self.fields[
- "grouping"
- ].choices = fill_helper.get_choices_for_metadata_non_sample_columns(run)
+ self.fields["grouping"].choices = (
+ fill_helper.get_choices_for_metadata_non_sample_columns(run)
+ )
grouping = self.data.get("grouping", self.fields["grouping"].choices[0][0])
@@ -293,25 +294,31 @@ def fill_form(self, run: Run) -> None:
class DifferentialExpressionMannWhitneyOnIntensityForm(MethodForm):
is_dynamic = True
- protein_df = CustomChoiceField(
- choices=[], label="Step to use protein data from"
- )
+ protein_df = CustomChoiceField(choices=[], label="Step to use protein data from")
multiple_testing_correction_method = CustomChoiceField(
choices=MultipleTestingCorrectionMethod,
label="Multiple testing correction",
initial=MultipleTestingCorrectionMethod.benjamini_hochberg,
)
alpha = CustomFloatField(
- label="Error rate (alpha)", min_value=0, max_value=1, step_size=0.01, initial=0.05
+ label="Error rate (alpha)",
+ min_value=0,
+ max_value=1,
+ step_size=0.01,
+ initial=0.05,
)
grouping = CustomChoiceField(choices=[], label="Grouping from metadata")
group1 = CustomChoiceField(choices=[], label="Group 1")
group2 = CustomChoiceField(choices=[], label="Group 2")
def fill_form(self, run: Run) -> None:
- self.fields["protein_df"].choices = fill_helper.get_choices_for_protein_df_steps(run)
+ self.fields["protein_df"].choices = (
+ fill_helper.get_choices_for_protein_df_steps(run)
+ )
- self.fields["grouping"].choices = fill_helper.get_choices_for_metadata_non_sample_columns(run)
+ self.fields["grouping"].choices = (
+ fill_helper.get_choices_for_metadata_non_sample_columns(run)
+ )
grouping = self.data.get("grouping", self.fields["grouping"].choices[0][0])
@@ -346,7 +353,11 @@ class DifferentialExpressionMannWhitneyOnPTMForm(MethodForm):
initial=MultipleTestingCorrectionMethod.benjamini_hochberg,
)
alpha = CustomFloatField(
- label="Error rate (alpha)", min_value=0, max_value=1, step_size=0.01, initial=0.05
+ label="Error rate (alpha)",
+ min_value=0,
+ max_value=1,
+ step_size=0.01,
+ initial=0.05,
)
p_value_calculation_method = CustomChoiceField(
choices=PValueCalculationMethod,
@@ -362,7 +373,9 @@ def fill_form(self, run: Run) -> None:
run.steps.get_instance_identifiers(PTMsPerSample, "ptm_df")
)
- self.fields["grouping"].choices = fill_helper.get_choices_for_metadata_non_sample_columns(run)
+ self.fields["grouping"].choices = (
+ fill_helper.get_choices_for_metadata_non_sample_columns(run)
+ )
grouping = self.data.get("grouping", self.fields["grouping"].choices[0][0])
@@ -390,16 +403,18 @@ def fill_form(self, run: Run) -> None:
class DifferentialExpressionKruskalWallisOnIntensityForm(MethodForm):
is_dynamic = True
- protein_df = CustomChoiceField(
- choices=[], label="Step to use protein data from"
- )
+ protein_df = CustomChoiceField(choices=[], label="Step to use protein data from")
multiple_testing_correction_method = CustomChoiceField(
choices=MultipleTestingCorrectionMethod,
label="Multiple testing correction",
initial=MultipleTestingCorrectionMethod.benjamini_hochberg,
)
alpha = CustomFloatField(
- label="Error rate (alpha)", min_value=0, max_value=1, step_size=0.01, initial=0.05
+ label="Error rate (alpha)",
+ min_value=0,
+ max_value=1,
+ step_size=0.01,
+ initial=0.05,
)
grouping = CustomChoiceField(choices=[], label="Grouping from metadata")
@@ -408,9 +423,13 @@ class DifferentialExpressionKruskalWallisOnIntensityForm(MethodForm):
)
def fill_form(self, run: Run) -> None:
- self.fields["protein_df"].choices = fill_helper.get_choices_for_protein_df_steps(run)
+ self.fields["protein_df"].choices = (
+ fill_helper.get_choices_for_protein_df_steps(run)
+ )
- self.fields["grouping"].choices = fill_helper.get_choices_for_metadata_non_sample_columns(run)
+ self.fields["grouping"].choices = (
+ fill_helper.get_choices_for_metadata_non_sample_columns(run)
+ )
grouping = self.data.get("grouping", self.fields["grouping"].choices[0][0])
self.fields["selected_groups"].choices = fill_helper.to_choices(
run.steps.metadata_df[grouping].unique()
@@ -420,16 +439,18 @@ def fill_form(self, run: Run) -> None:
class DifferentialExpressionKruskalWallisOnPTMForm(MethodForm):
is_dynamic = True
- ptm_df = CustomChoiceField(
- choices=[], label="Step to use ptm data from"
- )
+ ptm_df = CustomChoiceField(choices=[], label="Step to use ptm data from")
multiple_testing_correction_method = CustomChoiceField(
choices=MultipleTestingCorrectionMethod,
label="Multiple testing correction",
initial=MultipleTestingCorrectionMethod.benjamini_hochberg,
)
alpha = CustomFloatField(
- label="Error rate (alpha)", min_value=0, max_value=1, step_size=0.01, initial=0.05
+ label="Error rate (alpha)",
+ min_value=0,
+ max_value=1,
+ step_size=0.01,
+ initial=0.05,
)
grouping = CustomChoiceField(choices=[], label="Grouping from metadata")
@@ -441,7 +462,9 @@ def fill_form(self, run: Run) -> None:
self.fields["ptm_df"].choices = fill_helper.to_choices(
run.steps.get_instance_identifiers(PTMsPerSample, "ptm_df")
)
- self.fields["grouping"].choices = fill_helper.get_choices_for_metadata_non_sample_columns(run)
+ self.fields["grouping"].choices = (
+ fill_helper.get_choices_for_metadata_non_sample_columns(run)
+ )
grouping = self.data.get("grouping", self.fields["grouping"].choices[0][0])
self.fields["selected_groups"].choices = fill_helper.to_choices(
run.steps.metadata_df[grouping].unique()
@@ -466,7 +489,8 @@ class PlotVolcanoForm(MethodForm):
def fill_form(self, run: Run) -> None:
self.fields["input_dict"].choices = fill_helper.to_choices(
run.steps.get_instance_identifiers(
- Step, ["corrected_p_values_df", "log2_fold_change_df"],
+ Step,
+ ["corrected_p_values_df", "log2_fold_change_df"],
)
)
@@ -486,7 +510,9 @@ def fill_form(self, run: Run) -> None:
if step_output is not None:
items_of_interest = step_output["PTM"].unique()
- self.fields["items_of_interest"].choices = fill_helper.to_choices(items_of_interest)
+ self.fields["items_of_interest"].choices = fill_helper.to_choices(
+ items_of_interest
+ )
class PlotScatterPlotForm(MethodForm):
@@ -804,7 +830,7 @@ class ClassificationRandomForestForm(MethodForm):
# TODO: Workflow_meta line 1763
train_val_split = CustomNumberField(
label="Choose the size of the validation data set (you can either enter the absolute number of validation "
- "samples or a number between 0.0 and 1.0 to represent the percentage of validation samples)",
+ "samples or a number between 0.0 and 1.0 to represent the percentage of validation samples)",
initial=0.20,
)
# TODO: Workflow_meta line 1770
@@ -892,7 +918,7 @@ class ClassificationSVMForm(MethodForm):
)
train_val_split = CustomNumberField(
label="Choose the size of the validation data set (you can either enter the absolute number of validation "
- "samples or a number between 0.0 and 1.0 to represent the percentage of validation samples)",
+ "samples or a number between 0.0 and 1.0 to represent the percentage of validation samples)",
initial=0.20,
)
# TODO: Workflow_meta line 1973
@@ -1009,7 +1035,7 @@ class DimensionReductionUMAPForm(MethodForm):
)
n_neighbors = CustomNumberField(
label="The size of local neighborhood (in terms of number of neighboring sample points) used for manifold "
- "approximation",
+ "approximation",
min_value=2,
max_value=100,
step_size=1,
@@ -1050,7 +1076,7 @@ class ProteinGraphPeptidesToIsoformForm(MethodForm):
k = CustomNumberField(label="k-mer length", min_value=1, step_size=1, initial=5)
allowed_mismatches = CustomNumberField(
label="Number of allowed mismatched amino acids per peptide. For many allowed mismatches, this can take a "
- "long time.",
+ "long time.",
min_value=0,
step_size=1,
initial=2,
@@ -1064,6 +1090,80 @@ class ProteinGraphVariationGraphForm(MethodForm):
# TODO: workflow_meta line 2291 - 2295
+class PlotProteinCoverageForm(MethodForm):
+ from protzilla.data_analysis.protein_coverage import AggregationMethod
+
+ is_dynamic = True
+ peptide_df_instance = CustomChoiceField(
+ choices=[],
+ label="Step to use peptide data from",
+ )
+ fasta_df_instance = CustomChoiceField(
+ choices=[],
+ label="Step to use fasta protein data from",
+ )
+ protein_id = CustomChoiceField(
+ choices=[],
+ label="Protein ID",
+ )
+ grouping = CustomChoiceField(
+ choices=[],
+ label="Grouping from metadata",
+ )
+ selected_groups = CustomMultipleChoiceField(
+ choices=[],
+ label="Select groups / samples to plot",
+ )
+ aggregation_method = CustomChoiceField(
+ choices=fill_helper.to_choices(AggregationMethod),
+ label="Aggregation method",
+ initial=AggregationMethod.median,
+ )
+
+ def fill_form(self, run: Run) -> None:
+ self.fields["peptide_df_instance"].choices = fill_helper.get_choices(
+ run, "peptide_df"
+ )
+ self.fields["fasta_df_instance"].choices = fill_helper.get_choices(
+ run, "fasta_df", Step
+ )
+ peptide_df_instance_id = self.data.get(
+ "peptide_df_instance", self.fields["peptide_df_instance"].choices[0][0]
+ )
+ fasta_df_instance_id = self.data.get(
+ "fasta_df_instance", self.fields["fasta_df_instance"].choices[0][0]
+ )
+ peptide_df = run.steps.get_step_output(
+ Step, "peptide_df", peptide_df_instance_id
+ )
+ fasta_protein_ids = run.steps.get_step_output(
+ Step, "fasta_df", fasta_df_instance_id
+ )["Protein ID"].unique()
+ peptide_df_protein_ids = peptide_df["Protein ID"].unique()
+ common_protein_ids = sorted(
+ list(set(fasta_protein_ids) & set(peptide_df_protein_ids))
+ )
+ self.fields["protein_id"].choices = fill_helper.to_choices(common_protein_ids)
+
+ # Grouping
+ self.fields["grouping"].choices = (
+ fill_helper.get_choices_for_metadata_non_sample_columns(run)
+ + [("Sample", "Sample")]
+ )
+
+ grouping = self.data.get("grouping", self.fields["grouping"].choices[0][0])
+ if grouping == "Sample":
+ self.fields["selected_groups"].choices = fill_helper.to_choices(
+ peptide_df["Sample"].unique()
+ )
+ else:
+ self.fields["selected_groups"].choices = fill_helper.to_choices(
+ run.steps.metadata_df[grouping].unique()
+ )
+ # if grouping is not Sample, show the aggregation method option
+ self.toggle_visibility("aggregation_method", grouping != "Sample")
+
+
class FLEXIQuantLFForm(MethodForm):
is_dynamic = True
@@ -1143,13 +1243,17 @@ def fill_form(self, run: Run) -> None:
selected_auto_select = self.data.get("auto_select")
- choices = fill_helper.to_choices([] if selected_auto_select else ["all proteins"])
- choices.extend(fill_helper.get_choices(
- run, "significant_proteins_df", DataAnalysisStep
- ))
+ choices = fill_helper.to_choices(
+ [] if selected_auto_select else ["all proteins"]
+ )
+ choices.extend(
+ fill_helper.get_choices(run, "significant_proteins_df", DataAnalysisStep)
+ )
self.fields["protein_list"].choices = choices
- chosen_list = self.data.get("protein_list", self.fields["protein_list"].choices[0][0])
+ chosen_list = self.data.get(
+ "protein_list", self.fields["protein_list"].choices[0][0]
+ )
if not selected_auto_select:
self.toggle_visibility("sort_proteins", True)
self.toggle_visibility("protein_ids", True)
@@ -1163,7 +1267,9 @@ def fill_form(self, run: Run) -> None:
self.fields["protein_ids"].choices = fill_helper.to_choices(
run.steps.get_step_output(
DataAnalysisStep, "significant_proteins_df", chosen_list
- ).sort_values(by="corrected_p_value")["Protein ID"].unique()
+ )
+ .sort_values(by="corrected_p_value")["Protein ID"]
+ .unique()
)
else:
self.fields["protein_ids"].choices = fill_helper.to_choices(
@@ -1183,9 +1289,7 @@ class PTMsPerSampleForm(MethodForm):
)
def fill_form(self, run: Run) -> None:
- self.fields["peptide_df"].choices = fill_helper.get_choices(
- run, "peptide_df"
- )
+ self.fields["peptide_df"].choices = fill_helper.get_choices(run, "peptide_df")
single_protein_peptides = run.steps.get_instance_identifiers(
SelectPeptidesForProtein, "peptide_df"
@@ -1207,4 +1311,4 @@ def fill_form(self, run: Run) -> None:
SelectPeptidesForProtein, "peptide_df"
)
if single_protein_peptides:
- self.fields["peptide_df"].initial = single_protein_peptides[0]
\ No newline at end of file
+ self.fields["peptide_df"].initial = single_protein_peptides[0]
diff --git a/ui/runs/forms/fill_helper.py b/ui/runs/forms/fill_helper.py
index eef83be6..d4f86960 100644
--- a/ui/runs/forms/fill_helper.py
+++ b/ui/runs/forms/fill_helper.py
@@ -1,5 +1,6 @@
from protzilla.run import Run
from protzilla.steps import Step
+from protzilla.constants.protzilla_logging import logger
def to_choices(choices: list[str], required: bool = True) -> list[tuple[str, str]]:
@@ -29,6 +30,9 @@ def get_choices(
def get_choices_for_metadata_non_sample_columns(run: Run) -> list[tuple[str, str]]:
+ if run.steps.metadata_df is None:
+ logger.warning("No metadata_df found in run")
+ return []
return to_choices(
run.steps.metadata_df.columns[
run.steps.metadata_df.columns != "Sample"
diff --git a/ui/runs/forms/importing.py b/ui/runs/forms/importing.py
index 8975e3ba..600ea517 100644
--- a/ui/runs/forms/importing.py
+++ b/ui/runs/forms/importing.py
@@ -192,3 +192,7 @@ def fill_form(self, run: Run) -> None:
self.fields["map_to_uniprot"].initial = run.steps.get_step_input(
[MaxQuantImport, MsFraggerImport, DiannImport], "map_to_uniprot"
)
+
+
+class FastaImportForm(MethodForm):
+ file_path = CustomFileField(label="Fasta file")