Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -6,14 +6,15 @@ package akka.stream.impl

import scala.collection.immutable.Map.Map1
import scala.language.existentials

import akka.annotation.{ DoNotInherit, InternalApi }
import akka.stream._
import akka.stream.impl.StreamLayout.AtomicModule
import akka.stream.impl.TraversalBuilder.{ AnyFunction1, AnyFunction2 }
import akka.stream.impl.fusing.GraphStageModule
import akka.stream.impl.fusing.GraphStages.IterableSource
import akka.stream.impl.fusing.GraphStages.SingleSource
import akka.stream.scaladsl.Keep
import akka.stream.stage.GraphStage
import akka.util.OptionVal
import akka.util.unused

Expand Down Expand Up @@ -370,6 +371,42 @@ import akka.util.unused
}
}
}

/**
* Try to find `SingleSource` or wrapped such. This is used as a
* performance optimization in FlattenMerge and possibly other places.
*/
def getDirectPushableSource[A >: Null](graph: Graph[SourceShape[A], _]): OptionVal[GraphStage[SourceShape[A]]] = {
graph match {
case single: SingleSource[A] @unchecked => OptionVal.Some(single)
case iterable: IterableSource[A] @unchecked => OptionVal.Some(iterable)
case EmptySource => OptionVal.Some(EmptySource)
case _ =>
graph.traversalBuilder match {
case l: LinearTraversalBuilder =>
// It would be != EmptyTraversal if mapMaterializedValue was used and then we can't optimize.
if ((l.traversalSoFar ne EmptyTraversal) || l.attributes.isAsync) {
OptionVal.None
} else {
l.pendingBuilder match {
case OptionVal.Some(a: AtomicTraversalBuilder) =>
a.module match {
case m: GraphStageModule[_, _] =>
m.stage match {
case single: SingleSource[A] @unchecked => OptionVal.Some(single)
case iterable: IterableSource[A] @unchecked => OptionVal.Some(iterable)
case EmptySource => OptionVal.Some(EmptySource)
case _ => OptionVal.None
}
case _ => OptionVal.None
}
case _ => OptionVal.None
}
}
case _ => OptionVal.None
}
}
}
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,15 +19,19 @@ import akka.stream.Attributes.SourceLocation
import akka.stream.impl.{ Buffer => BufferImpl }
import akka.stream.impl.ActorSubscriberMessage
import akka.stream.impl.ActorSubscriberMessage.OnError
import akka.stream.impl.EmptySource
import akka.stream.impl.Stages.DefaultAttributes
import akka.stream.impl.SubscriptionTimeoutException
import akka.stream.impl.TraversalBuilder
import akka.stream.impl.fusing.GraphStages.IterableSource
import akka.stream.impl.fusing.GraphStages.SingleSource
import akka.stream.scaladsl._
import akka.stream.stage._
import akka.util.OptionVal
import akka.util.ccompat.JavaConverters._

import scala.annotation.nowarn

/**
* INTERNAL API
*/
Expand All @@ -39,11 +43,11 @@ import akka.util.ccompat.JavaConverters._
override def initialAttributes = DefaultAttributes.flattenMerge
override val shape = FlowShape(in, out)

override def createLogic(enclosingAttributes: Attributes) =
new GraphStageLogic(shape) with OutHandler with InHandler {
override def createLogic(enclosingAttributes: Attributes): GraphStageLogic with InHandler with OutHandler =
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No need to repeat the type here, we are implementing an abstract method

new GraphStageLogic(shape) with InHandler with OutHandler {
var sources = Set.empty[SubSinkInlet[T]]
var pendingSingleSources = 0
def activeSources = sources.size + pendingSingleSources
var pendingDirectPushSources = 0
def activeSources: Int = sources.size + pendingDirectPushSources

// To be able to optimize for SingleSource without materializing them the queue may hold either
// SubSinkInlet[T] or SingleSource
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Update comment as well

Expand All @@ -60,15 +64,32 @@ import akka.util.ccompat.JavaConverters._
case single: SingleSource[T] @unchecked =>
push(out, single.elem)
removeSource(single)
case iterableSource: IterableSource[T] @unchecked =>
handleIterableSource(iterableSource)
case other =>
throw new IllegalStateException(s"Unexpected source type in queue: '${other.getClass}'")
}
}

@nowarn("msg=deprecated")
private def handleIterableSource(iterableSource: IterableSource[T]): Unit = {
val elements = iterableSource.elements
if (elements.hasDefiniteSize) {
if (elements.isEmpty) {
removeSource(iterableSource)
} else if (elements.size == 1) {
push(out, elements.head)
removeSource(iterableSource)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Handling the case of known of more than 1 element is missing

}
} else {
emitMultiple(out, elements, () => removeSource(iterableSource))
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

not run with the SupervisionStrategy of IterableSource

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This won't work, it will have to be something more clever, emitMultiple will replace the current handlers and emit until end so would only work for breadth = 1 (flatMapConcat), for merge it will turn it into concat.

There will have to be something more clever wrt combining the current open streams with current iterators, in the queue keeping their state across pulls from downstream for non 0-1 size cases.

}
}

