
Commit 41a0276

Merge pull request #74 from DataBiosphere/hotfix-assoc-agg
Hotfix assoc agg
2 parents 89d560d + 63f1df6 commit 41a0276


10 files changed: +108 -35 lines changed


.gitignore

Lines changed: 2 additions & 1 deletion
@@ -12,4 +12,5 @@ assoc-aggregate/cromwell-68.1.jar
 *.integer
 *.zip
 cwl.output.json
-job.json
+job.json
+_test-data-and-truths_/assoc/big_silly_file_chr1.gds

README.md

Lines changed: 2 additions & 2 deletions
@@ -13,9 +13,9 @@ This project is a Workflow Description Language (WDL) implementation of several
 * Documentation of inputs, how each workflow works, and WDL-specific workarounds
 
 ## Usage
-These workflows are tested on both Terra and the local Cromwell execution engine. Example files are provided in `test-data-and-truths` and in `gs://topmed_workflow_testing/UWGAC_WDL/`.
+These workflows are tested on both Terra and the local Cromwell execution engine. Example files are provided in `test-data-and-truths` and in `gs://topmed_workflow_testing/UWGAC_WDL/`.
 
-Essentially all workflows which take in chromosome-level files share filename requirements. For these files, the chromosome must be included in the filename with the format `chr##` where `##` is the name of the chromosome (1-24 or X, Y). Chromosome can be included at any part of the filename provided they follow this format. For instance, data_subset_chr1.gds, data_chr1_subset.gds, and chr1_data_subset.gds are all valid names, while data_chromosome1_subset.gds and data_subset_c1.gds are not valid.
+Essentially all workflows which take in chromosome-level files share filename requirements. For these files, the chromosome must be included in the filename with the format `chr##` where `##` is the name of the chromosome (1-24 or X, Y). The chromosome can be included in any part of the filename provided it follows this format. For instance, data_subset_chr1.gds, data_chr1_subset.gds, and chr1_data_subset.gds are all valid names, while data_chromosome1_subset.gds and data_subset_c1.gds are not valid. Note that the association aggregate, LD prune, and null model workflows additionally require that you have greater than one input GDS file (ie, input at least chr1 and chr2).
 
 The original CWL pipelines had arguments relating to runtime such as `ncores` and `cluster_type` that do not apply to WDL. Please familiarize yourself with the [runtime attributes of WDL](https://cromwell.readthedocs.io/en/stable/RuntimeAttributes/) if you are unsure how your settings may transfer. For more information on specific runtime attributes for specific tasks, see [the further reading section](https://github.com/DataBiosphere/analysis_pipeline_WDL/main/README.md#further-reading).
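A note for readers new to this naming rule: the `chr##` convention described above is easy to check with a regular expression. The sketch below is purely illustrative; the `has_valid_chr_name` helper and the exact pattern are not part of this repository.

    import re

    # Hypothetical check mirroring the documented rule: the filename must contain
    # "chr" immediately followed by 1-24, X, or Y (e.g. data_subset_chr1.gds).
    CHR_PATTERN = re.compile(r"chr(1[0-9]|2[0-4]|[1-9]|X|Y)(?=[^0-9]|$)")

    def has_valid_chr_name(filename):
        return CHR_PATTERN.search(filename) is not None

    for name in ["data_subset_chr1.gds", "chr1_data_subset.gds",
                 "data_chromosome1_subset.gds", "data_subset_c1.gds"]:
        print(name, has_valid_chr_name(name))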

assoc-aggregate/assoc-aggregate.wdl

Lines changed: 57 additions & 19 deletions
@@ -16,12 +16,20 @@ task wdl_validate_inputs {
 String? genome_build
 String? aggregate_type
 String? test
+Int? num_gds_files
 
 # no runtime attr because this is a trivial task that does not scale
 }
 
 command <<<
 set -eux -o pipefail
+
+if [[ ~{num_gds_files} = 1 ]]
+then
+echo "Invalid input - you need to put in at least two GDS files (preferably consecutive ones, like chr1 and chr2)"
+exit 1
+fi
+
 #acceptable genome builds: ("hg38" "hg19")
 #acceptable aggreg types: ("allele" "position")
 acceptable_test_values=("burden" "skat" "smmat" "fastskat" "skato")
@@ -97,6 +105,10 @@ task sbg_gds_renamer {
 input {
 File in_variant
 
+# this is ignored by the script itself, but including this stops this task from firing
+# before wdl_validate_inputs finishes
+String? noop
+
 Boolean debug = false
 
 # runtime attributes, which you shouldn't need to adjust as this is a very light task
@@ -404,7 +416,7 @@ task sbg_prepare_segments_1 {
 # Although the format of the outputs are different from the CWL, the actual contents of each
 # component (gds, segment number, and agg file) should match the CWL perfectly (barring compute
 # platform differences, etc). The format is a zip file containing each segment's components in
-# order to work around WDL's limitations. Essentially, CWL easily scatters on the dot-product
+# order to work around WDL's limitations. Essentially, CWL easily scatters on the dot-product of
 # multiple arrays, but trying to that in WDL is painful. See cwl-vs-wdl-dev.md for more info.
 
 input {
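The comment in the hunk above is the heart of this task's workaround: instead of scattering on the dot-product of several parallel arrays (easy in CWL, painful in WDL), each segment's components are bundled into one zip so downstream tasks can scatter over a single array of files. A loose, standalone Python sketch of that idea follows; the `bundle_segments` helper and its names are illustrative only, not the task's actual code.

    from zipfile import ZipFile

    def bundle_segments(gds_files, segment_numbers, agg_files):
        """Bundle the i-th element of each parallel list into one zip per segment.

        The listed gds/agg paths must already exist on disk for the writes to succeed.
        """
        bundles = []
        for gds, seg, agg in zip(gds_files, segment_numbers, agg_files):
            # Record the segment number as a small marker file, much like the real
            # script's "<n>.integer" files.
            marker = "%s.integer" % seg
            with open(marker, "w") as handle:
                handle.write(str(seg))
            bundle_name = "dotprod%s.zip" % seg
            with ZipFile(bundle_name, "w", allowZip64=True) as bundle:
                bundle.write(gds)
                bundle.write(marker)
                if agg is not None:
                    bundle.write(agg)
            bundles.append(bundle_name)
        return bundles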
@@ -414,14 +426,14 @@ task sbg_prepare_segments_1 {
 Array[File]? variant_include_files
 
 # runtime attr
-Int addldisk = 10
-Int cpu = 2
-Int memory = 4
-Int preempt = 2
+Int addldisk = 100
+Int cpu = 12
+Int memory = 16
+Int preempt = 0
 }
 
 # estimate disk size required
-Int gds_size = 2 * ceil(size(input_gds_files, "GB"))
+Int gds_size = 5 * ceil(size(input_gds_files, "GB"))
 Int seg_size = 2 * ceil(size(segments_file, "GB"))
 Int agg_size = 2 * ceil(size(aggregate_files, "GB"))
 Int dsk_size = gds_size + seg_size + agg_size + addldisk
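For readers outside WDL, the hunk above simply raises the task's resource defaults and pads the disk request more aggressively: the GDS multiplier goes from 2x to 5x. A quick Python sketch of the same disk arithmetic, assuming input sizes are already known in GB; the function name is illustrative.

    import math

    def estimate_disk_gb(gds_gb, segments_gb, agg_gb, addldisk=100):
        # Mirrors the estimate above: 5x the GDS inputs, 2x the segments file and
        # aggregate files, plus a flat addldisk pad.
        return (5 * math.ceil(gds_gb)
                + 2 * math.ceil(segments_gb)
                + 2 * math.ceil(agg_gb)
                + addldisk)

    print(estimate_disk_gb(gds_gb=4.3 * 23, segments_gb=0.001, agg_gb=1.2))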
@@ -463,6 +475,7 @@ task sbg_prepare_segments_1 {
 from zipfile import ZipFile
 import os
 import shutil
+import datetime
 
 def find_chromosome(file):
 chr_array = []
@@ -516,7 +529,10 @@ task sbg_prepare_segments_1 {
 segments = segments[1:] # remove first line
 return segments
 
-# prepare GDS output
+######################
+# prepare GDS output #
+######################
+beginning = datetime.datetime.now()
 input_gdss = pair_chromosome_gds(IIinput_gds_filesII)
 output_gdss = []
 gds_segments = wdl_get_segments()
@@ -530,8 +546,12 @@ task sbg_prepare_segments_1 {
 gds_output_hack = open("gds_output_debug.txt", "w")
 gds_output_hack.writelines(["%s " % thing for thing in output_gdss])
 gds_output_hack.close()
+print("Info: GDS output prepared in %s minutes" % divmod((datetime.datetime.now()-beginning).total_seconds(), 60)[0])
 
-# prepare segment output
+######################
+# prepare seg output #
+######################
+beginning = datetime.datetime.now()
 input_gdss = pair_chromosome_gds(IIinput_gds_filesII)
 output_segments = []
 actual_segments = wdl_get_segments()
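The new `print` statements above all reuse one elapsed-time idiom: subtract two `datetime` values and keep the whole minutes from `divmod`. A minimal, self-contained sketch of just that idiom; the `time.sleep` call stands in for the real work being timed.

    import datetime
    import time

    beginning = datetime.datetime.now()
    time.sleep(1)  # placeholder for the real work being timed
    elapsed_minutes = divmod((datetime.datetime.now() - beginning).total_seconds(), 60)[0]
    print("Info: step finished in %s minutes" % elapsed_minutes)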
@@ -544,23 +564,28 @@ task sbg_prepare_segments_1 {
 seg_num = i+1
 output_segments.append(seg_num)
 output_seg_as_file = open("%s.integer" % seg_num, "w")
-
+
 # I don't know for sure if this case is actually problematic, but I suspect it will be.
 if max(output_segments) != len(output_segments):
 print("ERROR: output_segments needs to be a list of consecutive integers.")
 print("Debug: Max of list: %s. Len of list: %s." %
 [max(output_segments), len(output_segments)])
 print("Debug: List is as follows:\n\t%s" % output_segments)
 exit(1)
+
 segs_output_hack = open("segs_output_debug.txt", "w")
 segs_output_hack.writelines(["%s " % thing for thing in output_segments])
 segs_output_hack.close()
+print("Info: Segment output prepared in %s minutes" % divmod((datetime.datetime.now()-beginning).total_seconds(), 60)[0])
 
-# prepare aggregate output
+######################
+# prepare agg output #
+######################
 # The CWL accounts for there being no aggregate files as the CWL considers them an optional
 # input. We don't need to account for that because the way WDL works means it they are a
 # required output of a previous task and a required input of this task. That said, if this
 # code is reused for other WDLs, it may need some adjustments right around here.
+beginning = datetime.datetime.now()
 input_gdss = pair_chromosome_gds(IIinput_gds_filesII)
 agg_segments = wdl_get_segments()
 if 'chr' in os.path.basename(IIaggregate_filesII[0]):
@@ -577,8 +602,12 @@ task sbg_prepare_segments_1 {
 output_aggregate_files.append(input_aggregate_files[chr])
 elif (chr in input_gdss):
 output_aggregate_files.append(None)
+print("Info: Aggregate output prepared in %s minutes" % divmod((datetime.datetime.now()-beginning).total_seconds(), 60)[0])
 
-# prepare variant include output
+#########################
+# prepare varinc output #
+#########################
+beginning = datetime.datetime.now()
 input_gdss = pair_chromosome_gds(IIinput_gds_filesII)
 var_segments = wdl_get_segments()
 if IIvariant_include_filesII != [""]:
@@ -608,6 +637,7 @@ task sbg_prepare_segments_1 {
 var_output_hack = open("variant_output_debug.txt", "w")
 var_output_hack.writelines(["%s " % thing for thing in output_variant_files])
 var_output_hack.close()
+print("Info: Variant include output prepared in %s minutes" % divmod((datetime.datetime.now()-beginning).total_seconds(), 60)[0])
 
 # We can only consistently tell output files apart by their extension. If var include files
 # and agg files are both outputs, this is problematic, as they both share the RData ext.
@@ -617,9 +647,11 @@ task sbg_prepare_segments_1 {
 os.mkdir("temp")
 
 # make a bunch of zip files
+print("Preparing zip file outputs...")
 for i in range(0, max(output_segments)):
+beginning = datetime.datetime.now()
 plusone = i+1
-this_zip = ZipFile("dotprod%s.zip" % plusone, "w")
+this_zip = ZipFile("dotprod%s.zip" % plusone, "w", allowZip64=True)
 this_zip.write("%s" % output_gdss[i])
 this_zip.write("%s.integer" % output_segments[i])
 this_zip.write("%s" % output_aggregate_files[i])
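The `allowZip64=True` change above is what lets these archives grow past the classic ZIP limits: if ZIP64 were disabled, a member or archive exceeding roughly 4 GB would raise `zipfile.LargeZipFile`, whereas with it enabled `ZipFile` writes ZIP64 extensions. A minimal sketch, with a tiny placeholder file standing in for a large GDS input.

    from zipfile import ZipFile

    # Create a small placeholder so the example runs end to end; in the real task
    # this would be a multi-gigabyte GDS file.
    with open("example_chr1.gds", "w") as handle:
        handle.write("placeholder")

    # With allowZip64 disabled, members past the ~4 GB ZIP limits raise
    # zipfile.LargeZipFile; enabling it makes ZipFile use ZIP64 extensions instead.
    with ZipFile("dotprod_example.zip", "w", allowZip64=True) as archive:
        archive.write("example_chr1.gds")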
@@ -651,13 +683,15 @@ task sbg_prepare_segments_1 {
 
 this_zip.write("varinclude/%s" % output_variant_files[i])
 this_zip.close()
+print("Info: Wrote dotprod%s.zip" % plusone)
+print("Info: This took %s minutes" % divmod((datetime.datetime.now()-beginning).total_seconds(), 60)[0])
 CODE
 >>>
 
 runtime {
 cpu: cpu
 docker: "uwgac/topmed-master@sha256:c564d54f5a3b9daed7a7677f860155f3b8c310b0771212c2eef1d6338f5c2600" # uwgac/topmed-master:2.12.0
-disks: "local-disk " + dsk_size + " HDD"
+disks: "local-disk " + dsk_size + " SSD"
 memory: "${memory} GB"
 preemptibles: "${preempt}"
 }
@@ -692,10 +726,10 @@ task assoc_aggregate {
 String? genome_build # acts as enum
 
 # runtime attr
-Int addldisk = 1
-Int cpu = 1
-Int memory = 8
-Int preempt = 0
+Int addldisk = 50
+Int cpu = 4
+Int memory = 16
+Int preempt = 1
 
 Boolean debug = false
 }
@@ -1346,19 +1380,23 @@ workflow assoc_agg {
 String? weight_user
 }
 
+Int num_gds_files = length(input_gds_files)
+
 # In order to force this to run first, all other tasks that use these "psuedoenums"
 # (Strings that mimic type Enum from CWL) will take them in via outputs of this task
 call wdl_validate_inputs {
 input:
 genome_build = genome_build,
 aggregate_type = aggregate_type,
-test = test
+test = test,
+num_gds_files = num_gds_files
 }
 
 scatter(gds_file in input_gds_files) {
 call sbg_gds_renamer {
 input:
-in_variant = gds_file
+in_variant = gds_file,
+noop = wdl_validate_inputs.valid_genome_build
 }
 }
 
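At the workflow level, the hunk above counts the GDS inputs with `length()` and hands the count to `wdl_validate_inputs`, which now fails fast when only one file is supplied. The same pre-flight check, sketched in plain Python; the function name and message wording are illustrative, not taken from the workflow.

    def validate_gds_inputs(input_gds_files):
        # Mirrors the new fail-fast behavior: the aggregate association workflow
        # needs more than one chromosome-level GDS file (e.g. chr1 and chr2).
        if len(input_gds_files) < 2:
            raise ValueError(
                "Invalid input - you need to put in at least two GDS files "
                "(preferably consecutive ones, like chr1 and chr2)")
        return len(input_gds_files)

    print(validate_gds_inputs(["data_chr1.gds", "data_chr2.gds"]))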

assoc-aggregate/checker/assoc-aggregate-checker.wdl

Lines changed: 1 addition & 0 deletions
@@ -1,5 +1,6 @@
 version 1.0
 import "https://raw.githubusercontent.com/DataBiosphere/analysis_pipeline_WDL/v7.0.0/assoc-aggregate/assoc-aggregate.wdl" as assoc_agg_wf
+#import "../assoc-aggregate.wdl" as assoc_agg_wf # use this if you want to test a local version
 import "https://raw.githubusercontent.com/dockstore/checker-WDL-templates/v1.1.0/checker_tasks/arraycheck_task.wdl" as verify_array
 
 workflow aggie_checker {

assoc-aggregate/prepare_segments_1.py

Lines changed: 27 additions & 6 deletions
@@ -13,6 +13,7 @@
 from zipfile import ZipFile
 import os
 import shutil
+import datetime
 
 def find_chromosome(file):
 chr_array = []
@@ -66,7 +67,10 @@ def wdl_get_segments():
 segments = segments[1:] # remove first line
 return segments
 
-# prepare GDS output
+######################
+# prepare GDS output #
+######################
+beginning = datetime.datetime.now()
 input_gdss = pair_chromosome_gds(IIinput_gds_filesII)
 output_gdss = []
 gds_segments = wdl_get_segments()
@@ -80,8 +84,12 @@ def wdl_get_segments():
 gds_output_hack = open("gds_output_debug.txt", "w")
 gds_output_hack.writelines(["%s " % thing for thing in output_gdss])
 gds_output_hack.close()
+print("Info: GDS output prepared in %s minutes" % divmod((datetime.datetime.now()-beginning).total_seconds(), 60)[0])
 
-# prepare segment output
+######################
+# prepare seg output #
+######################
+beginning = datetime.datetime.now()
 input_gdss = pair_chromosome_gds(IIinput_gds_filesII)
 output_segments = []
 actual_segments = wdl_get_segments()
@@ -106,12 +114,16 @@ def wdl_get_segments():
 segs_output_hack = open("segs_output_debug.txt", "w")
 segs_output_hack.writelines(["%s " % thing for thing in output_segments])
 segs_output_hack.close()
+print("Info: Segment output prepared in %s minutes" % divmod((datetime.datetime.now()-beginning).total_seconds(), 60)[0])
 
-# prepare aggregate output
+######################
+# prepare agg output #
+######################
 # The CWL accounts for there being no aggregate files as the CWL considers them an optional
 # input. We don't need to account for that because the way WDL works means it they are a
 # required output of a previous task and a required input of this task. That said, if this
 # code is reused for other WDLs, it may need some adjustments right around here.
+beginning = datetime.datetime.now()
 input_gdss = pair_chromosome_gds(IIinput_gds_filesII)
 agg_segments = wdl_get_segments()
 if 'chr' in os.path.basename(IIaggregate_filesII[0]):
@@ -128,8 +140,12 @@ def wdl_get_segments():
 output_aggregate_files.append(input_aggregate_files[chr])
 elif (chr in input_gdss):
 output_aggregate_files.append(None)
+print("Info: Aggregate output prepared in %s minutes" % divmod((datetime.datetime.now()-beginning).total_seconds(), 60)[0])
 
-# prepare variant include output
+#########################
+# prepare varinc output #
+#########################
+beginning = datetime.datetime.now()
 input_gdss = pair_chromosome_gds(IIinput_gds_filesII)
 var_segments = wdl_get_segments()
 if IIvariant_include_filesII != [""]:
@@ -159,6 +175,7 @@ def wdl_get_segments():
 var_output_hack = open("variant_output_debug.txt", "w")
 var_output_hack.writelines(["%s " % thing for thing in output_variant_files])
 var_output_hack.close()
+print("Info: Variant include output prepared in %s minutes" % divmod((datetime.datetime.now()-beginning).total_seconds(), 60)[0])
 
 # We can only consistently tell output files apart by their extension. If var include files
 # and agg files are both outputs, this is problematic, as they both share the RData ext.
@@ -168,9 +185,11 @@ def wdl_get_segments():
 os.mkdir("temp")
 
 # make a bunch of zip files
+print("Preparing zip file outputs...")
 for i in range(0, max(output_segments)):
+beginning = datetime.datetime.now()
 plusone = i+1
-this_zip = ZipFile("dotprod%s.zip" % plusone, "w")
+this_zip = ZipFile("dotprod%s.zip" % plusone, "w", allowZip64=True)
 this_zip.write("%s" % output_gdss[i])
 this_zip.write("%s.integer" % output_segments[i])
 this_zip.write("%s" % output_aggregate_files[i])
@@ -201,4 +220,6 @@ def wdl_get_segments():
 exit(1)
 
 this_zip.write("varinclude/%s" % output_variant_files[i])
-this_zip.close()
+this_zip.close()
+print("Info: Wrote dotprod%s.zip" % plusone)
+print("Info: This took %s minutes" % divmod((datetime.datetime.now()-beginning).total_seconds(), 60)[0])

assoc-aggregate/readme.md

Lines changed: 4 additions & 0 deletions
@@ -31,6 +31,10 @@ Aggregate tests are typically used to jointly test rare variants. This workflow
 
 * Do not use this pipeline with non-consecutive chromosomes. A run containing chr1, chr2, and chr3 will work. A run containing chr1, chr20, and chr13 may not. (Non-autosomes excluded -- a run containing chr1, chr2, and chrX will not error out, but as noted above chrX will be dropped.)
 
+* Do not use this pipeline on only one chromosome.
+
+* If your data consists of GDS files that are >10 GB, it is *recommended* to set a lower number of segments in order to decrease delocalization time in the prepare segments task. For reference, it took almost two hours for a 25-segment output of 4.3 GB GDS files to finish delocalizing in prepare segments. I am currently seeking additional guidance from Cromwell and Terra devs on handling this bottleneck better.
+
 ## Sample Inputs
 * terra-allele, local-allele, and the first part of the checker workflow are based upon [assoc_aggregate_allele.config](https://github.com/UW-GAC/analysis_pipeline/blob/master/testdata/assoc_aggregate_allele.config)
 * terra-position, local-position, and the second part of the checker workflow are based upon [assoc_aggregate_position.config](https://github.com/UW-GAC/analysis_pipeline/blob/master/testdata/assoc_aggregate_position.config)

ld-pruning/ld-pruning-terra.json

Lines changed: 3 additions & 0 deletions
@@ -23,5 +23,8 @@
 "gs://topmed_workflow_testing/UWGAC_WDL/checker/a_vcf2gds/1KG_phase3_subset_chr21.gds",
 "gs://topmed_workflow_testing/UWGAC_WDL/checker/a_vcf2gds/1KG_phase3_subset_chr22.gds"
 ],
+"ldpruning.ld_pruning.addldisk": "5",
+"ldpruning.ld_pruning.cpu": "2",
+"ldpruning.ld_pruning.memory": "4",
 "ldpruning.ld_pruning.genome_build": "hg38"
 }

ld-pruning/ld-pruning.wdl

Lines changed: 4 additions & 4 deletions
@@ -18,9 +18,9 @@ task ld_pruning {
 String? out_prefix
 
 # runtime attributes
-Int addldisk = 5
-Int cpu = 2
-Int memory = 4
+Int addldisk = 10
+Int cpu = 4
+Int memory = 8
 Int preempt = 3
 }
 
@@ -201,7 +201,7 @@ task merge_gds {
 # runtime attributes
 Int addldisk = 5
 Int cpu = 2
-Int memory = 4
+Int memory = 8
 Int preempt = 3
 }
 

vcf-to-gds/vcf-to-gds-terra.json

Lines changed: 3 additions & 0 deletions
@@ -25,7 +25,10 @@
 "gs://topmed_workflow_testing/UWGAC_WDL/1KG_phase3_subset_chrX.vcf.gz"
 ],
 "vcftogds.vcf2gds.addldisk": 1,
+"vcftogds.vcf2gds.cpu": 2,
+"vcftogds.vcf2gds.memory": 4,
 "vcftogds.unique_variant_id.addldisk": 1,
+"vcftogds.unique_variant_id.memory": 4,
 "vcftogds.check_gds": true,
 "vcftogds.check_gds.addldisk": 1,
 "vcftogds.check_gds.preempt": 1
