-
Notifications
You must be signed in to change notification settings - Fork 3.6k
Optimize flatMapConcat for iterable and empty source. #31602
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Conversation
cc8f08f to
accdcc3
Compare
| push(out, elements.head) | ||
| removeSource(iterableSource) | ||
| } else { | ||
| emitMultiple(out, elements, () => removeSource(iterableSource)) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
not run with the SupervisionStrategy of IterableSource
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This won't work, it will have to be something more clever, emitMultiple will replace the current handlers and emit until end so would only work for breadth = 1 (flatMapConcat), for merge it will turn it into concat.
There will have to be something more clever wrt combining the current open streams with current iterators, in the queue keeping their state across pulls from downstream for non 0-1 size cases.
|
|
||
| private def handleIterableSource(iterableSource: IterableSource[T]): Unit = { | ||
| val elements = iterableSource.elements | ||
| if (elements.knownSize == 0) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
not available in 2.13.x, maybe hasDefiniteSize
accdcc3 to
856524c
Compare
johanandren
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Good one to optimize, but tricky
|
|
||
| override def createLogic(enclosingAttributes: Attributes) = | ||
| new GraphStageLogic(shape) with OutHandler with InHandler { | ||
| override def createLogic(enclosingAttributes: Attributes): GraphStageLogic with InHandler with OutHandler = |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
No need to repeat the type here, we are implementing an abstract method
| * creates a [[GraphStageLogic]] which implements the processing logic that ties the ports together. | ||
| */ | ||
| abstract class GraphStage[S <: Shape] extends GraphStageWithMaterializedValue[S, NotUsed] { | ||
| abstract class GraphStage[+S <: Shape] extends GraphStageWithMaterializedValue[S, NotUsed] { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why was this variance change needed?
| def activeSources: Int = sources.size + pendingDirectPushSources | ||
|
|
||
| // To be able to optimize for SingleSource without materializing them the queue may hold either | ||
| // SubSinkInlet[T] or SingleSource |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Update comment as well
| push(out, elements.head) | ||
| removeSource(iterableSource) | ||
| } else { | ||
| emitMultiple(out, elements, () => removeSource(iterableSource)) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This won't work, it will have to be something more clever, emitMultiple will replace the current handlers and emit until end so would only work for breadth = 1 (flatMapConcat), for merge it will turn it into concat.
There will have to be something more clever wrt combining the current open streams with current iterators, in the queue keeping their state across pulls from downstream for non 0-1 size cases.
| removeSource(iterableSource) | ||
| } else if (elements.size == 1) { | ||
| push(out, elements.head) | ||
| removeSource(iterableSource) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Handling the case of known of more than 1 element is missing
|
|
|
was done in #32024 |
A follow up of #31372.
refs: #21462
Draft status, just make the test pass and the
SupervisionStrategyis not handleIf this is acceptable I will try with
Source.fromFuturelater in a separated pr.single source case is already done in : #25242
Run with main branch:

Run with this branch:
