diff --git a/akka-stream/src/main/scala/akka/stream/impl/TraversalBuilder.scala b/akka-stream/src/main/scala/akka/stream/impl/TraversalBuilder.scala index 0bf998c7372..c59a4d39d48 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/TraversalBuilder.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/TraversalBuilder.scala @@ -6,14 +6,15 @@ package akka.stream.impl import scala.collection.immutable.Map.Map1 import scala.language.existentials - import akka.annotation.{ DoNotInherit, InternalApi } import akka.stream._ import akka.stream.impl.StreamLayout.AtomicModule import akka.stream.impl.TraversalBuilder.{ AnyFunction1, AnyFunction2 } import akka.stream.impl.fusing.GraphStageModule +import akka.stream.impl.fusing.GraphStages.IterableSource import akka.stream.impl.fusing.GraphStages.SingleSource import akka.stream.scaladsl.Keep +import akka.stream.stage.GraphStage import akka.util.OptionVal import akka.util.unused @@ -370,6 +371,42 @@ import akka.util.unused } } } + + /** + * Try to find `SingleSource` or wrapped such. This is used as a + * performance optimization in FlattenMerge and possibly other places. + */ + def getDirectPushableSource[A >: Null](graph: Graph[SourceShape[A], _]): OptionVal[GraphStage[SourceShape[A]]] = { + graph match { + case single: SingleSource[A] @unchecked => OptionVal.Some(single) + case iterable: IterableSource[A] @unchecked => OptionVal.Some(iterable) + case EmptySource => OptionVal.Some(EmptySource) + case _ => + graph.traversalBuilder match { + case l: LinearTraversalBuilder => + // It would be != EmptyTraversal if mapMaterializedValue was used and then we can't optimize. + if ((l.traversalSoFar ne EmptyTraversal) || l.attributes.isAsync) { + OptionVal.None + } else { + l.pendingBuilder match { + case OptionVal.Some(a: AtomicTraversalBuilder) => + a.module match { + case m: GraphStageModule[_, _] => + m.stage match { + case single: SingleSource[A] @unchecked => OptionVal.Some(single) + case iterable: IterableSource[A] @unchecked => OptionVal.Some(iterable) + case EmptySource => OptionVal.Some(EmptySource) + case _ => OptionVal.None + } + case _ => OptionVal.None + } + case _ => OptionVal.None + } + } + case _ => OptionVal.None + } + } + } } /** diff --git a/akka-stream/src/main/scala/akka/stream/impl/fusing/StreamOfStreams.scala b/akka-stream/src/main/scala/akka/stream/impl/fusing/StreamOfStreams.scala index 0a975858214..c69973b7af2 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/fusing/StreamOfStreams.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/fusing/StreamOfStreams.scala @@ -19,15 +19,19 @@ import akka.stream.Attributes.SourceLocation import akka.stream.impl.{ Buffer => BufferImpl } import akka.stream.impl.ActorSubscriberMessage import akka.stream.impl.ActorSubscriberMessage.OnError +import akka.stream.impl.EmptySource import akka.stream.impl.Stages.DefaultAttributes import akka.stream.impl.SubscriptionTimeoutException import akka.stream.impl.TraversalBuilder +import akka.stream.impl.fusing.GraphStages.IterableSource import akka.stream.impl.fusing.GraphStages.SingleSource import akka.stream.scaladsl._ import akka.stream.stage._ import akka.util.OptionVal import akka.util.ccompat.JavaConverters._ +import scala.annotation.nowarn + /** * INTERNAL API */ @@ -39,11 +43,11 @@ import akka.util.ccompat.JavaConverters._ override def initialAttributes = DefaultAttributes.flattenMerge override val shape = FlowShape(in, out) - override def createLogic(enclosingAttributes: Attributes) = - new GraphStageLogic(shape) with OutHandler with InHandler { + override def createLogic(enclosingAttributes: Attributes): GraphStageLogic with InHandler with OutHandler = + new GraphStageLogic(shape) with InHandler with OutHandler { var sources = Set.empty[SubSinkInlet[T]] - var pendingSingleSources = 0 - def activeSources = sources.size + pendingSingleSources + var pendingDirectPushSources = 0 + def activeSources: Int = sources.size + pendingDirectPushSources // To be able to optimize for SingleSource without materializing them the queue may hold either // SubSinkInlet[T] or SingleSource @@ -60,15 +64,32 @@ import akka.util.ccompat.JavaConverters._ case single: SingleSource[T] @unchecked => push(out, single.elem) removeSource(single) + case iterableSource: IterableSource[T] @unchecked => + handleIterableSource(iterableSource) case other => throw new IllegalStateException(s"Unexpected source type in queue: '${other.getClass}'") } } + @nowarn("msg=deprecated") + private def handleIterableSource(iterableSource: IterableSource[T]): Unit = { + val elements = iterableSource.elements + if (elements.hasDefiniteSize) { + if (elements.isEmpty) { + removeSource(iterableSource) + } else if (elements.size == 1) { + push(out, elements.head) + removeSource(iterableSource) + } + } else { + emitMultiple(out, elements, () => removeSource(iterableSource)) + } + } + override def onPush(): Unit = { val source = grab(in) addSource(source) - if (activeSources < breadth) tryPull(in) + if (activeSources < breadth && !hasBeenPulled(in)) tryPull(in) } override def onUpstreamFinish(): Unit = if (activeSources == 0) completeStage() @@ -87,47 +108,73 @@ import akka.util.ccompat.JavaConverters._ def addSource(source: Graph[SourceShape[T], M]): Unit = { // If it's a SingleSource or wrapped such we can push the element directly instead of materializing it. // Have to use AnyRef because of OptionVal null value. - TraversalBuilder.getSingleSource(source.asInstanceOf[Graph[SourceShape[AnyRef], M]]) match { - case OptionVal.Some(single) => - if (isAvailable(out) && queue.isEmpty) { - push(out, single.elem.asInstanceOf[T]) - } else { - queue.enqueue(single) - pendingSingleSources += 1 - } - case _ => - val sinkIn = new SubSinkInlet[T]("FlattenMergeSink") - sinkIn.setHandler(new InHandler { - override def onPush(): Unit = { - if (isAvailable(out)) { - push(out, sinkIn.grab()) - sinkIn.pull() + TraversalBuilder.getDirectPushableSource(source.asInstanceOf[Graph[SourceShape[AnyRef], M]]) match { + case OptionVal.Some(s) => + s.asInstanceOf[GraphStage[SourceShape[T]]] match { + case single: SingleSource[T] @unchecked => + if (isAvailable(out) && queue.isEmpty) { + push(out, single.elem) } else { - queue.enqueue(sinkIn) + queue.enqueue(single) + pendingDirectPushSources += 1 } - } - override def onUpstreamFinish(): Unit = if (!sinkIn.isAvailable) removeSource(sinkIn) - }) - sinkIn.pull() - sources += sinkIn - val graph = Source.fromGraph(source).to(sinkIn.sink) - interpreter.subFusingMaterializer.materialize(graph, defaultAttributes = enclosingAttributes) + case iterable: IterableSource[T] @unchecked => + pendingDirectPushSources += 1 + if (isAvailable(out) && queue.isEmpty) { + handleIterableSource(iterable) + } else { + queue.enqueue(iterable) + } + case EmptySource => + tryPullOrComplete() + case _ => + addSourceWithMaterialization(source) + } + case _ => + addSourceWithMaterialization(source) } } + private def addSourceWithMaterialization(source: Graph[SourceShape[T], M]): Unit = { + val sinkIn = new SubSinkInlet[T]("FlattenMergeSink") + sinkIn.setHandler(new InHandler { + override def onPush(): Unit = { + if (isAvailable(out)) { + push(out, sinkIn.grab()) + sinkIn.pull() + } else { + queue.enqueue(sinkIn) + } + } + override def onUpstreamFinish(): Unit = if (!sinkIn.isAvailable) removeSource(sinkIn) + }) + sinkIn.pull() + sources += sinkIn + val graph = Source.fromGraph(source).to(sinkIn.sink) + interpreter.subFusingMaterializer.materialize(graph, defaultAttributes = enclosingAttributes) + } + def removeSource(src: AnyRef): Unit = { val pullSuppressed = activeSources == breadth src match { case sub: SubSinkInlet[T] @unchecked => sources -= sub - case _: SingleSource[_] => - pendingSingleSources -= 1 + case _: SingleSource[_] | _: IterableSource[_] => + pendingDirectPushSources -= 1 case other => throw new IllegalArgumentException(s"Unexpected source type: '${other.getClass}'") } if (pullSuppressed) tryPull(in) if (activeSources == 0 && isClosed(in)) completeStage() } + private def tryPullOrComplete(): Unit = { + if (activeSources < breadth) { + tryPull(in) + } else if (activeSources == 0 && isClosed(in)) { + completeStage() + } + } + override def postStop(): Unit = sources.foreach(_.cancel()) } diff --git a/akka-stream/src/main/scala/akka/stream/scaladsl/Source.scala b/akka-stream/src/main/scala/akka/stream/scaladsl/Source.scala index c9b8467c3d7..3adc6479325 100644 --- a/akka-stream/src/main/scala/akka/stream/scaladsl/Source.scala +++ b/akka-stream/src/main/scala/akka/stream/scaladsl/Source.scala @@ -272,6 +272,7 @@ object Source { apply(new immutable.Iterable[T] { override def iterator: Iterator[T] = f() override def toString: String = "() => Iterator" + override def hasDefiniteSize: Boolean = false }) /** diff --git a/akka-stream/src/main/scala/akka/stream/stage/GraphStage.scala b/akka-stream/src/main/scala/akka/stream/stage/GraphStage.scala index fc2f4a4406f..dc4a89e5386 100644 --- a/akka-stream/src/main/scala/akka/stream/stage/GraphStage.scala +++ b/akka-stream/src/main/scala/akka/stream/stage/GraphStage.scala @@ -101,7 +101,7 @@ abstract class AbstractGraphStageWithMaterializedValue[+S <: Shape, M] extends G * A GraphStage consists of a [[Shape]] which describes its input and output ports and a factory function that * creates a [[GraphStageLogic]] which implements the processing logic that ties the ports together. */ -abstract class GraphStage[S <: Shape] extends GraphStageWithMaterializedValue[S, NotUsed] { +abstract class GraphStage[+S <: Shape] extends GraphStageWithMaterializedValue[S, NotUsed] { final override def createLogicAndMaterializedValue(inheritedAttributes: Attributes): (GraphStageLogic, NotUsed) = (createLogic(inheritedAttributes), NotUsed)