override def onPush(): Unit = {
val source = grab(in)
addSource(source)
if (activeSources < breadth) tryPull(in)
if (activeSources < breadth && !hasBeenPulled(in)) tryPull(in)
}

override def onUpstreamFinish(): Unit = if (activeSources == 0) completeStage()
Expand All @@ -87,47 +108,73 @@ import akka.util.ccompat.JavaConverters._
def addSource(source: Graph[SourceShape[T], M]): Unit = {
// If it's a SingleSource or wrapped such we can push the element directly instead of materializing it.
// Have to use AnyRef because of OptionVal null value.
TraversalBuilder.getSingleSource(source.asInstanceOf[Graph[SourceShape[AnyRef], M]]) match {
case OptionVal.Some(single) =>
if (isAvailable(out) && queue.isEmpty) {
push(out, single.elem.asInstanceOf[T])
} else {
queue.enqueue(single)
pendingSingleSources += 1
}
case _ =>
val sinkIn = new SubSinkInlet[T]("FlattenMergeSink")
sinkIn.setHandler(new InHandler {
override def onPush(): Unit = {
if (isAvailable(out)) {
push(out, sinkIn.grab())
sinkIn.pull()
TraversalBuilder.getDirectPushableSource(source.asInstanceOf[Graph[SourceShape[AnyRef], M]]) match {
case OptionVal.Some(s) =>
s.asInstanceOf[GraphStage[SourceShape[T]]] match {
case single: SingleSource[T] @unchecked =>
if (isAvailable(out) && queue.isEmpty) {
push(out, single.elem)
} else {
queue.enqueue(sinkIn)
queue.enqueue(single)
pendingDirectPushSources += 1
}
}
override def onUpstreamFinish(): Unit = if (!sinkIn.isAvailable) removeSource(sinkIn)
})
sinkIn.pull()
sources += sinkIn
val graph = Source.fromGraph(source).to(sinkIn.sink)
interpreter.subFusingMaterializer.materialize(graph, defaultAttributes = enclosingAttributes)
case iterable: IterableSource[T] @unchecked =>
pendingDirectPushSources += 1
if (isAvailable(out) && queue.isEmpty) {
handleIterableSource(iterable)
} else {
queue.enqueue(iterable)
}
case EmptySource =>
tryPullOrComplete()
case _ =>
addSourceWithMaterialization(source)
}
case _ =>
addSourceWithMaterialization(source)
}
}

private def addSourceWithMaterialization(source: Graph[SourceShape[T], M]): Unit = {
val sinkIn = new SubSinkInlet[T]("FlattenMergeSink")
sinkIn.setHandler(new InHandler {
override def onPush(): Unit = {
if (isAvailable(out)) {
push(out, sinkIn.grab())
sinkIn.pull()
} else {
queue.enqueue(sinkIn)
}
}
override def onUpstreamFinish(): Unit = if (!sinkIn.isAvailable) removeSource(sinkIn)
})
sinkIn.pull()
sources += sinkIn
val graph = Source.fromGraph(source).to(sinkIn.sink)
interpreter.subFusingMaterializer.materialize(graph, defaultAttributes = enclosingAttributes)
}

def removeSource(src: AnyRef): Unit = {
val pullSuppressed = activeSources == breadth
src match {
case sub: SubSinkInlet[T] @unchecked =>
sources -= sub
case _: SingleSource[_] =>
pendingSingleSources -= 1
case _: SingleSource[_] | _: IterableSource[_] =>
pendingDirectPushSources -= 1
case other => throw new IllegalArgumentException(s"Unexpected source type: '${other.getClass}'")
}
if (pullSuppressed) tryPull(in)
if (activeSources == 0 && isClosed(in)) completeStage()
}

private def tryPullOrComplete(): Unit = {
if (activeSources < breadth) {
tryPull(in)
} else if (activeSources == 0 && isClosed(in)) {
completeStage()
}
}

override def postStop(): Unit = sources.foreach(_.cancel())

}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -272,6 +272,7 @@ object Source {
apply(new immutable.Iterable[T] {
override def iterator: Iterator[T] = f()
override def toString: String = "() => Iterator"
override def hasDefiniteSize: Boolean = false
})

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,7 @@ abstract class AbstractGraphStageWithMaterializedValue[+S <: Shape, M] extends G
* A GraphStage consists of a [[Shape]] which describes its input and output ports and a factory function that
* creates a [[GraphStageLogic]] which implements the processing logic that ties the ports together.
*/
abstract class GraphStage[S <: Shape] extends GraphStageWithMaterializedValue[S, NotUsed] {
abstract class GraphStage[+S <: Shape] extends GraphStageWithMaterializedValue[S, NotUsed] {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why was this variance change needed?

final override def createLogicAndMaterializedValue(inheritedAttributes: Attributes): (GraphStageLogic, NotUsed) =
(createLogic(inheritedAttributes), NotUsed)

Expand Down