Skip to content
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions modules/nextflow/src/main/groovy/nextflow/Channel.groovy
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,11 @@ class Channel {
return CH.queue()
}

static DataflowWriteChannel topic(String name) {
if( !NF.topicChannelEnabled ) throw new MissingMethodException('topic', Channel.class, InvokerHelper.EMPTY_ARGS)
return CH.topic(name)
}

/**
* Create a empty channel i.e. only emits a STOP signal
*
Expand Down
4 changes: 4 additions & 0 deletions modules/nextflow/src/main/groovy/nextflow/NF.groovy
Original file line number Diff line number Diff line change
Expand Up @@ -72,4 +72,8 @@ class NF {
static boolean isRecurseEnabled() {
NextflowMeta.instance.preview.recursion
}

static boolean isTopicChannelEnabled() {
NextflowMeta.instance.preview.topic
}
}
7 changes: 7 additions & 0 deletions modules/nextflow/src/main/groovy/nextflow/NextflowMeta.groovy
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ class NextflowMeta {
volatile float dsl
boolean strict
boolean recursion
boolean topic

void setDsl( float num ) {
if( num == 1 )
Expand All @@ -59,6 +60,12 @@ class NextflowMeta {
log.warn "NEXTFLOW RECURSION IS A PREVIEW FEATURE - SYNTAX AND FUNCTIONALITY CAN CHANGE IN FUTURE RELEASES"
this.recursion = recurse
}

void setTopic(Boolean value) {
if( topic )
log.warn "TOPIC CHANNELS ARE A PREVIEW FEATURE - SYNTAX AND FUNCTIONALITY CAN CHANGE IN FUTURE RELEASES"
this.topic = value
}
}

static class Features implements Flags {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -981,15 +981,16 @@ class NextflowDSLImpl implements ASTTransformation {

/**
* Transform a map entry `emit: something` into `emit: 'something'
* and `topic: something` into `topic: 'something'
* (ie. as a constant) in a map expression passed as argument to
* a method call. This allow the syntax
*
* output:
* path 'foo', emit: bar
* path 'foo', emit: bar, topic: baz
*
* @param call
*/
protected void fixOutEmitOption(MethodCallExpression call) {
protected void fixOutEmitAndTopicOptions(MethodCallExpression call) {
List<Expression> args = isTupleX(call.arguments)?.expressions
if( !args ) return
if( args.size()<2 && (args.size()!=1 || call.methodAsString!='_out_stdout')) return
Expand All @@ -1002,6 +1003,9 @@ class NextflowDSLImpl implements ASTTransformation {
if( key?.text == 'emit' && val ) {
map.mapEntryExpressions[i] = new MapEntryExpression(key, constX(val.text))
}
else if( key?.text == 'topic' && val ) {
map.mapEntryExpressions[i] = new MapEntryExpression(key, constX(val.text))
}
}
}

Expand All @@ -1021,7 +1025,7 @@ class NextflowDSLImpl implements ASTTransformation {
// prefix the method name with the string '_out_'
methodCall.setMethod( new ConstantExpression('_out_' + methodName) )
fixMethodCall(methodCall)
fixOutEmitOption(methodCall)
fixOutEmitAndTopicOptions(methodCall)
}

else if( methodName in ['into','mode'] ) {
Expand Down
68 changes: 64 additions & 4 deletions modules/nextflow/src/main/groovy/nextflow/extension/CH.groovy
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
package nextflow.extension

import static nextflow.Channel.*

import groovy.transform.CompileStatic
import groovy.transform.PackageScope
import groovy.util.logging.Slf4j
Expand All @@ -15,8 +17,6 @@ import nextflow.Channel
import nextflow.Global
import nextflow.NF
import nextflow.Session
import static nextflow.Channel.STOP

/**
* Helper class to handle channel internal api ops
*
Expand All @@ -30,7 +30,15 @@ class CH {
return (Session) Global.session
}

static private Map<DataflowQueue, DataflowBroadcast> bridges = new HashMap<>(10)
static class Topic {
String name
DataflowBroadcast broadcaster = new DataflowBroadcast()
List<DataflowWriteChannel> writers = new ArrayList<>(10)
}

static final private List<Topic> allTopics = new ArrayList<>(10)

static final private Map<DataflowQueue, DataflowBroadcast> bridges = new HashMap<>(10)

static DataflowReadChannel getReadChannel(channel) {
if (channel instanceof DataflowQueue)
Expand Down Expand Up @@ -68,14 +76,41 @@ class CH {
}

static void broadcast() {
// connect all dataflow queue variables to associated broadcast channel
// connect all broadcast topics, note this must be before the following
// "bridging" step because it can modify the final network topology
connectTopics()
// bridge together all broadcast channels
bridgeChannels()
}

static private void bridgeChannels() {
// connect all dataflow queue variables to associated broadcast channel
for( DataflowQueue queue : bridges.keySet() ) {
log.trace "Bridging dataflow queue=$queue"
def broadcast = bridges.get(queue)
queue.into(broadcast)
}
}

static private void connectTopics() {
for( Topic topic : allTopics ) {
if( topic.writers ) {
// the list of all writing dataflow queues for this topic
final ch = new ArrayList(topic.writers)
// the mix operator requires at least two sources, add an empty channel if needed
if( ch.size()==1 )
ch.add(empty())
// get a list of sources for the mix operator
final sources = ch.collect(it -> getReadChannel(it))
// mix all of them
new MixOp(sources).withTarget(topic.broadcaster).apply()
}
else {
topic.broadcaster.bind(STOP)
}
}
}

static void init() { bridges.clear() }

@PackageScope
Expand Down Expand Up @@ -104,6 +139,31 @@ class CH {
return new DataflowQueue()
}

static DataflowBroadcast topic(String name) {
synchronized (allTopics) {
def topic = allTopics.find(it -> it.name == name)
if( topic!=null )
return topic.broadcaster
// create a new topic
topic = new Topic(name:name)
allTopics.add(topic)
return topic.broadcaster
}
}

static DataflowWriteChannel topicWriter(String name) {
synchronized (allTopics) {
def topic = allTopics.find(it -> it.name == name)
if( topic==null ) {
topic = new Topic(name:name)
allTopics.add(topic)
}
def result = CH.create()
topic.writers.add(result)
return result
}
}

static boolean isChannel(obj) {
obj instanceof DataflowReadChannel || obj instanceof DataflowWriteChannel
}
Expand Down
17 changes: 15 additions & 2 deletions modules/nextflow/src/main/groovy/nextflow/extension/MixOp.groovy
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@ import groovy.transform.CompileStatic
import groovyx.gpars.dataflow.DataflowReadChannel
import groovyx.gpars.dataflow.DataflowWriteChannel
import nextflow.Channel

/**
* Implements Nextflow Mix operator
*
Expand All @@ -36,6 +35,7 @@ class MixOp {

private DataflowReadChannel source
private List<DataflowReadChannel> others
private DataflowWriteChannel target

MixOp(DataflowReadChannel source, DataflowReadChannel other) {
this.source = source
Expand All @@ -47,8 +47,21 @@ class MixOp {
this.others = others.toList()
}

MixOp(List<DataflowReadChannel> channels) {
if( channels.size()<2 )
throw new IllegalArgumentException("Mix operator requires at least 2 source channels")
this.source = channels.get(0)
this.others = channels.subList(1, channels.size())
}

MixOp withTarget(DataflowWriteChannel target) {
this.target = target
return this
}

DataflowWriteChannel apply() {
def target = CH.create()
if( target == null )
target = CH.create()
def count = new AtomicInteger( others.size()+1 )
def handlers = [
onNext: { target << it },
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@ import nextflow.script.params.BaseOutParam
import nextflow.script.params.EachInParam
import nextflow.script.params.InputsList
import nextflow.script.params.OutputsList

/**
* Models a nextflow process definition
*
Expand Down Expand Up @@ -194,8 +193,14 @@ class ProcessDef extends BindableDef implements IterableDef, ChainableDef {
throw new ScriptRuntimeException("Process `$processName` inputs and outputs do not have the same cardinality - Feedback loop is not supported" )

for(int i=0; i<declaredOutputs.size(); i++ ) {
final ch = feedbackChannels ? feedbackChannels[i] : CH.create(singleton)
(declaredOutputs[i] as BaseOutParam).setInto(ch)
final param = (declaredOutputs[i] as BaseOutParam)
final topicName = param.channelTopicName
if( topicName && feedbackChannels )
throw new IllegalArgumentException("Output topic conflict with recursion feature - check process '$name' should not declared any output 'topic'" )
final ch = feedbackChannels
? feedbackChannels[i]
: ( topicName ? CH.topicWriter(topicName) : CH.create(singleton) )
param.setInto(ch)
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,8 @@ abstract class BaseOutParam extends BaseParam implements OutParam {

String channelEmitName

String channelTopicName

BaseOutParam( Binding binding, List list, short ownerIndex = -1) {
super(binding,list,ownerIndex)
}
Expand Down Expand Up @@ -211,4 +213,18 @@ abstract class BaseOutParam extends BaseParam implements OutParam {
this.channelEmitName = value
return this
}

BaseOutParam setTopic( String name ) {
if( isNestedParam() )
throw new IllegalArgumentException("Output `topic` option it not allowed in tuple components")
if( !name )
throw new IllegalArgumentException("Missing output `topic` name")
if( !ConfigHelper.isValidIdentifier(name) ) {
final msg = "Output topic '$name' is not a valid name -- Make sure it starts with an alphabetic or underscore character and it does not contain any blank, dot or other special characters"
throw new IllegalArgumentException(msg)
}

this.channelTopicName = name
return this
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -51,4 +51,6 @@ interface OutParam extends Cloneable {

String getChannelEmitName()

String getChannelTopicName()

}
Original file line number Diff line number Diff line change
Expand Up @@ -811,7 +811,7 @@ class ParamsOutTest extends Dsl2Spec {

process hola {
output:
path "${x}_name", emit: aaa
path "${x}_name", emit: aaa, topic: 'foo'
path "${x}_${y}.fa", emit: bbb
path "simple.txt", emit: ccc
path "data/sub/dir/file:${x}.fa", emit: ddd
Expand Down Expand Up @@ -840,6 +840,7 @@ class ParamsOutTest extends Dsl2Spec {
out0.isDynamic()
out0.isPathQualifier()
out0.channelEmitName == 'aaa'
out0.channelTopicName == 'foo'

out1.name == null
out1.getFilePatterns(ctx,null) == ['hola_99.fa']
Expand Down Expand Up @@ -1196,4 +1197,84 @@ class ParamsOutTest extends Dsl2Spec {
outs[2].inner[1].name == 'bar'

}


def 'should define out with topic' () {
setup:
def text = '''
process hola {
output:
val x, topic: ch0
env FOO, topic: ch1
path '-', topic: ch2
stdout topic: ch3
/return/
}

workflow { hola() }
'''

def binding = [:]
def process = parseAndReturnProcess(text, binding)

when:
def outs = process.config.getOutputs() as List<OutParam>
then:
outs[0].name == 'x'
outs[0].channelTopicName == 'ch0'
and:
outs[1].name == 'FOO'
outs[1].channelTopicName == 'ch1'
and:
outs[2] instanceof StdOutParam // <-- note: declared as `path`, turned into a `stdout`
outs[2].name == '-'
outs[2].channelTopicName == 'ch2'
and:
outs[3] instanceof StdOutParam
outs[3].name == '-'
outs[3].channelTopicName == 'ch3'
}

def 'should define out tuple with topic'() {

setup:
def text = '''
process hola {
output:
tuple val(x), val(y), topic: ch1
tuple path('foo'), topic: ch2
tuple stdout,env(bar), topic: ch3

/return/
}

workflow { hola() }
'''

def binding = [:]
def process = parseAndReturnProcess(text, binding)

when:
def outs = process.config.getOutputs() as List<TupleOutParam>

then:
outs[0].name == 'tupleoutparam<0>'
outs[0].channelTopicName == 'ch1'
outs[0].inner[0] instanceof ValueOutParam
outs[0].inner[0].name == 'x'
outs[0].inner[1] instanceof ValueOutParam
outs[0].inner[1].name == 'y'
and:
outs[1].name == 'tupleoutparam<1>'
outs[1].channelTopicName == 'ch2'
outs[1].inner[0] instanceof FileOutParam
and:
outs[2].name == 'tupleoutparam<2>'
outs[2].channelTopicName == 'ch3'
outs[2].inner[0] instanceof StdOutParam
outs[2].inner[0].name == '-'
outs[2].inner[1] instanceof EnvOutParam
outs[2].inner[1].name == 'bar'

}
}