-
Notifications
You must be signed in to change notification settings - Fork 12
/
Copy pathvalidate_input.py
107 lines (91 loc) · 5.87 KB
/
validate_input.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
import os
import csv
import yaml
import argparse
from glob import glob
import itertools
def read_sample_sheet(path):
    """Read a CSV sample sheet into a list of per-row dicts.

    Rows whose cells are all empty/whitespace are skipped; the first
    remaining row is treated as the header.

    Args:
        path: Path to the sample sheet CSV file.

    Returns:
        A list of dicts, one per data row, mapping header names to cell
        values (cells beyond the header length are dropped by zip()).
    """
    # 'with' fixes the original's leaked file handle; newline='' is the
    # documented way to open files for the csv module.
    with open(path, "r", newline="") as fp:
        rows = [row for row in csv.reader(fp) if ''.join(row).strip()]
    header, data_rows = rows[0], rows[1:]
    return [dict(zip(header, row)) for row in data_rows]
def read_config_file(path):
    """Parse a YAML settings file and return it as a dict.

    Args:
        path: Path to the YAML configuration file (settings.yaml).

    Returns:
        The parsed configuration (typically a dict).
    """
    with open(path, 'rt') as infile:
        # safe_load: yaml.load() without an explicit Loader is unsafe on
        # untrusted input and is a TypeError under PyYAML >= 6.
        config = yaml.safe_load(infile)
    return config
def validate_memory_restrictions(config):
    """Check that every per-rule memory limit is a plain number of megabytes.

    Accepts an int or a string of digits; values such as '4G' or '4000MB'
    are rejected.

    Args:
        config: Parsed settings dict containing config['execution']['rules'].

    Raises:
        Exception: If any rule's 'memory' value is not a plain number.
    """
    for rule, settings in config['execution']['rules'].items():
        if 'memory' not in settings:
            continue
        value = settings['memory']
        # isinstance instead of type(...) == int; str(value) guards against
        # non-string YAML scalars (e.g. 4.5), which previously crashed with
        # AttributeError instead of raising the intended error.
        if not isinstance(value, int) and not str(value).isdigit():
            raise Exception("ERROR: memory limits must be expressed as a plain number of megabytes. Got '{}' in '{}'.".format(value, rule))
def _check_locations(config):
    """Every configured location except 'output-dir' must exist and must not
    point at a compressed (.gz/.bz2/.xz) reference file."""
    for loc, path in config['locations'].items():
        if loc == 'output-dir':
            continue
        if not (os.path.isdir(path) or os.path.isfile(path)):
            raise Exception("ERROR: The following necessary directory/file does not exist: '{}' ({})".format(path, loc))
        if path.endswith((".gz", ".bz2", ".xz")):
            raise Exception("ERROR: The {} file '{}' is referenced in its compressed form like it was downloaded. However, the tools of this workflow expects the reference as plain FASTA/GTF files that are directly readable. Please unpack these files and update the settings to the respective new filename. This does _not_ hold for the FASTQ.gz files of the samples which shall remain compressed.".format(loc, path))

def _required_fields(config):
    """Return the set of column names the sample sheet must provide: the
    fixed fields plus any covariates referenced by the DE analyses."""
    fields = ['name', 'reads', 'reads2', 'sample_type']
    if 'DEanalyses' in config:
        covariate_strings = [analysis['covariates']
                             for analysis in config['DEanalyses'].values()
                             if 'covariates' in analysis]
        # Covariates are comma-separated strings; split, strip, and drop
        # empty entries before adding them to the requirements.
        covariates = [c.strip() for c in
                      itertools.chain.from_iterable(s.split(',') for s in covariate_strings)]
        fields += [c for c in covariates if c]
    return set(fields)

def _check_analyses(config, sample_sheet):
    """Every case/control sample group named in a DE analysis must match the
    'sample_type' of at least one sample-sheet row."""
    if 'DEanalyses' not in config:
        return
    for analysis, settings in config['DEanalyses'].items():
        groups = (settings['case_sample_groups'].split(',') +
                  settings['control_sample_groups'].split(','))
        for group in groups:
            group = group.strip()  # tolerate whitespace around group names
            if not any(row['sample_type'] == group for row in sample_sheet):
                raise Exception("ERROR: no samples in sample sheet have sample type '{}', specified in analysis {}.".format(group, analysis))

def _check_sample_rows(config, sample_sheet):
    """Per-row checks: unique sample names, reads given as basenames, reads
    files present on disk, and no missing mandatory fields."""
    seen = set()
    for row_index, row in enumerate(sample_sheet, start=1):
        sample = row['name']
        if sample in seen:
            raise Exception('ERROR: name "{}" is not unique. Replace it with a unique name in the sample_sheet.'.format(sample))
        seen.add(sample)
        # Single-end samples leave 'reads2' empty.
        filenames = [row['reads'], row['reads2']] if row['reads2'] else [row['reads']]
        # Only base names are allowed; paths would bypass 'reads-dir'.
        if any(os.path.basename(f) != f for f in filenames):
            raise Exception(" ".join(["ERROR: read file names in the sample sheet must be basenames",
                                      "not paths to the files. See sample ", sample, "in the sample sheet"]))
        for filename in filenames:
            fullpath = os.path.join(config['locations']['reads-dir'], filename)
            if not os.path.isfile(fullpath):
                # A leading/trailing blank in the sheet is the usual culprit;
                # give a more actionable message in that case.
                if filename != filename.strip(' \t'):
                    raise Exception("ERROR: missing reads file: '{}', likely caused by blanks flanking the filename, please correct.".format(fullpath))
                raise Exception("ERROR: missing reads file: '{}'".format(fullpath))
        missing = [f for f in ('name', 'reads', 'sample_type') if not row[f].strip()]
        if missing:
            raise Exception("".join(["ERROR: Missing information in sample sheet at row #", str(row_index),
                                     ". Missing info is not allowed for name/reads/sample_type fields"]))

def validate_config(config):
    """Validate the settings dict and its sample sheet, raising Exception
    with an explanatory message on the first problem found.

    Checks, in order: configured locations exist and are uncompressed,
    per-rule memory limits are plain numbers, the sample sheet provides all
    required columns (including DE-analysis covariates), every DE-analysis
    sample group matches a sample, and every sample-sheet row is internally
    consistent (unique name, basename-only reads that exist, no missing
    mandatory fields).

    Args:
        config: Parsed settings dict; config['locations']['sample-sheet']
            must point at the sample sheet CSV.

    Raises:
        Exception: On the first validation failure.
    """
    _check_locations(config)
    validate_memory_restrictions(config)
    sample_sheet = read_sample_sheet(config['locations']['sample-sheet'])
    not_found = _required_fields(config).difference(sample_sheet[0].keys())
    if not_found:
        raise Exception("ERROR: Required field(s) {} could not be found in the sample sheet file '{}'".format(not_found, config['locations']['sample-sheet']))
    _check_analyses(config, sample_sheet)
    _check_sample_rows(config, sample_sheet)
if __name__ == '__main__':
    # CLI entry point: parse the settings file, point it at the given
    # sample sheet, and run the full validation.
    cli = argparse.ArgumentParser()
    cli.add_argument('-c', '--config-file', required=True,
                     help='Path of configuration file [settings.yaml]')
    cli.add_argument('-s', '--sample-sheet-file', required=True,
                     help='Path of sample sheet [sample_sheet.csv]')
    options = cli.parse_args()
    settings = read_config_file(options.config_file)
    settings['locations']['sample-sheet'] = options.sample_sheet_file
    validate_config(settings)