Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
28 commits
Select commit Hold shift + click to select a range
47d0168
Add initial task graph and metadata json file
bentsherman Mar 27, 2023
ae67027
Add task inputs and outputs to conrete DAG
bentsherman Mar 28, 2023
8f95cd6
Fix failing tests
bentsherman Mar 28, 2023
9f11e4b
Use path-based APIs to get file metadata
bentsherman Mar 29, 2023
db6aed1
Merge branch 'master' into ben-task-graph
bentsherman Mar 29, 2023
8456892
Use buffer to compute checksum
bentsherman Mar 30, 2023
0dd98d6
Merge branch 'master' into ben-task-graph-pull
bentsherman Apr 21, 2023
0f505d3
Merge branch 'master' into ben-task-graph-pull
bentsherman Apr 26, 2023
e81e584
Replace synchronized with lock
bentsherman Apr 26, 2023
35bac94
Refactor task graph to not depend on task directory naming
bentsherman Apr 29, 2023
8bbe3d7
Replace abstract/concrete with process/task
bentsherman Apr 29, 2023
7ec397f
Add support for AWS SSE env variables
pditommaso May 24, 2023
c0c7492
Merge branch 'master' into ben-task-graph-pull
bentsherman May 26, 2023
49396cf
Fix failing tests
bentsherman May 26, 2023
910a2f9
Merge branch 'master' into ben-task-graph-pull
bentsherman Jul 7, 2023
0c63254
Rename 'process' option to 'workflow'
bentsherman Jul 7, 2023
9b934e0
Save task inputs and outputs to cache db instead of json file
bentsherman Jul 13, 2023
08fda25
Decouple task graph from DAG renderer
bentsherman Jul 24, 2023
40a05e4
Merge branch 'master' into ben-task-graph
bentsherman Jul 24, 2023
5eb9de1
Improve DAG rendering
bentsherman Jul 25, 2023
47e595e
Merge branch 'master' into ben-task-graph
bentsherman Sep 23, 2023
c7422df
Add subworkflows to task graph
bentsherman Sep 23, 2023
cc8b6dc
Remove task DAG rendering (in favor of nf-prov)
bentsherman Sep 28, 2023
8ed4c9f
Merge branch 'master' into ben-task-graph
bentsherman Sep 28, 2023
73459b7
Revert unrelated changes
bentsherman Sep 28, 2023
d46b346
Remove unused code
bentsherman Sep 29, 2023
d851c2d
Use CacheHelper to compute MD5 checksum
bentsherman Oct 3, 2023
501bce7
Add temporary debug logging
bentsherman Oct 3, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 16 additions & 0 deletions modules/nextflow/src/main/groovy/nextflow/Session.groovy
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ import nextflow.conda.CondaConfig
import nextflow.config.Manifest
import nextflow.container.ContainerConfig
import nextflow.dag.DAG
import nextflow.dag.TaskDAG
import nextflow.exception.AbortOperationException
import nextflow.exception.AbortSignalException
import nextflow.exception.IllegalConfigException
Expand Down Expand Up @@ -204,6 +205,8 @@ class Session implements ISession {

private DAG dag

private TaskDAG taskDag

private CacheDB cache

private Barrier processesBarrier = new Barrier()
Expand Down Expand Up @@ -356,6 +359,7 @@ class Session implements ISession {

// -- DAG object
this.dag = new DAG()
this.taskDag = new TaskDAG()

// -- init work dir
this.workDir = ((config.workDir ?: 'work') as Path).complete()
Expand Down Expand Up @@ -812,6 +816,8 @@ class Session implements ISession {

DAG getDag() { this.dag }

TaskDAG getTaskDag() { this.taskDag }

ExecutorService getExecService() { execService }

/**
Expand Down Expand Up @@ -980,6 +986,8 @@ class Session implements ISession {
void notifyTaskSubmit( TaskHandler handler ) {
final task = handler.task
log.info "[${task.hashLog}] ${task.runType.message} > ${task.name}"
// -- update task graph
taskDag.addTask(task)
// -- save a record in the cache index
cache.putIndexAsync(handler)

Expand Down Expand Up @@ -1017,8 +1025,12 @@ class Session implements ISession {
* @param handler
*/
void notifyTaskComplete( TaskHandler handler ) {
// update task graph
taskDag.addTaskOutputs(handler.task)

// save the completed task in the cache DB
final trace = handler.safeTraceRecord()
taskDag.saveToRecord(handler.task, trace)
cache.putTaskAsync(handler, trace)

// notify the event to the observers
Expand All @@ -1034,6 +1046,10 @@ class Session implements ISession {
}

void notifyTaskCached( TaskHandler handler ) {
// update task graph
taskDag.addTask(handler.task)
taskDag.addTaskOutputs(handler.task)

final trace = handler.getTraceRecord()
// save a record in the cache index only the when the trace record is available
// otherwise it means that the event is trigger by a `stored dir` driven task
Expand Down
2 changes: 1 addition & 1 deletion modules/nextflow/src/main/groovy/nextflow/dag/DAG.groovy
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ import nextflow.script.params.TupleOutParam
import java.util.concurrent.atomic.AtomicLong

/**
* Model a direct acyclic graph of the pipeline execution.
* Model the directed acyclic graph of the workflow definition.
*
* @author Paolo Di Tommaso <[email protected]>
*/
Expand Down
142 changes: 142 additions & 0 deletions modules/nextflow/src/main/groovy/nextflow/dag/TaskDAG.groovy
Original file line number Diff line number Diff line change
@@ -0,0 +1,142 @@
/*
* Copyright 2013-2023, Seqera Labs
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package nextflow.dag

import java.nio.file.Files
import java.nio.file.Path
import java.util.concurrent.locks.Lock
import java.util.concurrent.locks.ReentrantLock

import groovy.transform.CompileStatic
import groovy.transform.TupleConstructor
import groovy.util.logging.Slf4j
import nextflow.extension.FilesEx
import nextflow.processor.TaskRun
import nextflow.script.params.FileOutParam
import nextflow.trace.TraceRecord
/**
* Model the directed acyclic graph of the workflow execution.
*
* @author Ben Sherman <[email protected]>
*/
@Slf4j
@CompileStatic
class TaskDAG {

private Map<TaskRun,Vertex> vertices = new HashMap<>()

private Map<Path,TaskRun> taskLookup = new HashMap<>()

private Lock sync = new ReentrantLock()

Map<TaskRun,Vertex> getVertices() { vertices }

/**
* Add a task to the graph.
*
* @param task
*/
void addTask(TaskRun task) {
final inputs = task.getInputFilesMap()

sync.lock()
try {
// add new task to graph
vertices[task] = new Vertex(inputs)
}
finally {
sync.unlock()
}
}

/**
* Add a task's outputs to the graph.
*
* @param task
*/
void addTaskOutputs(TaskRun task) {
final outputs = task
.getOutputsByType(FileOutParam)
.values()
.flatten() as Set<Path>

sync.lock()
try {
// add task outputs to graph
vertices[task].outputs = outputs

// add new output files to task lookup
for( Path path : outputs )
taskLookup[path] = task
}
finally {
sync.unlock()
}
}

/**
* Get the task that produced the given file.
*
* @param path
*/
TaskRun getProducerTask(Path path) {
taskLookup[path]
}

/**
* Get the vertex for the task that produced the given file.
*
* @param path
*/
Vertex getProducerVertex(Path path) {
vertices[taskLookup[path]]
}

/**
* Save task input and output metadata to trace record.
*
* @param task
* @param record
*/
void saveToRecord(TaskRun task, TraceRecord record) {
final vertex = vertices[task]

record.inputs = vertex.inputs.collect { name, path ->
final producer = getProducerTask(path)
new TraceRecord.Input(
name,
path,
producer ? producer.hash.toString() : null)
}

record.outputs = vertex.outputs.collect { path ->
new TraceRecord.Output(
path,
Files.size(path),
FilesEx.getChecksum(path))
}

log.info "task ${task.name} ; inputs: ${record.inputs} ; outputs: ${record.outputs}"
}

@TupleConstructor(excludes = 'outputs')
static class Vertex {
Map<String,Path> inputs
Set<Path> outputs
}

}
34 changes: 34 additions & 0 deletions modules/nextflow/src/main/groovy/nextflow/trace/TraceRecord.groovy
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@ import groovy.json.StringEscapeUtils
import groovy.transform.CompileStatic
import groovy.transform.Memoized
import groovy.transform.PackageScope
import groovy.transform.ToString
import groovy.transform.TupleConstructor
import groovy.util.logging.Slf4j
import nextflow.cloud.types.CloudMachineInfo
import nextflow.extension.Bolts
Expand Down Expand Up @@ -609,4 +611,36 @@ class TraceRecord implements Serializable {
this.machineInfo = value
}

List<Input> getInputs() {
return store.inputs as List<Input>
}

void setInputs(List<Input> inputs) {
store.inputs = inputs
}

List<Output> getOutputs() {
return store.outputs as List<Output>
}

void setOutputs(List<Output> outputs) {
store.outputs = outputs
}

@TupleConstructor
@ToString
static class Input {
String stageName
Path storePath
String source
}

@TupleConstructor
@ToString
static class Output {
Path path
long size
String checksum
}

}
75 changes: 75 additions & 0 deletions modules/nextflow/src/test/groovy/nextflow/dag/TaskDAGTest.groovy
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
/*
* Copyright 2013-2023, Seqera Labs
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package nextflow.dag

import java.nio.file.Paths

import com.google.common.hash.HashCode
import nextflow.processor.TaskRun
import spock.lang.Specification
/**
*
* @author Ben Sherman <[email protected]>
*/
class TaskDAGTest extends Specification {

def 'should add task vertices and outputs' () {

given:
def task1 = Mock(TaskRun) {
getInputFilesMap() >> [
'data.txt': Paths.get('/inputs/data.txt')
]
getOutputsByType(_) >> [
'data.foo': Paths.get('/work/00112233/data.foo')
]
}
def task2 = Mock(TaskRun) {
getInputFilesMap() >> [
'data.foo': Paths.get('/work/00112233/data.foo')
]
getOutputsByType(_) >> [
'data.bar': Paths.get('/work/aabbccdd/data.bar')
]
}
def dag = new TaskDAG()

when:
dag.addTask( task1 )
dag.addTask( task2 )
def v1 = dag.vertices[task1]
def v2 = dag.vertices[task2]
then:
v1.inputs.size() == 1
v1.inputs['data.txt'] == Paths.get('/inputs/data.txt')
and:
v2.inputs.size() == 1
v2.inputs['data.foo'] == Paths.get('/work/00112233/data.foo')

when:
dag.addTaskOutputs( task1 )
dag.addTaskOutputs( task2 )
then:
v1.outputs == [ Paths.get('/work/00112233/data.foo') ] as Set
and:
v2.outputs == [ Paths.get('/work/aabbccdd/data.bar') ] as Set
and:
dag.getProducerVertex(v1.inputs['data.txt']) == null
dag.getProducerVertex(v2.inputs['data.foo']) == v1
}

}
15 changes: 15 additions & 0 deletions modules/nf-commons/src/main/nextflow/extension/FilesEx.groovy
Original file line number Diff line number Diff line change
Expand Up @@ -35,14 +35,17 @@ import java.nio.file.attribute.FileTime
import java.nio.file.attribute.PosixFilePermission
import java.nio.file.attribute.PosixFilePermissions

import com.google.common.hash.Hashing
import groovy.transform.CompileStatic
import groovy.transform.PackageScope
import groovy.transform.stc.ClosureParams
import groovy.transform.stc.FromString
import groovy.util.logging.Slf4j
import nextflow.file.ETagAwareFile
import nextflow.file.FileHelper
import nextflow.file.FileSystemPathFactory
import nextflow.io.ByteBufferBackedInputStream
import nextflow.util.CacheHelper
import nextflow.util.CharsetHelper
import nextflow.util.CheckHelper

Expand Down Expand Up @@ -1599,4 +1602,16 @@ class FilesEx {
static String getScheme(Path path) {
path.getFileSystem().provider().getScheme()
}

static String getChecksum(Path path) {
if( Files.isDirectory(path) )
return null

// use etag if available
if( path instanceof ETagAwareFile )
return path.getETag()

// otherwise compute checksum manually
CacheHelper.hasher(Hashing.md5(), path, CacheHelper.HashMode.DEEP).hash().toString()
}
}
Loading