diff --git a/modules/nextflow/src/main/groovy/nextflow/Session.groovy b/modules/nextflow/src/main/groovy/nextflow/Session.groovy
index 9109d503ea..f844b87349 100644
--- a/modules/nextflow/src/main/groovy/nextflow/Session.groovy
+++ b/modules/nextflow/src/main/groovy/nextflow/Session.groovy
@@ -39,6 +39,7 @@ import nextflow.conda.CondaConfig
 import nextflow.config.Manifest
 import nextflow.container.ContainerConfig
 import nextflow.dag.DAG
+import nextflow.dag.TaskDAG
 import nextflow.exception.AbortOperationException
 import nextflow.exception.AbortSignalException
 import nextflow.exception.IllegalConfigException
@@ -204,6 +205,8 @@ class Session implements ISession {
 
     private DAG dag
 
+    private TaskDAG taskDag
+
     private CacheDB cache
 
     private Barrier processesBarrier = new Barrier()
@@ -356,6 +359,7 @@ class Session implements ISession {
 
         // -- DAG object
         this.dag = new DAG()
+        this.taskDag = new TaskDAG()
 
         // -- init work dir
         this.workDir = ((config.workDir ?: 'work') as Path).complete()
@@ -812,6 +816,8 @@ class Session implements ISession {
 
     DAG getDag() { this.dag }
 
+    TaskDAG getTaskDag() { this.taskDag }
+
     ExecutorService getExecService() { execService }
 
     /**
@@ -980,6 +986,8 @@ class Session implements ISession {
     void notifyTaskSubmit( TaskHandler handler ) {
         final task = handler.task
         log.info "[${task.hashLog}] ${task.runType.message} > ${task.name}"
+        // -- update task graph
+        taskDag.addTask(task)
         // -- save a record in the cache index
         cache.putIndexAsync(handler)
 
@@ -1017,8 +1025,12 @@ class Session implements ISession {
      * @param handler
      */
     void notifyTaskComplete( TaskHandler handler ) {
+        // update task graph
+        taskDag.addTaskOutputs(handler.task)
+
         // save the completed task in the cache DB
         final trace = handler.safeTraceRecord()
+        taskDag.saveToRecord(handler.task, trace)
         cache.putTaskAsync(handler, trace)
 
         // notify the event to the observers
@@ -1034,6 +1046,10 @@ class Session implements ISession {
     }
 
     void notifyTaskCached( TaskHandler handler ) {
+        // update task graph
+        taskDag.addTask(handler.task)
+        taskDag.addTaskOutputs(handler.task)
+
         final trace = handler.getTraceRecord()
         // save a record in the cache index only the when the trace record is available
         // otherwise it means that the event is trigger by a `stored dir` driven task
diff --git a/modules/nextflow/src/main/groovy/nextflow/dag/DAG.groovy b/modules/nextflow/src/main/groovy/nextflow/dag/DAG.groovy
index e61dac5985..414d664e9c 100644
--- a/modules/nextflow/src/main/groovy/nextflow/dag/DAG.groovy
+++ b/modules/nextflow/src/main/groovy/nextflow/dag/DAG.groovy
@@ -43,7 +43,7 @@ import nextflow.script.params.TupleOutParam
 import java.util.concurrent.atomic.AtomicLong
 
 /**
- * Model a direct acyclic graph of the pipeline execution.
+ * Model the directed acyclic graph of the workflow definition.
  *
  * @author Paolo Di Tommaso
  */
diff --git a/modules/nextflow/src/main/groovy/nextflow/dag/TaskDAG.groovy b/modules/nextflow/src/main/groovy/nextflow/dag/TaskDAG.groovy
new file mode 100644
index 0000000000..328670d646
--- /dev/null
+++ b/modules/nextflow/src/main/groovy/nextflow/dag/TaskDAG.groovy
@@ -0,0 +1,181 @@
+/*
+ * Copyright 2013-2023, Seqera Labs
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package nextflow.dag
+
+import java.nio.file.Files
+import java.nio.file.Path
+import java.util.concurrent.locks.Lock
+import java.util.concurrent.locks.ReentrantLock
+
+import groovy.transform.CompileStatic
+import groovy.transform.TupleConstructor
+import groovy.util.logging.Slf4j
+import nextflow.extension.FilesEx
+import nextflow.processor.TaskRun
+import nextflow.script.params.FileOutParam
+import nextflow.trace.TraceRecord
+/**
+ * Model the directed acyclic graph of the workflow execution.
+ *
+ * Each vertex is a task with its input and output files. The producer
+ * of a file is found through a lookup from output path to task. Access
+ * to the internal maps from the methods below is guarded by one lock.
+ *
+ * @author Ben Sherman
+ */
+@Slf4j
+@CompileStatic
+class TaskDAG {
+
+    private Map<TaskRun,Vertex> vertices = new HashMap<>()
+
+    private Map<Path,TaskRun> taskLookup = new HashMap<>()
+
+    private Lock sync = new ReentrantLock()
+
+    /**
+     * Get the live map of tasks to vertices. The returned map is
+     * not a copy and is not guarded by the lock.
+     */
+    Map<TaskRun,Vertex> getVertices() { vertices }
+
+    /**
+     * Add a task to the graph, replacing any vertex already
+     * registered for it.
+     *
+     * @param task the task whose input files map becomes the vertex inputs
+     */
+    void addTask(TaskRun task) {
+        // the inputs are read before the lock is taken
+        final inputs = task.getInputFilesMap()
+
+        sync.lock()
+        try {
+            // add new task to graph
+            vertices[task] = new Vertex(inputs)
+        }
+        finally {
+            sync.unlock()
+        }
+    }
+
+    /**
+     * Add a task's outputs to the graph and record the task as the
+     * producer of each output file.
+     *
+     * @param task a task previously registered with {@link #addTask}
+     */
+    void addTaskOutputs(TaskRun task) {
+        final outputs = task
+            .getOutputsByType(FileOutParam)
+            .values()
+            .flatten() as Set<Path>
+
+        sync.lock()
+        try {
+            // add task outputs to graph
+            vertices[task].outputs = outputs
+
+            // add new output files to task lookup
+            for( Path path : outputs )
+                taskLookup[path] = task
+        }
+        finally {
+            sync.unlock()
+        }
+    }
+
+    /**
+     * Get the task that produced the given file.
+     *
+     * @param path a file path to look up among the recorded task outputs
+     * @return the producer task, or {@code null} when no task output matches
+     */
+    TaskRun getProducerTask(Path path) {
+        sync.lock()
+        try {
+            return taskLookup[path]
+        }
+        finally {
+            sync.unlock()
+        }
+    }
+
+    /**
+     * Get the vertex for the task that produced the given file.
+     *
+     * @param path a file path to look up among the recorded task outputs
+     * @return the producer vertex, or {@code null} when no task output matches
+     */
+    Vertex getProducerVertex(Path path) {
+        sync.lock()
+        try {
+            return vertices[taskLookup[path]]
+        }
+        finally {
+            sync.unlock()
+        }
+    }
+
+    /**
+     * Save task input and output metadata to trace record.
+     *
+     * @param task a task whose inputs and outputs were already added to the graph
+     * @param record the trace record that receives the `inputs` and `outputs` fields
+     */
+    void saveToRecord(TaskRun task, TraceRecord record) {
+        final vertex = findVertex(task)
+
+        record.inputs = vertex.inputs.collect { name, path ->
+            final producer = getProducerTask(path)
+            new TraceRecord.Input(
+                name,
+                path,
+                producer ? producer.hash.toString() : null)
+        }
+
+        // file size and checksum are computed outside the lock
+        record.outputs = vertex.outputs.collect { path ->
+            new TraceRecord.Output(
+                path,
+                Files.size(path),
+                FilesEx.getChecksum(path))
+        }
+
+        log.debug "task ${task.name} ; inputs: ${record.inputs} ; outputs: ${record.outputs}"
+    }
+
+    /**
+     * Look up the vertex of a task while holding the lock.
+     */
+    private Vertex findVertex(TaskRun task) {
+        sync.lock()
+        try {
+            return vertices[task]
+        }
+        finally {
+            sync.unlock()
+        }
+    }
+
+    @TupleConstructor(excludes = 'outputs')
+    static class Vertex {
+        Map<String,Path> inputs
+        Set<Path> outputs
+    }
+
+}
diff --git a/modules/nextflow/src/main/groovy/nextflow/trace/TraceRecord.groovy b/modules/nextflow/src/main/groovy/nextflow/trace/TraceRecord.groovy
index 93b5b6856b..339edb77fe 100644
--- a/modules/nextflow/src/main/groovy/nextflow/trace/TraceRecord.groovy
+++ b/modules/nextflow/src/main/groovy/nextflow/trace/TraceRecord.groovy
@@ -23,6 +23,8 @@ import groovy.json.StringEscapeUtils
 import groovy.transform.CompileStatic
 import groovy.transform.Memoized
 import groovy.transform.PackageScope
+import groovy.transform.ToString
+import groovy.transform.TupleConstructor
 import groovy.util.logging.Slf4j
 import nextflow.cloud.types.CloudMachineInfo
 import nextflow.extension.Bolts
@@ -609,4 +611,36 @@ class TraceRecord implements Serializable {
         this.machineInfo = value
     }
 
+    List<Input> getInputs() {
+        return store.inputs as List<Input>
+    }
+
+    void setInputs(List<Input> inputs) {
+        store.inputs = inputs
+    }
+
+    List<Output> getOutputs() {
+        return store.outputs as List<Output>
+    }
+
+    void setOutputs(List<Output> outputs) {
+        store.outputs = outputs
+    }
+
+    @TupleConstructor
+    @ToString
+    static class Input {
+        String stageName
+        Path storePath
+        String source
+    }
+
+    @TupleConstructor
+    @ToString
+    static class Output {
+        Path path
+        long size
+        String checksum
+    }
+
 }
diff --git a/modules/nextflow/src/test/groovy/nextflow/dag/TaskDAGTest.groovy b/modules/nextflow/src/test/groovy/nextflow/dag/TaskDAGTest.groovy
new file mode 100644
index 0000000000..fa5ba19f24
--- /dev/null
+++ b/modules/nextflow/src/test/groovy/nextflow/dag/TaskDAGTest.groovy
@@ -0,0 +1,75 @@
+/*
+ * Copyright 2013-2023, Seqera Labs
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package nextflow.dag
+
+import java.nio.file.Paths
+
+import com.google.common.hash.HashCode
+import nextflow.processor.TaskRun
+import spock.lang.Specification
+/**
+ *
+ * @author Ben Sherman
+ */
+class TaskDAGTest extends Specification {
+
+    def 'should add task vertices and outputs' () {
+
+        given:
+        def task1 = Mock(TaskRun) {
+            getInputFilesMap() >> [
+                'data.txt': Paths.get('/inputs/data.txt')
+            ]
+            getOutputsByType(_) >> [
+                'data.foo': Paths.get('/work/00112233/data.foo')
+            ]
+        }
+        def task2 = Mock(TaskRun) {
+            getInputFilesMap() >> [
+                'data.foo': Paths.get('/work/00112233/data.foo')
+            ]
+            getOutputsByType(_) >> [
+                'data.bar': Paths.get('/work/aabbccdd/data.bar')
+            ]
+        }
+        def dag = new TaskDAG()
+
+        when:
+        dag.addTask( task1 )
+        dag.addTask( task2 )
+        def v1 = dag.vertices[task1]
+        def v2 = dag.vertices[task2]
+        then:
+        v1.inputs.size() == 1
+        v1.inputs['data.txt'] == Paths.get('/inputs/data.txt')
+        and:
+        v2.inputs.size() == 1
+        v2.inputs['data.foo'] == Paths.get('/work/00112233/data.foo')
+
+        when:
+        dag.addTaskOutputs( task1 )
+        dag.addTaskOutputs( task2 )
+        then:
+        v1.outputs == [ Paths.get('/work/00112233/data.foo') ] as Set
+        and:
+        v2.outputs == [ Paths.get('/work/aabbccdd/data.bar') ] as Set
+        and:
+        dag.getProducerVertex(v1.inputs['data.txt']) == null
+        dag.getProducerVertex(v2.inputs['data.foo']) == v1
+    }
+
+}
diff --git a/modules/nf-commons/src/main/nextflow/extension/FilesEx.groovy b/modules/nf-commons/src/main/nextflow/extension/FilesEx.groovy
index 389dab5107..86c70f596b 100644
--- a/modules/nf-commons/src/main/nextflow/extension/FilesEx.groovy
+++ b/modules/nf-commons/src/main/nextflow/extension/FilesEx.groovy
@@ -35,14 +35,17 @@ import java.nio.file.attribute.FileTime
 import java.nio.file.attribute.PosixFilePermission
 import java.nio.file.attribute.PosixFilePermissions
 
+import com.google.common.hash.Hashing
 import groovy.transform.CompileStatic
 import groovy.transform.PackageScope
 import groovy.transform.stc.ClosureParams
 import groovy.transform.stc.FromString
 import groovy.util.logging.Slf4j
+import nextflow.file.ETagAwareFile
 import nextflow.file.FileHelper
 import nextflow.file.FileSystemPathFactory
 import nextflow.io.ByteBufferBackedInputStream
+import nextflow.util.CacheHelper
 import nextflow.util.CharsetHelper
 import nextflow.util.CheckHelper
 
@@ -1599,4 +1602,23 @@ class FilesEx {
     static String getScheme(Path path) {
         path.getFileSystem().provider().getScheme()
     }
+
+    /**
+     * Get a checksum string for the given path.
+     *
+     * @param path the file to describe
+     * @return {@code null} for a directory, the ETag of an {@link ETagAwareFile},
+     *      otherwise the MD5 hash computed with {@code CacheHelper.HashMode.DEEP}
+     */
+    static String getChecksum(Path path) {
+        if( Files.isDirectory(path) )
+            return null
+
+        // use etag if available
+        if( path instanceof ETagAwareFile )
+            return path.getETag()
+
+        // otherwise compute checksum manually
+        CacheHelper.hasher(Hashing.md5(), path, CacheHelper.HashMode.DEEP).hash().toString()
+    }
 }
diff --git a/modules/nf-commons/src/main/nextflow/file/ETagAwareFile.groovy b/modules/nf-commons/src/main/nextflow/file/ETagAwareFile.groovy
new file mode 100644
index 0000000000..f1c40073b3
--- /dev/null
+++ b/modules/nf-commons/src/main/nextflow/file/ETagAwareFile.groovy
@@ -0,0 +1,29 @@
+/*
+ * Copyright 2013-2023, Seqera Labs
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package nextflow.file
+
+/**
+ * Defines the interface for files that have an ETag
+ *
+ * @author Ben Sherman
+ */
+interface ETagAwareFile {
+
+    String getETag()
+
+}
diff --git a/plugins/nf-amazon/src/main/nextflow/cloud/aws/nio/S3Path.java b/plugins/nf-amazon/src/main/nextflow/cloud/aws/nio/S3Path.java
index 2a5e193b8c..ff4cbf679d 100644
--- a/plugins/nf-amazon/src/main/nextflow/cloud/aws/nio/S3Path.java
+++ b/plugins/nf-amazon/src/main/nextflow/cloud/aws/nio/S3Path.java
@@ -42,13 +42,14 @@
 import com.google.common.base.Splitter;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.Lists;
+import nextflow.file.ETagAwareFile;
 import nextflow.file.TagAwareFile;
 import static com.google.common.collect.Iterables.concat;
 import static com.google.common.collect.Iterables.filter;
 import static com.google.common.collect.Iterables.transform;
 import static java.lang.String.format;
 
-public class S3Path implements Path, TagAwareFile {
+public class S3Path implements Path, ETagAwareFile, TagAwareFile {
 
     public static final String PATH_SEPARATOR = "/";
     /**
@@ -562,6 +563,14 @@ public String getContentType() {
         return contentType;
     }
 
+    @Override
+    public String getETag() {
+        return fileSystem
+            .getClient()
+            .getObjectMetadata(getBucket(), getKey())
+            .getETag();
+    }
+
     public String getStorageClass() {
         return storageClass;
     }
diff --git a/plugins/nf-azure/src/main/nextflow/cloud/azure/nio/AzFileAttributes.groovy b/plugins/nf-azure/src/main/nextflow/cloud/azure/nio/AzFileAttributes.groovy
index a9960aff56..047a4e0ce3 100644
--- a/plugins/nf-azure/src/main/nextflow/cloud/azure/nio/AzFileAttributes.groovy
+++ b/plugins/nf-azure/src/main/nextflow/cloud/azure/nio/AzFileAttributes.groovy
@@ -46,6 +46,8 @@ class AzFileAttributes implements BasicFileAttributes {
 
     private String objectId
 
+    private String etag
+
     static AzFileAttributes root() {
         new AzFileAttributes(size: 0, objectId: '/', directory: true)
     }
@@ -60,6 +62,7 @@ class AzFileAttributes implements BasicFileAttributes {
         updateTime = time(props.getLastModified())
         directory = client.blobName.endsWith('/')
         size = props.getBlobSize()
+        etag = props.getETag()
 
         // Support for Azure Data Lake Storage Gen2 with hierarchical namespace enabled
         final meta = props.getMetadata()
@@ -75,6 +78,7 @@ class AzFileAttributes implements BasicFileAttributes {
             creationTime = time(item.properties.getCreationTime())
             updateTime = time(item.properties.getLastModified())
             size = item.properties.getContentLength()
+            etag = item.properties.getETag()
         }
     }
 
@@ -150,6 +154,10 @@ class AzFileAttributes implements BasicFileAttributes {
         return objectId
     }
 
+    String getETag() {
+        return etag
+    }
+
     @Override
     boolean equals( Object obj ) {
         if( this.class != obj?.class ) return false
diff --git a/plugins/nf-azure/src/main/nextflow/cloud/azure/nio/AzPath.groovy b/plugins/nf-azure/src/main/nextflow/cloud/azure/nio/AzPath.groovy
index 2f654b4ad8..3bdd222f6b 100644
--- a/plugins/nf-azure/src/main/nextflow/cloud/azure/nio/AzPath.groovy
+++ b/plugins/nf-azure/src/main/nextflow/cloud/azure/nio/AzPath.groovy
@@ -29,6 +29,7 @@ import com.azure.storage.blob.models.BlobItem
 import groovy.transform.CompileStatic
 import groovy.transform.EqualsAndHashCode
 import groovy.transform.PackageScope
+import nextflow.file.ETagAwareFile
 
 /**
  * Implements Azure path object
@@ -37,7 +38,7 @@ import groovy.transform.PackageScope
  */
 @CompileStatic
 @EqualsAndHashCode(includes = 'fs,path,directory', includeFields = true)
-class AzPath implements Path {
+class AzPath implements Path, ETagAwareFile {
 
     private AzFileSystem fs
 
@@ -305,6 +306,11 @@ class AzPath implements Path {
         return this.toString() <=> other.toString()
     }
 
+    @Override
+    String getETag() {
+        return attributes.getETag()
+    }
+
     String getContainerName() {
         if( path.isAbsolute() ) {
             path.nameCount==0 ? '/' : path.getName(0)