Commits (28)
47d0168  Add initial task graph and metadata json file  (bentsherman, Mar 27, 2023)
ae67027  Add task inputs and outputs to concrete DAG  (bentsherman, Mar 28, 2023)
8f95cd6  Fix failing tests  (bentsherman, Mar 28, 2023)
9f11e4b  Use path-based APIs to get file metadata  (bentsherman, Mar 29, 2023)
db6aed1  Merge branch 'master' into ben-task-graph  (bentsherman, Mar 29, 2023)
8456892  Use buffer to compute checksum  (bentsherman, Mar 30, 2023)
0dd98d6  Merge branch 'master' into ben-task-graph-pull  (bentsherman, Apr 21, 2023)
0f505d3  Merge branch 'master' into ben-task-graph-pull  (bentsherman, Apr 26, 2023)
e81e584  Replace synchronized with lock  (bentsherman, Apr 26, 2023)
35bac94  Refactor task graph to not depend on task directory naming  (bentsherman, Apr 29, 2023)
8bbe3d7  Replace abstract/concrete with process/task  (bentsherman, Apr 29, 2023)
7ec397f  Add support for AWS SSE env variables  (pditommaso, May 24, 2023)
c0c7492  Merge branch 'master' into ben-task-graph-pull  (bentsherman, May 26, 2023)
49396cf  Fix failing tests  (bentsherman, May 26, 2023)
910a2f9  Merge branch 'master' into ben-task-graph-pull  (bentsherman, Jul 7, 2023)
0c63254  Rename 'process' option to 'workflow'  (bentsherman, Jul 7, 2023)
9b934e0  Save task inputs and outputs to cache db instead of json file  (bentsherman, Jul 13, 2023)
08fda25  Decouple task graph from DAG renderer  (bentsherman, Jul 24, 2023)
40a05e4  Merge branch 'master' into ben-task-graph  (bentsherman, Jul 24, 2023)
5eb9de1  Improve DAG rendering  (bentsherman, Jul 25, 2023)
47e595e  Merge branch 'master' into ben-task-graph  (bentsherman, Sep 23, 2023)
c7422df  Add subworkflows to task graph  (bentsherman, Sep 23, 2023)
cc8b6dc  Remove task DAG rendering (in favor of nf-prov)  (bentsherman, Sep 28, 2023)
8ed4c9f  Merge branch 'master' into ben-task-graph  (bentsherman, Sep 28, 2023)
73459b7  Revert unrelated changes  (bentsherman, Sep 28, 2023)
d46b346  Remove unused code  (bentsherman, Sep 29, 2023)
d851c2d  Use CacheHelper to compute MD5 checksum  (bentsherman, Oct 3, 2023)
501bce7  Add temporary debug logging  (bentsherman, Oct 3, 2023)
1 change: 1 addition & 0 deletions docs/config.rst
@@ -280,6 +280,7 @@ Name Description
enabled When ``true`` turns on the generation of the DAG file (default: ``false``).
file Graph file name (default: ``dag-<timestamp>.dot``).
overwrite When ``true`` overwrites any existing DAG file with the same name.
type Can be ``abstract`` to render the abstract DAG or ``concrete`` to render the concrete (task) DAG (default: ``abstract``).
================== ================

The above options can be used by prefixing them with the ``dag`` scope or surrounding them by curly
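As documented in the table above, the new option would be set through the ``dag`` config scope. A minimal sketch of a matching nextflow.config is shown below; the file name and the ``overwrite`` flag are only illustrative, and only ``type`` is new in this change:

dag {
    enabled   = true
    type      = 'concrete'   // 'abstract' (the default) renders the process DAG instead
    file      = 'dag.mmd'    // illustrative name; a Mermaid file suits the renderer updated below
    overwrite = true
}
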
9 changes: 9 additions & 0 deletions modules/nextflow/src/main/groovy/nextflow/Session.groovy
@@ -39,6 +39,7 @@ import nextflow.conda.CondaConfig
import nextflow.config.Manifest
import nextflow.container.ContainerConfig
import nextflow.dag.DAG
import nextflow.dag.ConcreteDAG
import nextflow.exception.AbortOperationException
import nextflow.exception.AbortSignalException
import nextflow.exception.IllegalConfigException
@@ -193,6 +194,8 @@ class Session implements ISession {

private DAG dag

private ConcreteDAG concreteDag

private CacheDB cache

private Barrier processesBarrier = new Barrier()
@@ -345,6 +348,7 @@ class Session implements ISession {

// -- DAG object
this.dag = new DAG()
this.concreteDag = new ConcreteDAG()

// -- init work dir
this.workDir = ((config.workDir ?: 'work') as Path).complete()
@@ -799,6 +803,8 @@ class Session implements ISession {

DAG getDag() { this.dag }

ConcreteDAG getConcreteDAG() { this.concreteDag }

ExecutorService getExecService() { execService }

/**
@@ -1007,6 +1013,9 @@
final trace = handler.safeTraceRecord()
cache.putTaskAsync(handler, trace)

// save the task meta file to the task directory
handler.writeMetaFile()

// notify the event to the observers
for( int i=0; i<observers.size(); i++ ) {
final observer = observers.get(i)
105 changes: 105 additions & 0 deletions modules/nextflow/src/main/groovy/nextflow/dag/ConcreteDAG.groovy
@@ -0,0 +1,105 @@
/*
* Copyright 2013-2023, Seqera Labs
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package nextflow.dag

import java.nio.file.Path
import java.util.regex.Pattern

import groovy.transform.MapConstructor
import groovy.transform.ToString
import groovy.util.logging.Slf4j
import nextflow.processor.TaskRun
import nextflow.script.params.FileOutParam
/**
 * Model the concrete (task) graph of a pipeline execution.
*
* @author Ben Sherman <bentshermann@gmail.com>
*/
@Slf4j
class ConcreteDAG {

Map<String,Task> nodes = new HashMap<>(100)

/**
* Add a task to the graph
*
* @param task
*/
synchronized void addTask( TaskRun task ) {
final hash = task.hash.toString()
final label = "[${hash.substring(0,2)}/${hash.substring(2,8)}] ${task.name}"
final inputs = task.getInputFilesMap()
.collect { name, path ->
new Input(name: name, path: path, predecessor: getPredecessorHash(path))
}

nodes[hash] = new Task(
index: nodes.size(),
label: label,
inputs: inputs
)
}

static public String getPredecessorHash(Path path) {
final pattern = Pattern.compile('.*/([0-9a-f]{2}/[0-9a-f]{30})')
final matcher = pattern.matcher(path.toString())

matcher.find() ? matcher.group(1).replace('/', '') : null
}

/**
* Add a task's outputs to the graph
*
* @param task
*/
synchronized void addTaskOutputs( TaskRun task ) {
final hash = task.hash.toString()
final outputs = task.getOutputsByType(FileOutParam)
.values()
.flatten()
.collect { path ->
new Output(name: path.name, path: path)
}

nodes[hash].outputs = outputs
}

@MapConstructor
@ToString(includeNames = true, includes = 'label', includePackage=false)
static protected class Task {
int index
String label
List<Input> inputs
List<Output> outputs

String getSlug() { "t${index}" }
}

@MapConstructor
static protected class Input {
String name
Path path
String predecessor
}

@MapConstructor
static protected class Output {
String name
Path path
}

}
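For context, getPredecessorHash only recognises paths that sit inside a task work directory (two hex characters, a slash, then thirty more); anything else yields null. A small sketch with made-up paths, assuming ConcreteDAG is on the classpath:

import java.nio.file.Paths
import nextflow.dag.ConcreteDAG

// made-up staged input produced by an upstream task
def staged = Paths.get('/data/work/ab/cdef0123456789abcdef0123456789/sample.bam')

// the '<2 hex>/<30 hex>' segment is extracted and the slash removed
assert ConcreteDAG.getPredecessorHash(staged) == 'abcdef0123456789abcdef0123456789'

// a file outside any work directory has no predecessor
assert ConcreteDAG.getPredecessorHash(Paths.get('/data/reads.fq')) == null
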
@@ -28,7 +28,7 @@ import java.nio.file.Path
class CytoscapeHtmlRenderer implements DagRenderer {

@Override
void renderDocument(DAG dag, Path file) {
void renderAbstractGraph(DAG dag, Path file) {
String tmplPage = readTemplate()
String network = CytoscapeJsRenderer.renderNetwork(dag)
file.text = tmplPage.replaceAll(~/\/\* REPLACE_WITH_NETWORK_DATA \*\//, network)
@@ -28,7 +28,7 @@ import java.nio.file.Path
class CytoscapeJsRenderer implements DagRenderer {

@Override
void renderDocument(DAG dag, Path file) {
void renderAbstractGraph(DAG dag, Path file) {
file.text = renderNetwork(dag)
}

2 changes: 1 addition & 1 deletion modules/nextflow/src/main/groovy/nextflow/dag/DAG.groovy
@@ -43,7 +43,7 @@ import nextflow.script.params.TupleOutParam
import java.util.concurrent.atomic.AtomicLong

/**
* Model a direct acyclic graph of the pipeline execution.
* Model the abstract graph of a pipeline execution.
*
* @author Paolo Di Tommaso <paolo.ditommaso@gmail.com>
*/
15 changes: 12 additions & 3 deletions modules/nextflow/src/main/groovy/nextflow/dag/DagRenderer.groovy
@@ -23,10 +23,19 @@ import java.nio.file.Path
* @author Paolo Di Tommaso <paolo.ditommaso@gmail.com>
* @author Mike Smoot <mes@aescon.com>
*/
interface DagRenderer {
trait DagRenderer {

/**
* Render the dag to the specified file.
* Render an abstract (process/operator) DAG.
*/
void renderDocument(DAG dag, Path file);
void renderAbstractGraph(DAG dag, Path file) {
throw new UnsupportedOperationException("Abstract graph rendering is not supported for this file format")
}

/**
* Render a concrete (task) DAG.
*/
void renderConcreteGraph(ConcreteDAG dag, Path file) {
throw new UnsupportedOperationException("Concrete graph rendering is not supported for this file format")
}
}
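Because DagRenderer becomes a trait with throwing defaults, a renderer only has to override the graph flavour it supports. A hypothetical renderer that handles just the concrete graph (the class name and output format are invented for illustration) could look like:

package nextflow.dag

import java.nio.file.Path

/**
 * Hypothetical renderer: writes one line per task and leaves
 * renderAbstractGraph() to the trait's default implementation,
 * which throws UnsupportedOperationException.
 */
class TaskListRenderer implements DagRenderer {

    @Override
    void renderConcreteGraph(ConcreteDAG dag, Path file) {
        // one label per task, e.g. "[ab/cdef01] FOO"
        file.text = dag.nodes.values()*.label.join('\n')
    }
}
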
@@ -45,7 +45,7 @@ class DotRenderer implements DagRenderer {
static String normalise(String str) { str.replaceAll(/[^0-9_A-Za-z]/,'') }

@Override
void renderDocument(DAG dag, Path file) {
void renderAbstractGraph(DAG dag, Path file) {
file.text = renderNetwork(dag)
}

@@ -41,7 +41,7 @@ class GexfRenderer implements DagRenderer {
}

@Override
void renderDocument(DAG dag, Path file) {
void renderAbstractGraph(DAG dag, Path file) {
final Charset charset = Charset.defaultCharset()
Writer bw = Files.newBufferedWriter(file, charset)
final XMLOutputFactory xof = XMLOutputFactory.newFactory()
@@ -41,7 +41,7 @@ class GraphvizRenderer implements DagRenderer {
* See http://www.graphviz.org for more info.
*/
@Override
void renderDocument(DAG dag, Path target) {
void renderAbstractGraph(DAG dag, Path target) {
def result = Files.createTempFile('nxf-',".$format")
def temp = Files.createTempFile('nxf-','.dot')
// save the DAG as `dot` to a temp file
@@ -26,11 +26,7 @@ import java.nio.file.Path
class MermaidRenderer implements DagRenderer {

@Override
void renderDocument(DAG dag, Path file) {
file.text = renderNetwork(dag)
}

String renderNetwork(DAG dag) {
void renderAbstractGraph(DAG dag, Path file) {
def lines = []
lines << "flowchart TD"

@@ -44,7 +40,7 @@ class MermaidRenderer implements DagRenderer {

lines << ""

return lines.join('\n')
file.text = lines.join('\n')
}

private String renderVertex(DAG.Vertex vertex) {
@@ -75,4 +71,49 @@

return "${edge.from.name} -->${label} ${edge.to.name}"
}

@Override
void renderConcreteGraph(ConcreteDAG graph, Path file) {
def renderedOutputs = [] as Set<Path>
def numInputs = 0
def numOutputs = 0

def lines = []
lines << "flowchart TD"

// render tasks and task inputs
graph.nodes.values().each { task ->
// render task node
lines << " ${task.getSlug()}[\"${task.label}\"]"

task.inputs.each { input ->
// render task input from predecessor
if( input.predecessor != null ) {
final pred = graph.nodes[input.predecessor]
lines << " ${pred.getSlug()} -->|${input.name}| ${task.getSlug()}"
renderedOutputs << input.path
}

// render task input from source node
else {
numInputs += 1
lines << " i${numInputs}(( )) -->|${input.name}| ${task.getSlug()}"
}
}
}

// render task outputs with sink nodes
graph.nodes.values().each { task ->
task.outputs.each { output ->
if( output.path !in renderedOutputs ) {
numOutputs += 1
lines << " ${task.getSlug()} -->|${output.name}| o${numOutputs}(( ))"
}
}
}

lines << ""

file.text = lines.join('\n')
}
}
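For orientation, a two-task run (hashes, process names, and file names invented here) would render to something roughly like the following Mermaid snippet, with the i*/o* bubbles standing in for workflow-level inputs and outputs:

flowchart TD
    t0["[aa/bbccdd] FOO"]
    i1(( )) -->|reads.fq| t0
    t1["[ee/ff0011] BAR"]
    t0 -->|sample.bam| t1
    t1 -->|report.txt| o1(( ))
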
@@ -40,7 +40,7 @@ class NodeMarker {
static private Session getSession() { Global.session as Session }

/**
* Creates a new vertex in the DAG representing a computing `process`
* Creates a vertex in the abstract DAG representing a computing `process`
*
* @param label The label associated to the process
* @param inputs The list of inputs entering in the process
@@ -52,7 +52,7 @@ class NodeMarker {
}

/**
* Creates a new DAG vertex representing a dataflow operator
* Creates a vertex in the abstract DAG representing a dataflow operator
*
* @param label The operator label
* @param inputs The operator input(s). It can be either a single channel or a list of channels.
@@ -66,7 +66,7 @@ class NodeMarker {
}

/**
* Creates a vertex in the DAG representing a dataflow channel source.
* Creates a vertex in the abstract DAG representing a dataflow channel source.
*
* @param label The node description
* @param source Either a dataflow channel or a list of channel.
@@ -18,9 +18,14 @@ package nextflow.processor

import static nextflow.processor.TaskStatus.*

import java.nio.file.Files
import java.nio.file.NoSuchFileException

import groovy.json.JsonBuilder
import groovy.util.logging.Slf4j
import nextflow.dag.ConcreteDAG
import nextflow.extension.FilesEx
import nextflow.script.params.FileOutParam
import nextflow.trace.TraceRecord
/**
* Actions to handle the underlying job running the user task.
@@ -213,6 +218,29 @@ abstract class TaskHandler {
return record
}

void writeMetaFile() {
final record = [
hash: task.hash.toString(),
inputs: task.getInputFilesMap().collect { name, path ->
[
name: name,
path: path.toString(),
predecessor: ConcreteDAG.getPredecessorHash(path)
]
},
outputs: task.getOutputsByType(FileOutParam).values().flatten().collect { path ->
[
name: path.name,
path: path.toString(),
size: Files.size(path),
checksum: FilesEx.getChecksum(path)
]
}
]

task.workDir.resolve(TaskRun.CMD_META).text = new JsonBuilder(record).toString()
}

/**
* Determine if a process can be forked i.e. can launch
* a parallel task execution. This is only enforced when
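The .command.meta.json written by writeMetaFile above is emitted by JsonBuilder as a single line; pretty-printed, and with invented hashes, paths, sizes, and checksums, it would look roughly like:

{
    "hash": "abcdef0123456789abcdef0123456789",
    "inputs": [
        {
            "name": "reads.fq",
            "path": "/data/work/11/223344556677889900aabbccddeeff/reads.fq",
            "predecessor": "11223344556677889900aabbccddeeff"
        }
    ],
    "outputs": [
        {
            "name": "sample.bam",
            "path": "/data/work/ab/cdef0123456789abcdef0123456789/sample.bam",
            "size": 10485760,
            "checksum": "5d41402abc4b2a76b9719d911017c592"
        }
    ]
}
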
@@ -542,7 +542,7 @@ class TaskProcessor {
def invoke = new InvokeTaskAdapter(this, opInputs.size())
session.allOperators << (operator = new DataflowOperator(group, params, invoke))

// notify the creation of a new vertex the execution DAG
// notify the creation of a new process in the abstract DAG
NodeMarker.addProcessNode(this, config.getInputs(), config.getOutputs())

// fix issue #41
@@ -544,6 +544,7 @@ class TaskRun implements Cloneable {
static final public String CMD_RUN = '.command.run'
static final public String CMD_TRACE = '.command.trace'
static final public String CMD_ENV = '.command.env'
static final public String CMD_META = '.command.meta.json'


String toString( ) {
Expand Down