Commit 0579112

Merge pull request #5 from cellgeni/metadata_table
new usage - metadata table and warnings
2 parents 8776d1f + bd627cb commit 0579112

10 files changed (+878 −357 lines)

README.md (+63 −27)
````diff
@@ -2,45 +2,81 @@
 This Nextflow pipeline pulls samples from iRODS and converts them to FASTQ files.
 
 ## Contents of Repo
-* `main.nf` - the Nextflow pipeline that pulls the samples and converts them.
-* `nextflow.config` - the configuration script that controls the cluster scheduler, process and container.
-* `examples/samples.csv` - an example samples.csv file, contains one column with sample names (header 'sample' is required).
+* `main.nf` - the Nextflow pipeline that runs all workflows
+* `modules/metatable.nf` - a collection of processes that retrieve IRODS metadata for the samples listed in the `--findmeta <samples.csv>` file
+* `modules/getfiles.nf` - a collection of processes that download the data (`.cram` or `.bam` files) from IRODS and convert it to `.fastq.gz` files
+* `modules/upload2ftp.nf` - a collection of processes that upload a list of `.fastq.gz` files to an FTP server (specified in `nextflow.config`)
+* `nextflow.config` - the configuration script that controls the cluster scheduler, processes and containers
+* `bin/parser.py` - script that parses metadata from `imeta ls` output and saves it in `.json` format
+* `bin/combine_meta.py` - script that combines all metadata in `.json` format and saves it to a `.tsv` file
+* `examples/samples.csv` - an example samples.csv file, contains one column with sample names (no header required)
 * `examples/run.sh` - an example run script that executes the pipeline.
 
 ## Pipeline Arguments
-* `--meta`: A metadata CSV with the sample IDs, and possibly other iRODS parameters. Header relevant, as it specifies iRODS metadata field names (required)
-* `--publish_dir`: Path to put the output files of the pipeline. (default `'results'`)
-* `--type`: Other potential arguments, though not mandatory. If ATAC, set this to 'ATAC' (default `null`)
+* `--findmeta`: specify a .csv file with sample names to run a metadata search
+* `--cram2fastq`: if specified, runs conversion of the CRAM files found during the `findmeta` step
+* `--meta`: specifies the .tsv file with CRAM files (potentially from the `findmeta` step) on which to run the cram2fastq conversion
+* `--publish_dir`: path to put the output files of the pipeline (default `'results'`)
 * `--index_format`: index-format formula for samtools, only if you really know what you're doing (default `"i*i*"`)
-* `--publish_fastqs`: Whether to publish fastqs - other workflows using this may not want to (default `true`)
-* `--find_crams_only`: For advanced CRAM list manipulation. Only return the found SAMPLE,CRAM list, e.g. for manual curation/manipulation (default `false`)
-* `--cram_list`: Accept SAMPLE,CRAM list on input (default `null`)
-* `--merge`: Concatenate FASTQ files for samples with multiple lanes or sample numbers. Only one R1, R2 (and optionally R3) will be generated per sample. (default `false`)
-* `--ftp_upload`: Upload the resulting files to an FTP (default `false`).
-  * Use in combination with `--ftp_credenials`, `--ftp_host` and `--ftp_path`.
+* `--toftp`: upload the resulting files to the ArrayExpress FTP server (default `false`).
+  * Use in combination with `--ftp_credenials`, `--ftp_host` and `--ftp_path`
+* `--fastqfiles`: specifies the .fastq.gz files (potentially from the `cram2fastq` step) to upload to the ArrayExpress FTP server
 
+## Examples of use
+1. Run a metadata search for a specified list of samples:
+```shell
+nextflow run main.nf --findmeta ./examples/samples.csv
+```
+
+2. Download CRAM files (specified in metadata.tsv) from IRODS and convert them to FASTQ:
+```shell
+nextflow run main.nf --cram2fastq --meta metadata/metadata.tsv
+```
+
+3. Upload FASTQ files to the FTP server (you need to set up the FTP server in `nextflow.config`):
+```shell
+nextflow run main.nf --toftp --fastqfiles ./results/
+```
+
+4. Combine several steps to run them together:
+```shell
+nextflow run main.nf --findmeta ./examples/samples.csv --cram2fastq --toftp
+```
 
 ## Graph
 ```mermaid
 ---
 title: Nextflow pipeline for retrieving CRAM files stored in iRODS and converting them to FASTQ
 ---
-flowchart LR
-subgraph "find and pull CRAM files"
+flowchart TB
+subgraph findmeta["Find CRAM metadata"]
+    direction LR
     v0([findCrams])
-    v1([combineCramLists])
-    v2([downloadCram])
-    v3([renameCram])
+    v1([getMetadata])
+    v2([parseMetadata])
+    v3([combineMetadata])
+end
+subgraph downloadcrams["Convert CRAMs --> FASTQ"]
+    direction LR
+    v4([downloadCram])
+    v5([cramToFastq])
+    v6([calculateReadLength])
+    v7([checkATAC])
+    v8([renameATAC])
+    v9([saveMetaToJson])
+    v10([updateMetadata])
 end
-v4(((cramToFastq)))
-subgraph "optional steps"
-    v5([merge])
-    v6([uploadFTP])
+subgraph uploadtoftp["Upload data to FTP"]
+    direction LR
+    v11([concatFastqs])
+    v12([uploadFTP])
 end
-v0 --> v1
-v1 --> v2
-v2 --> v3
-v3 --> v4
-v4 --> v5
-v5 --> v6
+v0 --> v1 --> v2 --> v3
+v4 --> v5 --> v6 --> v7{10X ATAC}
+v11 --> v12
+v7 --YES--> v8
+v8 --> v9
+v7 --NO--> v9
+v9 --> v10
+findmeta -.-> downloadcrams -.-> uploadtoftp
 ```
````
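
For reference, the `--findmeta` input described in the new README is a single-column list of sample names with no header. A minimal sketch of such a `samples.csv` (the sample names below are placeholders, not real IDs):

```
sample1
sample2
sample3
```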

bin/combine_meta.py (+250, new file)
```python
#!/usr/bin/env python

import os
import sys
import json
import csv
from typing import List, Dict, Any
from collections import defaultdict
import logging
import argparse


PARSER = argparse.ArgumentParser(
    description="Reads metadata from a set of .json files and combines everything into a .tsv file"
)
PARSER.add_argument(
    "dir",
    metavar="dir",
    type=str,
    help="specify a path to the directory with a set of .json files you want to combine",
)
PARSER.add_argument(
    "-a",
    "--validate_all",
    help="if specified, runs all validation steps; if not, runs library type validation only",
    action="store_true",
)


class ColoredFormatter(logging.Formatter):
    blue = "\n\033[94m"
    yellow = "\033[93m"
    reset = "\033[0m"
    format = "%(levelname)s: %(message)s"

    FORMATS = {
        logging.INFO: blue + format + reset,
        logging.WARNING: yellow + format + reset,
    }

    def format(self, record):
        log_fmt = self.FORMATS.get(record.levelno)
        formatter = logging.Formatter(log_fmt)
        return formatter.format(record)


def setup_logging() -> None:
    """
    Set up the logging configuration of the script
    """
    # a basic config to save logs to metadata.log
    logging.basicConfig(
        level=logging.INFO,
        format="%(levelname)s: %(message)s",
        filename="metadata.log",
        filemode="w",
    )

    # define a Handler which writes INFO messages or higher to stdout
    console = logging.StreamHandler(sys.stdout)
    console.setLevel(logging.INFO)
    # tell the handler to use the colored format
    console.setFormatter(ColoredFormatter())
    # add the handler to the root logger
    logging.getLogger("").addHandler(console)


def get_sampleindex(meta_list: List[Dict[str, Any]]) -> Dict[str, List[int]]:
    """
    Get the indexes of all entries belonging to each unique sample
    meta_list (List[Dict[str, Any]]): a list containing metadata for all samples
    return (Dict[str, List[int]]): indexes of all entries for each unique sample in the list
    """
    sample_index = defaultdict(list)
    for i, sample_meta in enumerate(meta_list):
        sample_index[sample_meta["sample"]].append(i)
    return sample_index


def validate_filenames(meta_list: List[Dict[str, Any]]) -> None:
    """
    Check if there are duplicated FASTQ filenames in the metadata list
    meta_list (List[Dict[str, Any]]): a list containing metadata for all samples
    """
    # get duplicated filenames
    filenames = [meta["fastq_prefix"] for meta in meta_list]
    duplicated_filenames = {name for name in filenames if filenames.count(name) > 1}
    # raise a warning
    if len(duplicated_filenames) != 0:
        message = "There are duplicated filenames:\n" + "\n".join(duplicated_filenames)
        logging.warning(message)


def raise_sample_warning(sample: str, warning_messages: List[str]) -> None:
    """
    Print a header with the sample name if any warnings were collected, then log them all
    sample (str): the sample for which the warnings are raised
    warning_messages (List[str]): a list of warnings for a particular sample
    """
    if warning_messages:
        # print a header with the sample name
        logging.info(f"Sample {sample}:")
        # raise all warnings
        for message in warning_messages:
            logging.warning(message)


def validate_consistency(
    sample: str,
    meta_list: List[Dict[str, Any]],
    column: str,
    warning_messages: List[str],
) -> None:
    """
    Check if there are multiple values in `column`
    sample (str): sample name
    meta_list (List[Dict[str, Any]]): a list containing metadata for all files of a particular sample
    column (str): a column of interest in `meta_list`
    warning_messages (List[str]): a list of warning messages from previous validation steps
    """
    # get unique values
    unique_values = {meta.get(column, "NaN") for meta in meta_list}
    if len(unique_values) > 1:
        # make a warning message
        warning_message = (
            f"There are multiple values of {column} available: "
            + ",".join(unique_values)
        )
        # save the warning message to the list
        warning_messages.append(warning_message)


def validate_readcounts(
    sample: str, meta_list: List[Dict[str, Any]], warning_messages: List[str]
) -> None:
    """
    Check if the IRODS total_reads count equals the number of reads processed by samtools
    sample (str): sample name
    meta_list (List[Dict[str, Any]]): a list containing metadata for all files of a particular sample
    warning_messages (List[str]): a list of warning messages from previous validation steps
    """
    # get files with an inconsistent total number of reads
    warning_list = [
        cram_meta["cram_path"]
        for cram_meta in meta_list
        if cram_meta["total_reads"] != cram_meta["num_reads_processed"]
    ]
    if warning_list:
        # make a warning message
        warning_message = (
            "IRODS total_count != num_reads_processed for files: "
            + ",".join(warning_list)
        )
        # save the warning message to the list
        warning_messages.append(warning_message)


def validate_atac(
    sample: str, meta_list: List[Dict[str, Any]], warning_messages: List[str]
) -> None:
    """
    Check if any files are suspected to be 10X ATAC (library type contains 'atac' and index 2 is 24 bp long)
    sample (str): sample name
    meta_list (List[Dict[str, Any]]): a list containing metadata for all files of a particular sample
    warning_messages (List[str]): a list of warning messages from previous validation steps
    """
    warning_list = [
        cram_meta["cram_path"]
        for cram_meta in meta_list
        if "atac" in cram_meta["library_type"].lower() and cram_meta["i2len"] == "24"
    ]
    if warning_list:
        # make a warning message
        warning_title = "The following files are suspected to be 10X ATAC. They were renamed according to the CellRanger naming convention: "
        warning_message = warning_title + ",".join(warning_list)
        # save the warning message to the list
        warning_messages.append(warning_message)


def validate_metalist(meta_list: List[Dict[str, Any]], validate_all: bool) -> None:
    """
    Validate metadata values in a list of columns
    meta_list (List[Dict[str, Any]]): a list containing metadata for all files of all samples
    validate_all (bool): if True, run all validation steps, otherwise library type validation only
    """
    # get sample indexes
    sample_index = get_sampleindex(meta_list)

    # check if there are duplicated filenames
    validate_filenames(meta_list)

    # validate cram files for each sample
    for sample, indexes in sample_index.items():
        warning_messages = list()
        # subset the metadata list to the current sample
        subsample_metalist = [meta_list[idx] for idx in indexes]
        # validate metadata
        validate_consistency(
            sample, subsample_metalist, "library_type", warning_messages
        )
        if validate_all:
            validate_readcounts(sample, subsample_metalist, warning_messages)
            validate_consistency(sample, subsample_metalist, "r1len", warning_messages)
            validate_consistency(sample, subsample_metalist, "r2len", warning_messages)
            validate_atac(sample, subsample_metalist, warning_messages)
        # raise all warning messages
        raise_sample_warning(sample, warning_messages)


def main() -> None:
    # set up logging
    setup_logging()

    # parse arguments
    args = PARSER.parse_args()

    # read the positional argument with the directory path
    dirpath = args.dir.rstrip("/")

    # read all json files into meta_list
    meta_list = list()

    for filename in os.listdir(dirpath):
        # skip anything that is not a .json file
        if not filename.endswith(".json"):
            continue
        with open(f"{dirpath}/{filename}", "r") as file:
            # reading the json file
            meta_list.append(json.load(file))

    # sort the data by sample name
    meta_list = sorted(meta_list, key=lambda x: x["sample"])

    # save the field names (all files are assumed to share the same keys)
    fieldnames = meta_list[0].keys()

    # validate metadata
    validate_metalist(meta_list, args.validate_all)

    # write all metadata to a tsv file
    with open("metadata.tsv", mode="w") as csv_file:
        # create a writer object
        writer = csv.DictWriter(csv_file, fieldnames=fieldnames, delimiter="\t")

        # write the data
        writer.writeheader()
        for sample_meta in meta_list:
            writer.writerow(sample_meta)


if __name__ == "__main__":
    main()
```
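
For illustration, the script can also be run standalone on a directory of per-sample `.json` files (such as those produced by `bin/parser.py`); the directory path below is hypothetical. It writes the combined table to `metadata.tsv` and logs warnings both to the console and to `metadata.log`:

```shell
# combine all .json metadata files in a directory into metadata.tsv,
# running the full set of validation checks (-a / --validate_all)
python bin/combine_meta.py path/to/json_dir --validate_all
```

Each input `.json` file is expected to contain at least the fields the validators reference: `sample`, `fastq_prefix`, `cram_path`, `total_reads`, `num_reads_processed`, `library_type`, `r1len`, `r2len` and `i2len`.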
