// main.nf
// I-BLESS Pipeline Month 2 + Month 3 + Month 4 updates
// FASTQ → QC → BAM → Break Calling → bedGraph/bigWig → normalized tracks → differential stats → validation → bin-size sweep summary
// ---------------------------------------------------------

// Opt in to the DSL2 module/workflow syntax used throughout this file.
nextflow.enable.dsl = 2
@@ -65,6 +65,32 @@ process BREAK_CALLING {
6565}
6666
6767
// ---------------------------------------------------------
// Helper: build per-bin-size samples TSV for stats module
// Input:  (bin_size, [ "sample\tplus_bg\tminus_bg", ... ])
// Output: (bin_size, samples_for_stats_bin_<bin_size>.tsv)
// ---------------------------------------------------------
process MAKE_SAMPLES_TSV {
    tag "bin_${bin_size}"
    publishDir "${params.outdir}/stats/bin_${bin_size}", mode: 'copy'

    input:
    tuple val(bin_size), val(lines)

    output:
    tuple val(bin_size), file("samples_for_stats_bin_${bin_size}.tsv")

    script:
    // 'lines' is a Groovy List<String>, one pre-formatted TSV row per sample.
    // Sort before joining: groupTuple emission order is not guaranteed, and a
    // stable row order keeps the task cacheable under -resume.
    // Join WITHOUT a trailing "\n": the heredoc template below already ends
    // the last row with its own newline, so appending one here would leave a
    // spurious blank line at the end of the TSV.
    def content = (lines as List).sort(false).join("\n")
    // Quoted heredoc delimiter ('EOT') stops bash from re-expanding the rows;
    // ${content} is interpolated by Groovy before the script reaches bash.
    """
    cat > samples_for_stats_bin_${bin_size}.tsv <<'EOT'
${content}
EOT
    """
}
92+
93+
6894// ---------------------------------------------------------
6995// Workflow
7096// ---------------------------------------------------------
@@ -90,152 +116,116 @@ workflow {
90116 // ---------------------------------------------------------
91117 if ( params. viz?. enabled ) {
92118
93- // Bin-size sweep support: if viz.bin_sizes is provided, run viz→stats per bin size.
94- // Otherwise, fall back to viz.bin_size (or 500 if unset in config).
119+ // Bin-size sweep support:
120+ // - if viz.bin_sizes provided: run viz→stats/validation per bin size
121+ // - else fall back to viz.bin_size (or 500 if unset)
95122 def bin_sizes = (params. viz?. bin_sizes ?: [ params. viz?. bin_size ?: 500 ]) as List
123+ Channel
124+ .fromList(bin_sizes. collect { it as int })
125+ .set { bin_sizes_ch }
126+
127+ // Fan-out breaks across bin sizes WITHOUT re-invoking the module in a loop
128+ def breaks_with_bin = breaks_out
129+ .cross(bin_sizes_ch)
130+ .map { br, bs ->
131+ def (sample_id, breaks_bed) = br
132+ tuple(bs as int , sample_id, breaks_bed)
133+ }
96134
97- // Hold combined outputs across all bin sizes so we can summarize later
98- def all_diff_out = Channel . empty()
135+ // Step 1: create raw binned bedGraphs + bigWigs + total_dsb.txt
136+ def tracks_out = BREAKS_TO_TRACKS (breaks_with_bin, file(params. genome_fasta))
137+ /*
138+ tracks_out tuple:
139+ (bin_size,
140+ sample_id,
141+ plus.bedGraph, minus.bedGraph,
142+ plus.bw, minus.bw,
143+ total_dsb.txt)
144+ */
145+
146+ // IGV sessions (raw tracks)
147+ def igv_in = tracks_out. map { bin_size , sample_id , plus_bg , minus_bg , plus_bw , minus_bw , total_txt ->
148+ tuple(bin_size, sample_id, plus_bw, minus_bw)
149+ }
150+ MAKE_IGV_SESSION (igv_in)
151+
152+ // Step 2: normalize bedGraphs and create normalized bigWigs
153+ def norm_in = tracks_out. map { bin_size , sample_id , plus_bg , minus_bg , plus_bw , minus_bw , total_txt ->
154+ tuple(bin_size, sample_id, plus_bg, minus_bg, total_txt)
155+ }
156+ NORMALIZE_TRACKS (norm_in, file(params. genome_fasta))
99157
100158 // ---------------------------------------------------------
101- // For each bin size: Step 1 (tracks) → IGV sessions → Step 2 (normalize) → Stats/ Validation
159+ // Month 3 Step 3 + Step 4 (Stats + Validation)
102160 // ---------------------------------------------------------
103- bin_sizes. each { bs ->
104-
105- // Step 1: create raw binned bedGraphs + bigWigs + total_dsb.txt
106- def breaks_with_bin = breaks_out. map { sample_id , breaks_bed ->
107- tuple(bs as int , sample_id, breaks_bed)
108- }
161+ if ( params. stats?. enabled ) {
109162
110- def tracks_out = BREAKS_TO_TRACKS (breaks_with_bin, file(params. genome_fasta))
163+ // Build per-bin-size samples TSVs from raw bedGraphs
164+ def samples_lines_grouped = tracks_out
165+ .map { bin_size, sample_id, plus_bg, minus_bg, plus_bw, minus_bw, total_txt ->
166+ tuple(bin_size, " ${ sample_id} \t ${ plus_bg} \t ${ minus_bg} " )
167+ }
168+ .groupTuple(by : 0 ) // (bin_size, [line1, line2, ...])
111169
112- /*
113- tracks_out tuple:
114- (bin_size,
115- sample_id,
116- plus.bedGraph, minus.bedGraph,
117- plus.bw, minus.bw,
118- total_dsb.txt)
119- */
170+ def samples_tsv_by_bin = MAKE_SAMPLES_TSV (samples_lines_grouped) // (bin_size, samples.tsv)
120171
121- // IGV sessions (raw tracks)
122- def igv_in = tracks_out. map { bin_size , sample_id , plus_bg , minus_bg , plus_bw , minus_bw , total_txt ->
123- tuple(bin_size, sample_id, plus_bw, minus_bw)
124- }
125- MAKE_IGV_SESSION (igv_in)
172+ // Build a channel of tuples: (bin_size, samples_tsv, contrast_name, contrast_spec)
173+ def contrasts = params. stats. contrasts ?: []
174+ def conditions = params. stats. conditions ?: [:]
126175
127- // Step 2: normalize bedGraphs and create normalized bigWigs
128- def norm_in = tracks_out. map { bin_size , sample_id , plus_bg , minus_bg , plus_bw , minus_bw , total_txt ->
129- tuple(bin_size, sample_id, plus_bg, minus_bg, total_txt)
176+ def resolve_group = { obj ->
177+ if (obj == null ) return []
178+ if (obj instanceof List ) return obj as List
179+ def s = obj as String
180+ if (conditions. containsKey(s)) return (conditions[s] as List )
181+ return [s]
130182 }
131- NORMALIZE_TRACKS (norm_in, file(params. genome_fasta))
132-
133- // ---------------------------------------------------------
134- // Month 3 Step 3 + Step 4 (Stats + Validation)
135- // ---------------------------------------------------------
136- if ( params. stats?. enabled ) {
137183
138- // Build samples TSV (raw bedGraphs) for this bin size
139- def samples_tsv_ch = tracks_out
140- .map { bin_size, sample_id, plus_bg, minus_bg, plus_bw, minus_bw, total_txt ->
141- tuple(sample_id, plus_bg, minus_bg)
142- }
143- .collectFile(
144- name : " samples_for_stats_bin_${ bs} .tsv" ,
145- storeDir : " ${ params.outdir} /stats/bin_${ bs} " ,
146- newLine : true
147- ) { row ->
148- " ${ row[0]} \t ${ row[1]} \t ${ row[2]} "
184+ def stats_in_ch = samples_tsv_by_bin. flatMap { bs , samples_tsv ->
185+ contrasts. collect { c ->
186+ def cname = c. name as String
187+ def case_list = []
188+ def ctrl_list = []
189+
190+ if (c. containsKey(' case_condition' ) || c. containsKey(' control_condition' )) {
191+ def cc = c. case_condition as String
192+ def ct = c. control_condition as String
193+ if (! conditions. containsKey(cc) || ! conditions. containsKey(ct)) {
194+ throw new IllegalArgumentException (
195+ " Unknown condition key in contrast '${ cname} ': case_condition='${ cc} ', control_condition='${ ct} '. Available: ${ conditions.keySet()} "
196+ )
197+ }
198+ case_list = (conditions[cc] as List )
199+ ctrl_list = (conditions[ct] as List )
200+ } else {
201+ case_list = resolve_group(c. case )
202+ ctrl_list = resolve_group(c. control)
149203 }
150204
151- // Build a channel of tuples: (bin_size, samples_tsv, contrast_name, contrast_spec)
152- def contrasts = params. stats. contrasts ?: []
153-
154- /*
155- Replicate-aware contrast specification (backwards compatible):
156- - legacy:
157- contrasts:
158- - name: "APH_vs_DMSO_rep1"
159- case: ["SAMPLE_A"]
160- control: ["SAMPLE_B"]
161- - condition-based (recommended):
162- conditions:
163- APH: ["APH_rep1", "APH_rep2"]
164- DMSO: ["DMSO_rep1", "DMSO_rep2"]
165- contrasts:
166- - name: "APH_vs_DMSO"
167- case_condition: "APH"
168- control_condition: "DMSO"
169-
170- Also supported (shorthand):
171- case: "APH" control: "DMSO" (if those keys exist in conditions)
172- */
173- def conditions = params. stats. conditions ?: [:]
174-
175- def resolve_group = { obj ->
176- if (obj == null ) {
177- return []
178- }
179- // If it's already a list of sample IDs
180- if (obj instanceof List ) {
181- return obj as List
205+ if (! case_list || ! ctrl_list) {
206+ throw new IllegalArgumentException (" Contrast '${ cname} ' must define non-empty case and control groups" )
182207 }
183- // If it's a string, treat it as a condition name if present; otherwise a single sample ID
184- def s = obj as String
185- if (conditions. containsKey(s)) {
186- return (conditions[s] as List )
187- }
188- return [s]
189- }
190208
191- def stats_in_ch = samples_tsv_ch. flatMap { samples_tsv ->
192- contrasts. collect { c ->
193- def cname = c. name as String
194- def case_list = []
195- def ctrl_list = []
196-
197- if (c. containsKey(' case_condition' ) || c. containsKey(' control_condition' )) {
198- def cc = c. case_condition as String
199- def ct = c. control_condition as String
200- if (! conditions. containsKey(cc) || ! conditions. containsKey(ct)) {
201- throw new IllegalArgumentException (" Unknown condition key in contrast '${ cname} ': case_condition='${ cc} ', control_condition='${ ct} '. Available: ${ conditions.keySet()} " )
202- }
203- case_list = (conditions[cc] as List )
204- ctrl_list = (conditions[ct] as List )
205- } else {
206- case_list = resolve_group(c. case )
207- ctrl_list = resolve_group(c. control)
208- }
209-
210- if (! case_list || ! ctrl_list) {
211- throw new IllegalArgumentException (" Contrast '${ cname} ' must define non-empty case and control groups" )
212- }
213-
214- def case_ids = (case_list as List ). join(' ,' )
215- def ctrl_ids = (ctrl_list as List ). join(' ,' )
216- def spec = " ${ cname} :${ case_ids} :${ ctrl_ids} "
217- tuple(bs as int , samples_tsv, cname, spec)
218- }
209+ def case_ids = (case_list as List ). join(' ,' )
210+ def ctrl_ids = (ctrl_list as List ). join(' ,' )
211+ def spec = " ${ cname} :${ case_ids} :${ ctrl_ids} "
212+ tuple(bs as int , samples_tsv, cname, spec)
219213 }
214+ }
220215
221- // Step 3: differential testing (runs once per contrast)
222- def diff_out = DIFF_BREAKS (stats_in_ch)
223- all_diff_out = all_diff_out. mix(diff_out)
216+ // Step 3: differential testing (once; runs per (bin_size, contrast) tuple)
217+ def diff_out = DIFF_BREAKS (stats_in_ch)
224218
225- // Step 4: validation (optional; runs once per contrast)
226- if ( params. validation?. enabled ) {
227- VALIDATE_DIFF (stats_in_ch)
228- }
219+ // Step 4: validation (optional; runs per (bin_size, contrast) tuple)
220+ if ( params. validation?. enabled ) {
221+ VALIDATE_DIFF (stats_in_ch)
229222 }
230- }
231223
232- // After the sweep finishes, write a single summary table across all bin sizes/contrasts
233- if ( params. stats?. enabled ) {
234- def summary_in = all_diff_out. map { bin_size , contrast_name , tsv , sig , sig_up , sig_down , volcano_png , volcano_pdf , ma_png , ma_pdf , summary_txt ->
224+ // After stats finish, write a single summary table across all bin sizes/contrasts
225+ def summary_in = diff_out. map { bin_size , contrast_name , tsv , sig , sig_up , sig_down , volcano_png , volcano_pdf , ma_png , ma_pdf , summary_txt ->
235226 tuple(bin_size, contrast_name, summary_txt)
236227 }
237228
238- // Build a single manifest TSV to aggregate in one process
239229 def manifest_ch = summary_in. collectFile(
240230 name : " bin_sweep_inputs.tsv" ,
241231 storeDir : " ${ params.outdir} /stats" ,
@@ -247,4 +237,4 @@ workflow {
247237 BIN_SWEEP_SUMMARY (manifest_ch)
248238 }
249239 }
250- }
240+ }
0 commit comments