Add a fragment count column #407
base: master
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -64,8 +64,7 @@ | |
|
|
||
| def _filter_and_rename_columns(df, column_map): | ||
| """Filter, apply a scaling factor and rename columns based on provided specifications.""" | ||
| selected_columns = [] | ||
| rename_map = {} | ||
| result = pd.DataFrame(index=df.index) | ||
|
|
||
| for col in column_map: | ||
| names = col.get("name", []) | ||
|
|
@@ -77,16 +76,12 @@ def _filter_and_rename_columns(df, column_map): | |
|
|
||
| for name in names: | ||
| if name in df.columns: | ||
| selected_columns.append(name) | ||
| rename_map[name] = slug | ||
| # Some column values are given a specific format (e.g. millions) | ||
| # and need to be scaled on a common scale | ||
| df[name] = df[name] * scaling_factor.get(name, 1) | ||
| result[slug] = df[name] * scaling_factor.get(name, 1) | ||
| break | ||
|
|
||
| df = df[selected_columns].rename(columns=rename_map) | ||
|
|
||
| return df | ||
| return result | ||
|
|
||
|
|
||
| def _aggregate_df(df, column_map, prefix=None): | ||
|
|
@@ -119,7 +114,22 @@ def _clean_sample_name( | |
| return sfx_pattern.sub("", filename) | ||
|
|
||
|
|
||
| def general_multiqc_parser(file_object, name, column_map): | ||
| def _is_paired_end(sample) -> Optional[bool]: | ||
| reads = list(sample.data.filter(type="data:reads:fastq:")) | ||
| origin_read_obj = reads[-1] if reads else None | ||
|
Contributor
[Medium]
Contributor
Author
The ordering is determined by the backend, so this is not entirely applicable. |
||
| if origin_read_obj is None: | ||
|
Contributor
[Medium] |
||
| return None | ||
| if origin_read_obj._process._type.endswith("paired:"): | ||
|
Contributor
[High] Strict Match the library token as a path segment instead of a strict suffix (e.g.
Contributor
Author
This is an insightful comment, as the code was written with assumptions of the user running the General RNA-seq pipeline. Will modify to make it more general. |
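To illustrate the segment-based matching discussed in this thread, here is a minimal sketch. It assumes process types are colon-delimited strings such as `data:reads:fastq:paired:` (inferred from the `filter(type="data:reads:fastq:")` and `endswith("paired:")` calls in the diff). The function name `library_strategy_from_type` and the longer example type are hypothetical and not part of the PR.

```python
from typing import Optional


def library_strategy_from_type(process_type: str) -> Optional[bool]:
    """Return True for paired-end, False for single-end, None if undetermined.

    Hypothetical helper: matches the library token as a whole path segment
    instead of requiring it to be the final suffix of the type string.
    """
    # Split "data:reads:fastq:paired:" into its non-empty segments.
    segments = [seg for seg in process_type.split(":") if seg]
    has_paired = "paired" in segments
    has_single = "single" in segments
    if has_paired == has_single:
        # Neither token, or (unexpectedly) both: do not guess.
        return None
    return has_paired


# A strict suffix check would miss the (hypothetical) longer type below.
print(library_strategy_from_type("data:reads:fastq:paired:"))
print(library_strategy_from_type("data:reads:fastq:paired:extra:"))
print(library_strategy_from_type("data:reads:fastq:"))
```

Returning `None` for the ambiguous case leaves the decision to the caller, which could still raise `ValueError` as the diff does.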
||
| return True | ||
| elif origin_read_obj._process._type.endswith("single:"): | ||
| return False | ||
| else: | ||
| raise ValueError( | ||
| f"Cannot determine the library strategy of sample: {sample.name}" | ||
| ) | ||
|
|
||
|
|
||
| def general_multiqc_parser(file_object, name, column_map, sample): | ||
| """General parser for MultiQC files. | ||
|
|
||
| If the column_map argument is not specified the function will retain the original column names | ||
|
|
@@ -138,6 +148,34 @@ def general_multiqc_parser(file_object, name, column_map): | |
| if df.empty: | ||
| return pd.Series(name=name) | ||
|
|
||
| # Sum the read count per lane separately for each read mate | ||
| # and insert the resulting value into the first column for the corresponding mate. | ||
| # This is done in order to report the actual read pairs for samples | ||
| # sequenced in paired-end mode, while preserving the sum across lanes. | ||
| fragment_metrics = [ | ||
| "estimated_fragment_count_raw", | ||
| "estimated_fragment_count_trimmed", | ||
| ] | ||
| fragment_columns = df.columns.intersection(fragment_metrics) | ||
|
|
||
| if _is_paired_end(sample): | ||
|
Contributor
[Medium]
if not fragment_columns.empty:
    paired = _is_paired_end(sample)
    ...  # the whole paired/single block, only here |
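The guard suggested above can be sketched in isolation. This example only shows that checking `fragment_columns.empty` first avoids the lookup when no fragment column is present. `lookup_is_paired` and `maybe_lookup` are hypothetical stand-ins, not functions from the PR.

```python
import pandas as pd

calls = []  # records which samples triggered a lookup


def lookup_is_paired(sample_id):
    """Hypothetical stand-in for the potentially expensive _is_paired_end lookup."""
    calls.append(sample_id)
    return True


def maybe_lookup(df, sample_id, fragment_metrics):
    """Resolve the library strategy only when a fragment column exists."""
    fragment_columns = df.columns.intersection(fragment_metrics)
    if fragment_columns.empty:
        return None  # nothing to aggregate, so skip the lookup entirely
    return lookup_is_paired(sample_id)


metrics = ["estimated_fragment_count_raw", "estimated_fragment_count_trimmed"]
no_fragments = pd.DataFrame({"gc_content": [48.0]})
with_fragments = pd.DataFrame({"estimated_fragment_count_raw": [100.0]})

first = maybe_lookup(no_fragments, 1, metrics)
second = maybe_lookup(with_fragments, 2, metrics)
print(first, second, calls)
```

Only the second frame triggers a lookup, so parsers for files without fragment metrics never pay for it.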
||
| for col in fragment_columns: | ||
| for s in ["_1", "_2"]: | ||
| mate_mask = df.index.str.endswith(s) & df[col].notna() | ||
|
Contributor
[High] The (1) (2)
Contributor
Author
Will test further |
||
| if not mate_mask.any(): | ||
| continue | ||
| total = df.loc[mate_mask, col].sum(min_count=1) | ||
|
Contributor
[Medium] "Mean of the two mate totals" is an estimator that degrades silently. When only one mate's FastQC row is present, the other mate's mask is empty and |
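For context on the line under review, the following sketch shows how `sum(min_count=1)` behaves when a mate's mask selects no rows, and how the two mate totals can disagree when a row is missing. The row labels (`s_L001_1` and so on) are made-up sample names chosen for illustration. The masking mirrors the diff, but this is not the PR's code.

```python
import pandas as pd

col = "estimated_fragment_count_raw"
# Hypothetical per-lane rows; mate 2 is missing its value for lane L002.
df = pd.DataFrame(
    {col: [10.0, 20.0, 10.0, None]},
    index=["s_L001_1", "s_L002_1", "s_L001_2", "s_L002_2"],
)

totals = {}
for suffix in ["_1", "_2", "_3"]:
    mate_mask = df.index.str.endswith(suffix) & df[col].notna()
    # min_count=1 yields NaN (not 0) when no valid value is selected.
    totals[suffix] = df.loc[mate_mask, col].sum(min_count=1)

# Without min_count, an empty selection sums to 0.0 instead of NaN.
empty_default = df.loc[df.index.str.endswith("_3"), col].sum()

print(totals, empty_default)
```

Here mate 1 totals 30.0 while mate 2 totals 10.0, and the absent `_3` suffix gives NaN, so any statistic combining mate totals needs to decide explicitly what to do when they are incomplete or unequal.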
||
| first_row = df.index[mate_mask][0] | ||
| df.loc[mate_mask, col] = pd.NA | ||
| df.loc[first_row, col] = total | ||
| else: | ||
| for col in fragment_columns: | ||
| first_row = df[col].first_valid_index() | ||
| total = df[col].sum(min_count=1) | ||
| df[col] = pd.NA | ||
| if first_row is not None: | ||
| df.loc[first_row, col] = total | ||
|
|
||
| series = _aggregate_df(df=df, column_map=column_map) | ||
|
|
||
| series.name = name | ||
|
|
@@ -453,27 +491,25 @@ def __init__( | |
|
|
||
| def _parse_file(self, file_obj, sample_id, data_type): | ||
| """Parse file object and return one DataFrame line.""" | ||
| if data_type not in self.DATA_TYPES: | ||
| raise ValueError(f"Unknown data type: {data_type}") | ||
|
|
||
| sample = next((s for s in self._samples if s.id == sample_id), None) | ||
|
|
||
| if data_type == self.MACS_PREPEAK: | ||
| case_sample = next( | ||
| (sample for sample in self._samples if sample.id == sample_id), None | ||
| ) | ||
| sample_name = _clean_sample_name(case_sample.name) | ||
| return self.DATA_TYPES[data_type]["parser"]( | ||
| file_object=file_obj, | ||
| name=sample_id, | ||
| column_map=self.DATA_TYPES[data_type]["column_map"], | ||
| sample_name=sample_name, | ||
| sample_name=_clean_sample_name(sample.name), | ||
| ) | ||
| else: | ||
| if data_type in self.DATA_TYPES: | ||
| parser_func = self.DATA_TYPES[data_type]["parser"] | ||
| return parser_func( | ||
| file_object=file_obj, | ||
| name=sample_id, | ||
| column_map=self.DATA_TYPES[data_type]["column_map"], | ||
| ) | ||
| else: | ||
| raise ValueError(f"Unknown data type: {data_type}") | ||
|
|
||
| return self.DATA_TYPES[data_type]["parser"]( | ||
| file_object=file_obj, | ||
| name=sample_id, | ||
| column_map=self.DATA_TYPES[data_type]["column_map"], | ||
| sample=sample, | ||
| ) | ||
|
|
||
| def _get_data_uri(self, data: Data, data_type: str) -> str: | ||
| """Get the file path based on data type.""" | ||
|
|
||
[High] Blocking, synchronous network call inside the asyncio download loop.
`sample.data.filter(...)` materialized with `list()` runs `ResolweQuery._fetch → self.api.get(...)`, a blocking slumber/requests GET. It executes inside `general_multiqc_parser`, invoked by `_parse_file`, which is called from the `async def _download_file` coroutine gathered by `asyncio.gather` (base.py), with no executor or `to_thread`. So it stalls the event loop and serializes the very downloads the chunked async path exists to parallelize, adding a round-trip per parsed file. It is also uncached (`.filter()` clones with an empty cache; no memoization on `_is_paired_end`).
Suggest resolving library strategy once per sample off the event loop (e.g. precompute a `{sample_id: Optional[bool]}` map) and passing the boolean into the parser.
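One way the precomputed map could look is sketched below, assuming Python 3.9+ for `asyncio.to_thread`. `blocking_is_paired_end` is a hypothetical stand-in that simulates the blocking API lookup with `time.sleep`. `resolve_library_strategies` is likewise an illustrative name, not something from the PR or the library.

```python
import asyncio
import time
from typing import Dict, List, Optional


def blocking_is_paired_end(sample_id: int) -> Optional[bool]:
    """Hypothetical stand-in for the blocking lookup behind _is_paired_end."""
    time.sleep(0.01)  # simulates a synchronous HTTP round-trip
    return sample_id % 2 == 0  # arbitrary rule, for the demo only


async def resolve_library_strategies(
    sample_ids: List[int],
) -> Dict[int, Optional[bool]]:
    """Resolve each sample once, in worker threads, keeping the event loop free."""
    unique_ids = list(dict.fromkeys(sample_ids))  # de-duplicate, keep order
    results = await asyncio.gather(
        *(asyncio.to_thread(blocking_is_paired_end, sid) for sid in unique_ids)
    )
    return dict(zip(unique_ids, results))


paired_by_sample = asyncio.run(resolve_library_strategies([1, 2, 2, 3]))
print(paired_by_sample)
```

The parser would then receive `paired_by_sample[sample_id]` as a plain boolean argument, so no network call happens while files are being parsed.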