From 52618a734208db1623eefc9c7788ead1d1332ae8 Mon Sep 17 00:00:00 2001 From: Angelo Oparah <16281580+Angel-O@users.noreply.github.com> Date: Mon, 13 Mar 2023 20:47:07 +0000 Subject: [PATCH 01/16] groupwithin new implementation --- core/shared/src/main/scala/fs2/Stream.scala | 148 +++++++----------- .../scala/fs2/StreamCombinatorsSuite.scala | 107 ++++++++++++- 2 files changed, 163 insertions(+), 92 deletions(-) diff --git a/core/shared/src/main/scala/fs2/Stream.scala b/core/shared/src/main/scala/fs2/Stream.scala index 96f09ec383..b65438c1a3 100644 --- a/core/shared/src/main/scala/fs2/Stream.scala +++ b/core/shared/src/main/scala/fs2/Stream.scala @@ -1404,106 +1404,74 @@ final class Stream[+F[_], +O] private[fs2] (private[fs2] val underlying: Pull[F, def groupWithin[F2[x] >: F[x]]( chunkSize: Int, timeout: FiniteDuration - )(implicit F: Temporal[F2]): Stream[F2, Chunk[O]] = { - - case class JunctionBuffer[T]( - data: Vector[T], - endOfSupply: Option[Either[Throwable, Unit]], - endOfDemand: Option[Either[Throwable, Unit]] - ) { - def splitAt(n: Int): (JunctionBuffer[T], JunctionBuffer[T]) = - if (this.data.size >= n) { - val (head, tail) = this.data.splitAt(n.toInt) - (this.copy(tail), this.copy(head)) - } else { - (this.copy(Vector.empty), this) - } - } + )(implicit F: Temporal[F2]): Stream[F2, Chunk[O]] = + if (timeout.toNanos == 0 || chunkSize == 1) chunkN(chunkSize) + else + Stream.force { + for { + supply <- Semaphore[F2](0) + supplyEnded <- SignallingRef.of[F2, Boolean](false) + buffer <- Queue.bounded[F2, O](chunkSize) // buffering and backpressure + awaitFirstChunk = Stream.fromQueueUnterminated(buffer, chunkSize).chunkMin(1).head + } yield { + // note: it can produce an empty chunk + val emitChunk: F2[Chunk[O]] = buffer.tryTakeN(Some(chunkSize)).map(Chunk.seq) - val outputLong = chunkSize.toLong - fs2.Stream.force { - for { - demand <- Semaphore[F2](outputLong) - supply <- Semaphore[F2](0L) - buffer <- Ref[F2].of( - JunctionBuffer[O](Vector.empty[O], endOfSupply = None, endOfDemand = None) - ) - } yield { - /* - Buffer: stores items from input to be sent on next output chunk - * - Demand Semaphore: to avoid adding too many items to buffer - * - Supply: counts filled positions for next output chunk */ - def enqueue(t: O): F2[Boolean] = - for { - _ <- demand.acquire - buf <- buffer.modify(buf => (buf.copy(buf.data :+ t), buf)) - _ <- supply.release - } yield buf.endOfDemand.isEmpty - - val dequeueNextOutput: F2[Option[Vector[O]]] = { - // Trigger: waits until the supply buffer is full (with acquireN) - val waitSupply = supply.acquireN(outputLong).guaranteeCase { - case Outcome.Succeeded(_) => supply.releaseN(outputLong) - case _ => F.unit + // we need to check the buffer size, rather than the available supply since + // the supply is increased at the end so it won't always report the buffer size accurately + val isBufferEmpty: F2[Boolean] = buffer.size.map(_ == 0) + + val streamExhausted: F2[Boolean] = (isBufferEmpty, supplyEnded.get).mapN(_ && _) + + // "subscribing" to the buffer waiting for the first chunk. Note: we + // can't wait forever: at most we can wait until the supply has ended + val awaitChunk: F2[Chunk[O]] = { + + val waitForOne = Stream.eval(supply.acquire).interruptWhen(supplyEnded).compile.drain + + // we need to check again after waiting: (we might have just received the final chunk, + // in that case we need to flush any residual elements in the buffer without blocking + waitForOne *> F.ifM(supplyEnded.get)(emitChunk, awaitFirstChunk.compile.lastOrError) } - val onTimeout: F2[Long] = - for { - _ <- supply.acquire // waits until there is at least one element in buffer - m <- supply.available - k = m.min(outputLong - 1) - b <- supply.tryAcquireN(k) - } yield if (b) k + 1 else 1 - - // in JS cancellation doesn't always seem to run, so race conditions should restore state on their own - for { - acq <- F.race(F.sleep(timeout), waitSupply).flatMap { - case Left(_) => onTimeout - case Right(_) => supply.acquireN(outputLong).as(outputLong) - } - buf <- buffer.modify(_.splitAt(acq.toInt)) - _ <- demand.releaseN(buf.data.size.toLong) - res <- buf.endOfSupply match { - case Some(Left(error)) => F.raiseError(error) - case Some(Right(_)) if buf.data.isEmpty => F.pure(None) - case _ => F.pure(Some(buf.data)) - } - } yield res - } + // releasing a number of permits equal to {chunkSize} is enough in most cases, but in + // order to ensure prompt termination of the consumer on interruption even when the timeout + // has not kicked in yet nor we've seen enough elements we need to max out the supply + val maxOutSupply: F2[Unit] = supply.releaseN(Int.MaxValue.toLong + chunkSize) - def endSupply(result: Either[Throwable, Unit]): F2[Unit] = - buffer.update(_.copy(endOfSupply = Some(result))) *> supply.releaseN(Int.MaxValue) + // enabling termination of the consumer stream when the producer completes naturally + // (i.e runs out of elements) or when the combined stream (consumer + producer) is interrupted + val endSupply: F2[Unit] = supplyEnded.set(true) *> maxOutSupply - def endDemand(result: Either[Throwable, Unit]): F2[Unit] = - buffer.update(_.copy(endOfDemand = Some(result))) *> demand.releaseN(Int.MaxValue) + val enqueue: F2[Unit] = + foreach(buffer.offer(_) <* supply.release).compile.drain.guarantee(endSupply) - def toEnding(ec: ExitCase): Either[Throwable, Unit] = ec match { - case ExitCase.Succeeded => Right(()) - case ExitCase.Errored(e) => Left(e) - case ExitCase.Canceled => Right(()) - } + def lowerSupply(flushed: Chunk[O], awaited: Int): F2[Chunk[O]] = + supply.acquireN((flushed.size.toLong - awaited).max(0)).as(flushed) - val enqueueAsync = F.start { - this - .evalMap(enqueue) - .forall(identity) - .onFinalizeCase(ec => endSupply(toEnding(ec))) - .compile - .drain - } + // emit immediately or wait before doing so, subsequently lowering the supply by however + // many elements have been flushed (excluding the element already awaited, if needed) + val emitNextOnTimeout: F2[Chunk[O]] = { + val waitAndEmit = awaitChunk.map((_, 1)) + val emitImmediately = emitChunk.map((_, 0)) + F.ifM(isBufferEmpty)(waitAndEmit, emitImmediately).flatMap((lowerSupply _).tupled) + } - val outputStream: Stream[F2, Chunk[O]] = - Stream - .eval(dequeueNextOutput) - .repeat - .collectWhile { case Some(data) => Chunk.vector(data) } + val onTimeout: F2[Chunk[O]] = + F.ifM(streamExhausted)(F.pure(Chunk.empty[O]), emitNextOnTimeout) - Stream - .bracketCase(enqueueAsync) { case (upstream, exitCase) => - endDemand(toEnding(exitCase)) *> upstream.cancel - } >> outputStream + val dequeue: F2[Chunk[O]] = + F.race(supply.acquireN(chunkSize.toLong), F.sleep(timeout)).flatMap { + case Left(_) => emitChunk + case Right(_) => onTimeout + } + + Stream + .repeatEval(dequeue) + .collectWhile { case os if os.nonEmpty => os } + .concurrently(Stream.eval(enqueue)) + } } - } - } /** If `this` terminates with `Stream.raiseError(e)`, invoke `h(e)`. * diff --git a/core/shared/src/test/scala/fs2/StreamCombinatorsSuite.scala b/core/shared/src/test/scala/fs2/StreamCombinatorsSuite.scala index e48fe36efa..696923ff73 100644 --- a/core/shared/src/test/scala/fs2/StreamCombinatorsSuite.scala +++ b/core/shared/src/test/scala/fs2/StreamCombinatorsSuite.scala @@ -23,7 +23,7 @@ package fs2 import cats.effect.kernel.Deferred import cats.effect.kernel.Ref -import cats.effect.std.{Semaphore, Queue} +import cats.effect.std.{Queue, Semaphore} import cats.effect.testkit.TestControl import cats.effect.{IO, SyncIO} import cats.syntax.all._ @@ -34,6 +34,7 @@ import org.scalacheck.Prop.forAll import scala.concurrent.duration._ import scala.concurrent.TimeoutException +import scala.util.control.NoStackTrace class StreamCombinatorsSuite extends Fs2Suite { @@ -747,7 +748,7 @@ class StreamCombinatorsSuite extends Fs2Suite { } } - test("accumulation and splitting".flaky) { + test("accumulation and splitting") { val t = 200.millis val size = 5 val sleep = Stream.sleep_[IO](2 * t) @@ -774,6 +775,36 @@ class StreamCombinatorsSuite extends Fs2Suite { source.groupWithin(size, t).map(_.toList).assertEmits(expected) } + test("accumulation and splitting 2") { + val t = 200.millis + val size = 5 + val sleep = Stream.sleep_[IO](2 * t) + val longSleep = Stream.sleep_[IO](10 * t) + + def chunk(from: Int, size: Int) = + Stream.range(from, from + size).chunkAll.unchunks + + // this test example is designed to have good coverage of + // the chunk manipulation logic in groupWithin + val source = + chunk(from = 1, size = 3) ++ + sleep ++ + chunk(from = 4, size = 1) ++ longSleep ++ + chunk(from = 5, size = 11) ++ + chunk(from = 16, size = 7) + + val expected = List( + List(1, 2, 3), + List(4), + List(5, 6, 7, 8, 9), + List(10, 11, 12, 13, 14), + List(15, 16, 17, 18, 19), + List(20, 21, 22) + ) + + source.groupWithin(size, t).map(_.toList).assertEmits(expected) + } + test("does not reset timeout if nothing is emitted") { TestControl .executeEmbed( @@ -833,6 +864,78 @@ class StreamCombinatorsSuite extends Fs2Suite { ) .assertEquals(0.millis) } + + test("stress test: all elements are processed") { + + val rangeLength = 100000 + + Stream + .eval(Ref.of[IO, Int](0)) + .flatMap { counter => + Stream + .range(0, rangeLength) + .covary[IO] + .groupWithin(4096, 100.micros) + .evalTap(ch => counter.update(_ + ch.size)) *> Stream.eval(counter.get) + } + .compile + .lastOrError + .assertEquals(rangeLength) + + } + + test("upstream failures are propagated downstream") { + + case object SevenNotAllowed extends NoStackTrace + + val source = Stream + .unfold(0)(s => Some((s, s + 1))) + .covary[IO] + .evalMap(n => if (n == 7) IO.raiseError(SevenNotAllowed) else IO.pure(n)) + + val downstream = source.groupWithin(100, 2.seconds) + + downstream.compile.lastOrError.intercept[SevenNotAllowed.type] + } + + test( + "upstream interruption causes immediate downstream termination with all elements being emitted" + ) { + + val sourceTimeout = 5.5.seconds + val downstreamTimeout = sourceTimeout + 2.seconds + + TestControl + .executeEmbed( + Ref[IO] + .of(0.millis) + .flatMap { ref => + val source: Stream[IO, Int] = + Stream + .unfold(0)(s => Some((s, s + 1))) + .covary[IO] + .meteredStartImmediately(1.second) + .interruptAfter(sourceTimeout) + + // large chunkSize and timeout (no emissions expected in the window + // specified, unless source ends, due to interruption or + // natural termination (i.e runs out of elements) + val downstream: Stream[IO, Chunk[Int]] = + source.groupWithin(Int.MaxValue, 1.day) + + downstream.compile.lastOrError + .map(_.toList) + .timeout(downstreamTimeout) + .flatTap(_ => IO.monotonic.flatMap(ref.set)) + .flatMap(emit => ref.get.map(timeLapsed => (timeLapsed, emit))) + } + ) + .assertEquals( + // downstream ended immediately (i.e timeLapsed = sourceTimeout) + // emitting whatever was accumulated at the time of interruption + (sourceTimeout, List(0, 1, 2, 3, 4, 5)) + ) + } } property("head")(forAll((s: Stream[Pure, Int]) => assertEquals(s.head.toList, s.toList.take(1)))) From 87ecc3689645bf9223d82740925df4b9a8eb6bff Mon Sep 17 00:00:00 2001 From: Angelo Oparah <16281580+Angel-O@users.noreply.github.com> Date: Sat, 25 Mar 2023 18:13:01 +0000 Subject: [PATCH 02/16] readability --- core/shared/src/main/scala/fs2/Stream.scala | 17 +++++++---------- 1 file changed, 7 insertions(+), 10 deletions(-) diff --git a/core/shared/src/main/scala/fs2/Stream.scala b/core/shared/src/main/scala/fs2/Stream.scala index b65438c1a3..6120d5cd90 100644 --- a/core/shared/src/main/scala/fs2/Stream.scala +++ b/core/shared/src/main/scala/fs2/Stream.scala @@ -1410,7 +1410,7 @@ final class Stream[+F[_], +O] private[fs2] (private[fs2] val underlying: Pull[F, Stream.force { for { supply <- Semaphore[F2](0) - supplyEnded <- SignallingRef.of[F2, Boolean](false) + supplyEnded <- SignallingRef[F2].of(false) buffer <- Queue.bounded[F2, O](chunkSize) // buffering and backpressure awaitFirstChunk = Stream.fromQueueUnterminated(buffer, chunkSize).chunkMin(1).head } yield { @@ -1446,19 +1446,16 @@ final class Stream[+F[_], +O] private[fs2] (private[fs2] val underlying: Pull[F, val enqueue: F2[Unit] = foreach(buffer.offer(_) <* supply.release).compile.drain.guarantee(endSupply) - def lowerSupply(flushed: Chunk[O], awaited: Int): F2[Chunk[O]] = - supply.acquireN((flushed.size.toLong - awaited).max(0)).as(flushed) - // emit immediately or wait before doing so, subsequently lowering the supply by however // many elements have been flushed (excluding the element already awaited, if needed) - val emitNextOnTimeout: F2[Chunk[O]] = { - val waitAndEmit = awaitChunk.map((_, 1)) - val emitImmediately = emitChunk.map((_, 0)) - F.ifM(isBufferEmpty)(waitAndEmit, emitImmediately).flatMap((lowerSupply _).tupled) - } + val emitNextChunk: F2[Chunk[O]] = for { + isEmpty <- isBufferEmpty + (flushed, awaited) <- if (isEmpty) awaitChunk.map((_, 1)) else emitChunk.map((_, 0)) + _ <- supply.acquireN((flushed.size.toLong - awaited).max(0)) + } yield flushed val onTimeout: F2[Chunk[O]] = - F.ifM(streamExhausted)(F.pure(Chunk.empty[O]), emitNextOnTimeout) + F.ifM(streamExhausted)(F.pure(Chunk.empty[O]), emitNextChunk) val dequeue: F2[Chunk[O]] = F.race(supply.acquireN(chunkSize.toLong), F.sleep(timeout)).flatMap { From bfc3a43a7d185355fdb4693975d5ee9edb0fde0b Mon Sep 17 00:00:00 2001 From: Angelo Oparah <16281580+Angel-O@users.noreply.github.com> Date: Sat, 25 Mar 2023 18:37:08 +0000 Subject: [PATCH 03/16] fmt --- .../src/test/scala/fs2/StreamCombinatorsSuite.scala | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/core/shared/src/test/scala/fs2/StreamCombinatorsSuite.scala b/core/shared/src/test/scala/fs2/StreamCombinatorsSuite.scala index 696923ff73..e83b43a68e 100644 --- a/core/shared/src/test/scala/fs2/StreamCombinatorsSuite.scala +++ b/core/shared/src/test/scala/fs2/StreamCombinatorsSuite.scala @@ -787,11 +787,11 @@ class StreamCombinatorsSuite extends Fs2Suite { // this test example is designed to have good coverage of // the chunk manipulation logic in groupWithin val source = - chunk(from = 1, size = 3) ++ - sleep ++ - chunk(from = 4, size = 1) ++ longSleep ++ - chunk(from = 5, size = 11) ++ - chunk(from = 16, size = 7) + chunk(from = 1, size = 3) ++ + sleep ++ + chunk(from = 4, size = 1) ++ longSleep ++ + chunk(from = 5, size = 11) ++ + chunk(from = 16, size = 7) val expected = List( List(1, 2, 3), From ca35428371648d9999f98f0bd0de8715a7465af1 Mon Sep 17 00:00:00 2001 From: Angelo Oparah <16281580+Angel-O@users.noreply.github.com> Date: Sun, 26 Mar 2023 00:45:57 +0000 Subject: [PATCH 04/16] increasing timeout for js platform, avoid deconstructing tuple in for-compr to avoid CI js failure --- core/shared/src/main/scala/fs2/Stream.scala | 7 ++++--- .../shared/src/test/scala/fs2/StreamCombinatorsSuite.scala | 2 ++ 2 files changed, 6 insertions(+), 3 deletions(-) diff --git a/core/shared/src/main/scala/fs2/Stream.scala b/core/shared/src/main/scala/fs2/Stream.scala index 6120d5cd90..29ce774302 100644 --- a/core/shared/src/main/scala/fs2/Stream.scala +++ b/core/shared/src/main/scala/fs2/Stream.scala @@ -1449,9 +1449,10 @@ final class Stream[+F[_], +O] private[fs2] (private[fs2] val underlying: Pull[F, // emit immediately or wait before doing so, subsequently lowering the supply by however // many elements have been flushed (excluding the element already awaited, if needed) val emitNextChunk: F2[Chunk[O]] = for { - isEmpty <- isBufferEmpty - (flushed, awaited) <- if (isEmpty) awaitChunk.map((_, 1)) else emitChunk.map((_, 0)) - _ <- supply.acquireN((flushed.size.toLong - awaited).max(0)) + mustWait <- isBufferEmpty + flushed <- if (mustWait) awaitChunk else emitChunk + awaitedCount = if (mustWait) 1 else 0 + _ <- supply.acquireN((flushed.size.toLong - awaitedCount).max(0)) } yield flushed val onTimeout: F2[Chunk[O]] = diff --git a/core/shared/src/test/scala/fs2/StreamCombinatorsSuite.scala b/core/shared/src/test/scala/fs2/StreamCombinatorsSuite.scala index e83b43a68e..84a6c2e180 100644 --- a/core/shared/src/test/scala/fs2/StreamCombinatorsSuite.scala +++ b/core/shared/src/test/scala/fs2/StreamCombinatorsSuite.scala @@ -38,6 +38,8 @@ import scala.util.control.NoStackTrace class StreamCombinatorsSuite extends Fs2Suite { + override def munitIOTimeout: FiniteDuration = 1.minute + group("awakeEvery") { test("basic") { Stream From 4080294c7f977245e50242cfffe10cc599bb1d5a Mon Sep 17 00:00:00 2001 From: Angelo Oparah <16281580+Angel-O@users.noreply.github.com> Date: Sun, 26 Mar 2023 04:48:55 +0100 Subject: [PATCH 05/16] removing additional timeout override --- core/shared/src/test/scala/fs2/StreamCombinatorsSuite.scala | 2 -- 1 file changed, 2 deletions(-) diff --git a/core/shared/src/test/scala/fs2/StreamCombinatorsSuite.scala b/core/shared/src/test/scala/fs2/StreamCombinatorsSuite.scala index e4a78dccb6..9253e5121c 100644 --- a/core/shared/src/test/scala/fs2/StreamCombinatorsSuite.scala +++ b/core/shared/src/test/scala/fs2/StreamCombinatorsSuite.scala @@ -39,8 +39,6 @@ import scala.util.control.NoStackTrace class StreamCombinatorsSuite extends Fs2Suite { override def munitIOTimeout = 1.minute - override def munitIOTimeout: FiniteDuration = 1.minute - group("awakeEvery") { test("basic") { Stream From 98587dcbb28ff83e5445c16558229f4e522d5dc3 Mon Sep 17 00:00:00 2001 From: Angelo Oparah <16281580+Angel-O@users.noreply.github.com> Date: Sun, 26 Mar 2023 17:31:10 +0100 Subject: [PATCH 06/16] * permit bugfixes * handling edge cases * refactoring * adding tests --- core/shared/src/main/scala/fs2/Stream.scala | 74 ++++++++++--------- core/shared/src/test/scala/fs2/Fs2Suite.scala | 3 + .../scala/fs2/StreamCombinatorsSuite.scala | 57 +++++++++++++- 3 files changed, 98 insertions(+), 36 deletions(-) diff --git a/core/shared/src/main/scala/fs2/Stream.scala b/core/shared/src/main/scala/fs2/Stream.scala index 29ce774302..35fabd4697 100644 --- a/core/shared/src/main/scala/fs2/Stream.scala +++ b/core/shared/src/main/scala/fs2/Stream.scala @@ -1372,12 +1372,12 @@ final class Stream[+F[_], +O] private[fs2] (private[fs2] val underlying: Pull[F, } /** Splits this stream into a stream of chunks of elements, such that - * 1. each chunk in the output has at most `outputSize` elements, and + * 1. each chunk in the output has at most `chunkSize` elements, and * 2. the concatenation of those chunks, which is obtained by calling * `unchunks`, yields the same element sequence as this stream. * - * As `this` stream emits input elements, the result stream them in a - * waiting buffer, until it has enough elements to emit next chunk. + * As `this` stream ingests input elements, they will be collected in a + * waiting buffer, until it has enough elements to emit the next chunk. * * To avoid holding input elements for too long, this method takes a * `timeout`. This timeout is reset after each output chunk is emitted. @@ -1397,6 +1397,9 @@ final class Stream[+F[_], +O] private[fs2] (private[fs2] val underlying: Pull[F, * When the input stream terminates, any accumulated elements are emitted * immediately in a chunk, even if `timeout` has not expired. * + * If the chunkSize is equal to zero the stream will block until the + * timeout expires at which point it will terminate. + * * @param chunkSize the maximum size of chunks emitted by resulting stream. * @param timeout maximum time that input elements are held in the buffer * before being emitted by the resulting stream. @@ -1405,58 +1408,63 @@ final class Stream[+F[_], +O] private[fs2] (private[fs2] val underlying: Pull[F, chunkSize: Int, timeout: FiniteDuration )(implicit F: Temporal[F2]): Stream[F2, Chunk[O]] = - if (timeout.toNanos == 0 || chunkSize == 1) chunkN(chunkSize) + if (chunkSize == 0) Stream.sleep_[F2](timeout) + else if (timeout.toNanos == 0 || chunkSize == 1) chunkN(chunkSize) else Stream.force { for { supply <- Semaphore[F2](0) supplyEnded <- SignallingRef[F2].of(false) buffer <- Queue.bounded[F2, O](chunkSize) // buffering and backpressure - awaitFirstChunk = Stream.fromQueueUnterminated(buffer, chunkSize).chunkMin(1).head } yield { - // note: it can produce an empty chunk - val emitChunk: F2[Chunk[O]] = buffer.tryTakeN(Some(chunkSize)).map(Chunk.seq) - // we need to check the buffer size, rather than the available supply since - // the supply is increased at the end so it won't always report the buffer size accurately - val isBufferEmpty: F2[Boolean] = buffer.size.map(_ == 0) + def bufferSizeIs(n: Int): F2[Boolean] = + buffer.size.map(_ == n) - val streamExhausted: F2[Boolean] = (isBufferEmpty, supplyEnded.get).mapN(_ && _) + val supplyUnavailable: F2[Boolean] = supply.available.map(_ == 0) - // "subscribing" to the buffer waiting for the first chunk. Note: we - // can't wait forever: at most we can wait until the supply has ended - val awaitChunk: F2[Chunk[O]] = { + val emitChunk: F2[Chunk[O]] = + buffer.tryTakeN(Some(chunkSize)).map(Chunk.seq) - val waitForOne = Stream.eval(supply.acquire).interruptWhen(supplyEnded).compile.drain + val streamExhausted: F2[Boolean] = + (bufferSizeIs(0), supplyEnded.get).mapN(_ && _) - // we need to check again after waiting: (we might have just received the final chunk, - // in that case we need to flush any residual elements in the buffer without blocking - waitForOne *> F.ifM(supplyEnded.get)(emitChunk, awaitFirstChunk.compile.lastOrError) - } + // in order to ensure prompt termination on interruption even when the timeout has not + // kicked in yet or we haven't seen enough elements we need to max out the supply + val endSupply: F2[Unit] = + supplyEnded.set(true) *> supply.releaseN(Int.MaxValue.toLong + chunkSize) - // releasing a number of permits equal to {chunkSize} is enough in most cases, but in - // order to ensure prompt termination of the consumer on interruption even when the timeout - // has not kicked in yet nor we've seen enough elements we need to max out the supply - val maxOutSupply: F2[Unit] = supply.releaseN(Int.MaxValue.toLong + chunkSize) + val enqueue: F2[Unit] = + foreach(buffer.offer(_) *> supply.release).compile.drain.guarantee(endSupply) - // enabling termination of the consumer stream when the producer completes naturally - // (i.e runs out of elements) or when the combined stream (consumer + producer) is interrupted - val endSupply: F2[Unit] = supplyEnded.set(true) *> maxOutSupply + val awaitChunk: F2[Chunk[O]] = { + // we can't wait forever: at most we can wait until the supply has ended + val waitForOne = Stream.eval(supply.acquire).interruptWhen(supplyEnded).compile.drain - val enqueue: F2[Unit] = - foreach(buffer.offer(_) <* supply.release).compile.drain.guarantee(endSupply) + // "subscribing" to the buffer pulling the first chunk when it becomes available + val pullChunk = Stream.fromQueueUnterminated(buffer, chunkSize).chunks.head.compile.lastOrError + + // we need to check the supply after waiting: (we might have just received the final chunk, + // in that case we need to flush any residual elements in the buffer without blocking) + waitForOne *> F.ifM(supplyEnded.get)(emitChunk, pullChunk) + } // emit immediately or wait before doing so, subsequently lowering the supply by however // many elements have been flushed (excluding the element already awaited, if needed) - val emitNextChunk: F2[Chunk[O]] = for { - mustWait <- isBufferEmpty - flushed <- if (mustWait) awaitChunk else emitChunk - awaitedCount = if (mustWait) 1 else 0 + val emitWhenAvailableAndLowerSupply: F2[Chunk[O]] = for { + noSupply <- supplyUnavailable + flushed <- if (noSupply) awaitChunk else emitChunk + awaitedCount = if (noSupply) 1 else 0 _ <- supply.acquireN((flushed.size.toLong - awaitedCount).max(0)) } yield flushed - val onTimeout: F2[Chunk[O]] = + // edge case: supply semaphore loses the race, but acquires the permits. In such scenario + // we emit the chunk without lowering the supply, since it has already been lowered + val onTimeout: F2[Chunk[O]] = { + val edgeCase = (supplyUnavailable, bufferSizeIs(chunkSize)).mapN(_ && _) + val emitNextChunk = F.ifM(edgeCase)(emitChunk, emitWhenAvailableAndLowerSupply) F.ifM(streamExhausted)(F.pure(Chunk.empty[O]), emitNextChunk) + } val dequeue: F2[Chunk[O]] = F.race(supply.acquireN(chunkSize.toLong), F.sleep(timeout)).flatMap { diff --git a/core/shared/src/test/scala/fs2/Fs2Suite.scala b/core/shared/src/test/scala/fs2/Fs2Suite.scala index e50396ff67..a79fd699b7 100644 --- a/core/shared/src/test/scala/fs2/Fs2Suite.scala +++ b/core/shared/src/test/scala/fs2/Fs2Suite.scala @@ -89,6 +89,9 @@ abstract class Fs2Suite expect <- expected.compile.toList } yield assertEquals(actual.toSet, expect.toSet) + def assertCompletes: IO[Unit] = + str.compile.drain.assert + def intercept[T <: Throwable](implicit T: ClassTag[T], loc: Location): IO[T] = str.compile.drain.intercept[T] } diff --git a/core/shared/src/test/scala/fs2/StreamCombinatorsSuite.scala b/core/shared/src/test/scala/fs2/StreamCombinatorsSuite.scala index 9253e5121c..a79dec7e43 100644 --- a/core/shared/src/test/scala/fs2/StreamCombinatorsSuite.scala +++ b/core/shared/src/test/scala/fs2/StreamCombinatorsSuite.scala @@ -39,6 +39,8 @@ import scala.util.control.NoStackTrace class StreamCombinatorsSuite extends Fs2Suite { override def munitIOTimeout = 1.minute + // override def scalaCheckInitialSeed = "4_9X5VOJxLTr_rDxGix4ltoYWqEHXslYbXxF8wkta_O=" + group("awakeEvery") { test("basic") { Stream @@ -780,7 +782,7 @@ class StreamCombinatorsSuite extends Fs2Suite { val t = 200.millis val size = 5 val sleep = Stream.sleep_[IO](2 * t) - val longSleep = Stream.sleep_[IO](10 * t) + val longSleep = sleep.repeatN(5) def chunk(from: Int, size: Int) = Stream.range(from, from + size).chunkAll.unchunks @@ -866,17 +868,43 @@ class StreamCombinatorsSuite extends Fs2Suite { .assertEquals(0.millis) } - test("stress test: all elements are processed") { + test("stress test (short execution): all elements are processed") { val rangeLength = 100000 Stream - .eval(Ref.of[IO, Int](0)) + .eval(Ref[IO].of(0)) .flatMap { counter => Stream .range(0, rangeLength) .covary[IO] + .groupWithin(256, 100.micros) + .evalTap(ch => counter.update(_ + ch.size)) *> Stream.eval(counter.get) + } + .compile + .lastOrError + .assertEquals(rangeLength) + + } + + // ignoring because it's a long running test (around 30 minutes), but it's a useful test to have + // to asses the validity of permits management and timeout logic over an extended period of time + test("stress test (long execution): all elements are processed".ignore) { + + val rangeLength = 5000000 + + Stream + .eval(Ref[IO].of(0)) + .flatMap { counter => + Stream + .range(0, rangeLength) + .covary[IO] + .evalTap(d => IO.sleep((d % 500 + 2).micros)) .groupWithin(4096, 100.micros) + .zipWithIndex + .evalMap { case (ch, idx) => + IO.println(s"chunk # $idx, size is: ${ch.size}").whenA(idx % 1000 == 0).as(ch) + } .evalTap(ch => counter.update(_ + ch.size)) *> Stream.eval(counter.get) } .compile @@ -937,6 +965,29 @@ class StreamCombinatorsSuite extends Fs2Suite { (sourceTimeout, List(0, 1, 2, 3, 4, 5)) ) } + + test( + "if the buffer fills up at the same time when the timeout expires there won't be a deadlock" + ) { + + forAllF { (s0: Stream[Pure, Int], b: Byte) => + TestControl + .executeEmbed { + + // preventing empty or singleton streams that would bypass the logic being tested + val n = b.max(2) + val s = s0 ++ Stream.range(0, n) + + // every n seconds there will be n elements in the buffer at same time when the timeout expires + s + .covary[IO] + .metered(1.second) + .groupWithin(n, n.seconds) + .map(_.toList) + .assertCompletes + } + } + } } property("head")(forAll((s: Stream[Pure, Int]) => assertEquals(s.head.toList, s.toList.take(1)))) From a6ee359209ce80724265977c11268c6ceb7f5794 Mon Sep 17 00:00:00 2001 From: Angelo Oparah <16281580+Angel-O@users.noreply.github.com> Date: Mon, 27 Mar 2023 21:21:25 +0100 Subject: [PATCH 07/16] scalafmt --- core/shared/src/main/scala/fs2/Stream.scala | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/core/shared/src/main/scala/fs2/Stream.scala b/core/shared/src/main/scala/fs2/Stream.scala index 35fabd4697..5eff169677 100644 --- a/core/shared/src/main/scala/fs2/Stream.scala +++ b/core/shared/src/main/scala/fs2/Stream.scala @@ -1442,7 +1442,8 @@ final class Stream[+F[_], +O] private[fs2] (private[fs2] val underlying: Pull[F, val waitForOne = Stream.eval(supply.acquire).interruptWhen(supplyEnded).compile.drain // "subscribing" to the buffer pulling the first chunk when it becomes available - val pullChunk = Stream.fromQueueUnterminated(buffer, chunkSize).chunks.head.compile.lastOrError + val pullChunk = + Stream.fromQueueUnterminated(buffer, chunkSize).chunks.head.compile.lastOrError // we need to check the supply after waiting: (we might have just received the final chunk, // in that case we need to flush any residual elements in the buffer without blocking) From 0ca754fc0a7449029e09b44d38ec07002433eaca Mon Sep 17 00:00:00 2001 From: Angelo Oparah <16281580+Angel-O@users.noreply.github.com> Date: Mon, 27 Mar 2023 21:48:17 +0100 Subject: [PATCH 08/16] converting byte to int --- core/shared/src/test/scala/fs2/StreamCombinatorsSuite.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/shared/src/test/scala/fs2/StreamCombinatorsSuite.scala b/core/shared/src/test/scala/fs2/StreamCombinatorsSuite.scala index a79dec7e43..f0ae7d88a9 100644 --- a/core/shared/src/test/scala/fs2/StreamCombinatorsSuite.scala +++ b/core/shared/src/test/scala/fs2/StreamCombinatorsSuite.scala @@ -975,7 +975,7 @@ class StreamCombinatorsSuite extends Fs2Suite { .executeEmbed { // preventing empty or singleton streams that would bypass the logic being tested - val n = b.max(2) + val n = b.max(2).toInt val s = s0 ++ Stream.range(0, n) // every n seconds there will be n elements in the buffer at same time when the timeout expires From 56ba786197427e10080e548d1ba1b474e369f628 Mon Sep 17 00:00:00 2001 From: Angelo Oparah <16281580+Angel-O@users.noreply.github.com> Date: Mon, 27 Mar 2023 22:11:43 +0100 Subject: [PATCH 09/16] minor --- core/shared/src/test/scala/fs2/StreamCombinatorsSuite.scala | 2 -- 1 file changed, 2 deletions(-) diff --git a/core/shared/src/test/scala/fs2/StreamCombinatorsSuite.scala b/core/shared/src/test/scala/fs2/StreamCombinatorsSuite.scala index f0ae7d88a9..bb0ee44191 100644 --- a/core/shared/src/test/scala/fs2/StreamCombinatorsSuite.scala +++ b/core/shared/src/test/scala/fs2/StreamCombinatorsSuite.scala @@ -39,8 +39,6 @@ import scala.util.control.NoStackTrace class StreamCombinatorsSuite extends Fs2Suite { override def munitIOTimeout = 1.minute - // override def scalaCheckInitialSeed = "4_9X5VOJxLTr_rDxGix4ltoYWqEHXslYbXxF8wkta_O=" - group("awakeEvery") { test("basic") { Stream From dce718a7173413d8667b2b120ec5664ea5f3fa31 Mon Sep 17 00:00:00 2001 From: Angelo Oparah <16281580+Angel-O@users.noreply.github.com> Date: Tue, 28 Mar 2023 08:29:02 +0100 Subject: [PATCH 10/16] using test control, improving test spec, comments and readability --- core/shared/src/main/scala/fs2/Stream.scala | 27 ++++++------ .../scala/fs2/StreamCombinatorsSuite.scala | 42 +++++++++---------- 2 files changed, 35 insertions(+), 34 deletions(-) diff --git a/core/shared/src/main/scala/fs2/Stream.scala b/core/shared/src/main/scala/fs2/Stream.scala index 5eff169677..28fd9775bb 100644 --- a/core/shared/src/main/scala/fs2/Stream.scala +++ b/core/shared/src/main/scala/fs2/Stream.scala @@ -1421,32 +1421,32 @@ final class Stream[+F[_], +O] private[fs2] (private[fs2] val underlying: Pull[F, def bufferSizeIs(n: Int): F2[Boolean] = buffer.size.map(_ == n) - val supplyUnavailable: F2[Boolean] = supply.available.map(_ == 0) - - val emitChunk: F2[Chunk[O]] = - buffer.tryTakeN(Some(chunkSize)).map(Chunk.seq) + val supplyUnavailable: F2[Boolean] = + supply.available.map(_ == 0) val streamExhausted: F2[Boolean] = (bufferSizeIs(0), supplyEnded.get).mapN(_ && _) - // in order to ensure prompt termination on interruption even when the timeout has not - // kicked in yet or we haven't seen enough elements we need to max out the supply + // emitting a chunk without blocking (might produce an empty chunk) + val emitChunk: F2[Chunk[O]] = + buffer.tryTakeN(Some(chunkSize)).map(Chunk.seq) + + // in order to ensure prompt termination on interruption even when the timeout has not + // kicked in yet or we haven't seen enough elements we need to max out the supply val endSupply: F2[Unit] = supplyEnded.set(true) *> supply.releaseN(Int.MaxValue.toLong + chunkSize) - val enqueue: F2[Unit] = - foreach(buffer.offer(_) *> supply.release).compile.drain.guarantee(endSupply) - val awaitChunk: F2[Chunk[O]] = { // we can't wait forever: at most we can wait until the supply has ended val waitForOne = Stream.eval(supply.acquire).interruptWhen(supplyEnded).compile.drain - // "subscribing" to the buffer pulling the first chunk when it becomes available + // "subscribing" to the buffer blocking until the first chunk becomes available val pullChunk = Stream.fromQueueUnterminated(buffer, chunkSize).chunks.head.compile.lastOrError - // we need to check the supply after waiting: (we might have just received the final chunk, - // in that case we need to flush any residual elements in the buffer without blocking) + // we need to check if the supply has ended after waiting: (we might have just reached + // the end of the stream, with or without receiving any element: since we don't know + // whether or not we have elements to flush, we must emit a chunk without blocking) waitForOne *> F.ifM(supplyEnded.get)(emitChunk, pullChunk) } @@ -1467,6 +1467,9 @@ final class Stream[+F[_], +O] private[fs2] (private[fs2] val underlying: Pull[F, F.ifM(streamExhausted)(F.pure(Chunk.empty[O]), emitNextChunk) } + val enqueue: F2[Unit] = + foreach(buffer.offer(_) *> supply.release).compile.drain.guarantee(endSupply) + val dequeue: F2[Chunk[O]] = F.race(supply.acquireN(chunkSize.toLong), F.sleep(timeout)).flatMap { case Left(_) => emitChunk diff --git a/core/shared/src/test/scala/fs2/StreamCombinatorsSuite.scala b/core/shared/src/test/scala/fs2/StreamCombinatorsSuite.scala index bb0ee44191..3820131c8f 100644 --- a/core/shared/src/test/scala/fs2/StreamCombinatorsSuite.scala +++ b/core/shared/src/test/scala/fs2/StreamCombinatorsSuite.scala @@ -885,30 +885,27 @@ class StreamCombinatorsSuite extends Fs2Suite { } - // ignoring because it's a long running test (around 30 minutes), but it's a useful test to have + // ignoring because it's a long running test (around 15 minutes), but it's a useful test to have // to asses the validity of permits management and timeout logic over an extended period of time test("stress test (long execution): all elements are processed".ignore) { - val rangeLength = 5000000 - - Stream - .eval(Ref[IO].of(0)) - .flatMap { counter => - Stream - .range(0, rangeLength) - .covary[IO] - .evalTap(d => IO.sleep((d % 500 + 2).micros)) - .groupWithin(4096, 100.micros) - .zipWithIndex - .evalMap { case (ch, idx) => - IO.println(s"chunk # $idx, size is: ${ch.size}").whenA(idx % 1000 == 0).as(ch) - } - .evalTap(ch => counter.update(_ + ch.size)) *> Stream.eval(counter.get) - } - .compile - .lastOrError - .assertEquals(rangeLength) + TestControl.executeEmbed { + val rangeLength = 5000000 + Stream + .eval(Ref[IO].of(0)) + .flatMap { counter => + Stream + .range(0, rangeLength) + .covary[IO] + .evalTap(d => IO.sleep((d % 500 + 2).micros)) + .groupWithin(4096, 100.micros) + .evalTap(ch => counter.update(_ + ch.size)) *> Stream.eval(counter.get) + } + .compile + .lastOrError + .assertEquals(rangeLength) + } } test("upstream failures are propagated downstream") { @@ -965,7 +962,7 @@ class StreamCombinatorsSuite extends Fs2Suite { } test( - "if the buffer fills up at the same time when the timeout expires there won't be a deadlock" + "Edge case: if the buffer fills up and timeout expires at the same time there won't be a deadlock" ) { forAllF { (s0: Stream[Pure, Int], b: Byte) => @@ -976,7 +973,8 @@ class StreamCombinatorsSuite extends Fs2Suite { val n = b.max(2).toInt val s = s0 ++ Stream.range(0, n) - // every n seconds there will be n elements in the buffer at same time when the timeout expires + // the buffer will reach its full capacity every + // n seconds exactly when the timeout expires s .covary[IO] .metered(1.second) From 52be6053de9ea239cb20fdbb9925f7275b142c1b Mon Sep 17 00:00:00 2001 From: Angelo Oparah <16281580+Angel-O@users.noreply.github.com> Date: Sat, 1 Apr 2023 10:39:27 +0100 Subject: [PATCH 11/16] simplification --- core/shared/src/main/scala/fs2/Stream.scala | 22 +++++-------------- .../scala/fs2/StreamCombinatorsSuite.scala | 2 +- 2 files changed, 7 insertions(+), 17 deletions(-) diff --git a/core/shared/src/main/scala/fs2/Stream.scala b/core/shared/src/main/scala/fs2/Stream.scala index 28fd9775bb..d258c015e2 100644 --- a/core/shared/src/main/scala/fs2/Stream.scala +++ b/core/shared/src/main/scala/fs2/Stream.scala @@ -1436,27 +1436,17 @@ final class Stream[+F[_], +O] private[fs2] (private[fs2] val underlying: Pull[F, val endSupply: F2[Unit] = supplyEnded.set(true) *> supply.releaseN(Int.MaxValue.toLong + chunkSize) - val awaitChunk: F2[Chunk[O]] = { - // we can't wait forever: at most we can wait until the supply has ended - val waitForOne = Stream.eval(supply.acquire).interruptWhen(supplyEnded).compile.drain - - // "subscribing" to the buffer blocking until the first chunk becomes available - val pullChunk = - Stream.fromQueueUnterminated(buffer, chunkSize).chunks.head.compile.lastOrError - - // we need to check if the supply has ended after waiting: (we might have just reached - // the end of the stream, with or without receiving any element: since we don't know - // whether or not we have elements to flush, we must emit a chunk without blocking) - waitForOne *> F.ifM(supplyEnded.get)(emitChunk, pullChunk) - } + // we can't wait forever: at most we can wait until the supply has ended + val waitForOne: F2[Unit] = + Stream.eval(supply.acquire).interruptWhen(supplyEnded).compile.drain // emit immediately or wait before doing so, subsequently lowering the supply by however // many elements have been flushed (excluding the element already awaited, if needed) val emitWhenAvailableAndLowerSupply: F2[Chunk[O]] = for { noSupply <- supplyUnavailable - flushed <- if (noSupply) awaitChunk else emitChunk - awaitedCount = if (noSupply) 1 else 0 - _ <- supply.acquireN((flushed.size.toLong - awaitedCount).max(0)) + flushed <- if (noSupply) waitForOne *> emitChunk else emitChunk + awaitedCount = if (noSupply) 1L else 0L + _ <- supply.acquireN((flushed.size - awaitedCount).max(0)) } yield flushed // edge case: supply semaphore loses the race, but acquires the permits. In such scenario diff --git a/core/shared/src/test/scala/fs2/StreamCombinatorsSuite.scala b/core/shared/src/test/scala/fs2/StreamCombinatorsSuite.scala index 3820131c8f..2e9ef7d26d 100644 --- a/core/shared/src/test/scala/fs2/StreamCombinatorsSuite.scala +++ b/core/shared/src/test/scala/fs2/StreamCombinatorsSuite.scala @@ -885,7 +885,7 @@ class StreamCombinatorsSuite extends Fs2Suite { } - // ignoring because it's a long running test (around 15 minutes), but it's a useful test to have + // ignoring because it's a long running test (around 8-12 minutes), but it's a useful test to have // to asses the validity of permits management and timeout logic over an extended period of time test("stress test (long execution): all elements are processed".ignore) { From 8fe5d326546f47f8fc75265a7de73f2d70d9fe67 Mon Sep 17 00:00:00 2001 From: Angelo Oparah <16281580+Angel-O@users.noreply.github.com> Date: Sat, 1 Apr 2023 11:36:35 +0100 Subject: [PATCH 12/16] removing unnecessary check since it's always false --- core/shared/src/main/scala/fs2/Stream.scala | 13 ++++--------- 1 file changed, 4 insertions(+), 9 deletions(-) diff --git a/core/shared/src/main/scala/fs2/Stream.scala b/core/shared/src/main/scala/fs2/Stream.scala index 1afd671486..8666bf193b 100644 --- a/core/shared/src/main/scala/fs2/Stream.scala +++ b/core/shared/src/main/scala/fs2/Stream.scala @@ -1418,16 +1418,12 @@ final class Stream[+F[_], +O] private[fs2] (private[fs2] val underlying: Pull[F, buffer <- Queue.bounded[F2, O](chunkSize) // buffering and backpressure } yield { - def bufferSizeIs(n: Int): F2[Boolean] = - buffer.size.map(_ == n) + val bufferFull: F2[Boolean] = + buffer.size.map(_ == chunkSize) val supplyUnavailable: F2[Boolean] = supply.available.map(_ == 0) - val streamExhausted: F2[Boolean] = - (bufferSizeIs(0), supplyEnded.get).mapN(_ && _) - - // emitting a chunk without blocking (might produce an empty chunk) val emitChunk: F2[Chunk[O]] = buffer.tryTakeN(Some(chunkSize)).map(Chunk.seq) @@ -1452,9 +1448,8 @@ final class Stream[+F[_], +O] private[fs2] (private[fs2] val underlying: Pull[F, // edge case: supply semaphore loses the race, but acquires the permits. In such scenario // we emit the chunk without lowering the supply, since it has already been lowered val onTimeout: F2[Chunk[O]] = { - val edgeCase = (supplyUnavailable, bufferSizeIs(chunkSize)).mapN(_ && _) - val emitNextChunk = F.ifM(edgeCase)(emitChunk, emitWhenAvailableAndLowerSupply) - F.ifM(streamExhausted)(F.pure(Chunk.empty[O]), emitNextChunk) + val edgeCase = (supplyUnavailable, bufferFull).mapN(_ && _) + F.ifM(edgeCase)(emitChunk, emitWhenAvailableAndLowerSupply) } val enqueue: F2[Unit] = From 871652916a99b63537f3184757366601de903da2 Mon Sep 17 00:00:00 2001 From: Angelo Oparah <16281580+Angel-O@users.noreply.github.com> Date: Sat, 1 Apr 2023 15:43:59 +0100 Subject: [PATCH 13/16] restoring the notion of time --- .../scala/fs2/StreamCombinatorsSuite.scala | 33 +++++++++---------- 1 file changed, 15 insertions(+), 18 deletions(-) diff --git a/core/shared/src/test/scala/fs2/StreamCombinatorsSuite.scala b/core/shared/src/test/scala/fs2/StreamCombinatorsSuite.scala index 2e9ef7d26d..26623bb51c 100644 --- a/core/shared/src/test/scala/fs2/StreamCombinatorsSuite.scala +++ b/core/shared/src/test/scala/fs2/StreamCombinatorsSuite.scala @@ -885,27 +885,24 @@ class StreamCombinatorsSuite extends Fs2Suite { } - // ignoring because it's a long running test (around 8-12 minutes), but it's a useful test to have + // ignoring because it's a (relatively) long running test (around 3/4 minutes), but it's useful // to asses the validity of permits management and timeout logic over an extended period of time test("stress test (long execution): all elements are processed".ignore) { + val rangeLength = 10000000 - TestControl.executeEmbed { - val rangeLength = 5000000 - - Stream - .eval(Ref[IO].of(0)) - .flatMap { counter => - Stream - .range(0, rangeLength) - .covary[IO] - .evalTap(d => IO.sleep((d % 500 + 2).micros)) - .groupWithin(4096, 100.micros) - .evalTap(ch => counter.update(_ + ch.size)) *> Stream.eval(counter.get) - } - .compile - .lastOrError - .assertEquals(rangeLength) - } + Stream + .eval(Ref[IO].of(0)) + .flatMap { counter => + Stream + .range(0, rangeLength) + .covary[IO] + .evalTap(d => IO.sleep((d % 10 + 2).micros)) + .groupWithin(275, 5.millis) + .evalTap(ch => counter.update(_ + ch.size)) *> Stream.eval(counter.get) + } + .compile + .lastOrError + .assertEquals(rangeLength) } test("upstream failures are propagated downstream") { From e35a3203a31e373d33b7b14c92abaae084f7b69e Mon Sep 17 00:00:00 2001 From: Angelo Oparah <16281580+Angel-O@users.noreply.github.com> Date: Sun, 2 Apr 2023 12:19:07 +0100 Subject: [PATCH 14/16] restoreing awaitchunk logic --- core/shared/src/main/scala/fs2/Stream.scala | 18 ++++++++++++++---- 1 file changed, 14 insertions(+), 4 deletions(-) diff --git a/core/shared/src/main/scala/fs2/Stream.scala b/core/shared/src/main/scala/fs2/Stream.scala index 8666bf193b..fc3b727f31 100644 --- a/core/shared/src/main/scala/fs2/Stream.scala +++ b/core/shared/src/main/scala/fs2/Stream.scala @@ -1432,15 +1432,25 @@ final class Stream[+F[_], +O] private[fs2] (private[fs2] val underlying: Pull[F, val endSupply: F2[Unit] = supplyEnded.set(true) *> supply.releaseN(Int.MaxValue.toLong + chunkSize) - // we can't wait forever: at most we can wait until the supply has ended - val waitForOne: F2[Unit] = - Stream.eval(supply.acquire).interruptWhen(supplyEnded).compile.drain + val awaitChunk: F2[Chunk[O]] = { + // we can't wait forever: at most we can wait until the supply has ended + val waitForOne = Stream.eval(supply.acquire).interruptWhen(supplyEnded).compile.drain + + // "subscribing" to the buffer blocking until the first chunk becomes available + val pullChunk = + Stream.fromQueueUnterminated(buffer, chunkSize).chunks.head.compile.lastOrError + + // we need to check if the supply has ended after waiting: (we might have just reached + // the end of the stream, with or without receiving any element: since we don't know + // whether or not we have elements to flush, we must emit a chunk without blocking) + waitForOne *> F.ifM(supplyEnded.get)(emitChunk, pullChunk) + } // emit immediately or wait before doing so, subsequently lowering the supply by however // many elements have been flushed (excluding the element already awaited, if needed) val emitWhenAvailableAndLowerSupply: F2[Chunk[O]] = for { noSupply <- supplyUnavailable - flushed <- if (noSupply) waitForOne *> emitChunk else emitChunk + flushed <- if (noSupply) awaitChunk else emitChunk awaitedCount = if (noSupply) 1L else 0L _ <- supply.acquireN((flushed.size - awaitedCount).max(0)) } yield flushed From 17df799ecbe385e1784c5a13de1e90b74e0172f0 Mon Sep 17 00:00:00 2001 From: Angelo Oparah <16281580+Angel-O@users.noreply.github.com> Date: Tue, 11 Apr 2023 01:58:48 +0100 Subject: [PATCH 15/16] replacing concurrent queue with vector, checking if supply is empty once, minor refactoring --- core/shared/src/main/scala/fs2/Stream.scala | 64 +++++++++------------ 1 file changed, 26 insertions(+), 38 deletions(-) diff --git a/core/shared/src/main/scala/fs2/Stream.scala b/core/shared/src/main/scala/fs2/Stream.scala index fc3b727f31..d9d8ce2c48 100644 --- a/core/shared/src/main/scala/fs2/Stream.scala +++ b/core/shared/src/main/scala/fs2/Stream.scala @@ -1414,66 +1414,54 @@ final class Stream[+F[_], +O] private[fs2] (private[fs2] val underlying: Pull[F, Stream.force { for { supply <- Semaphore[F2](0) + buffer <- Ref[F2].empty[Vector[O]] + backpressure <- Semaphore[F2](chunkSize.toLong) supplyEnded <- SignallingRef[F2].of(false) - buffer <- Queue.bounded[F2, O](chunkSize) // buffering and backpressure } yield { - val bufferFull: F2[Boolean] = - buffer.size.map(_ == chunkSize) + def push(o: O): F2[Unit] = + backpressure.acquire *> buffer.update(_.appended(o)) - val supplyUnavailable: F2[Boolean] = - supply.available.map(_ == 0) + val flush: F2[Vector[O]] = + buffer.getAndSet(Vector.empty).flatTap(os => backpressure.releaseN(os.size.toLong)) - val emitChunk: F2[Chunk[O]] = - buffer.tryTakeN(Some(chunkSize)).map(Chunk.seq) + // wait until the first chunk becomes available or when we reach the end of the stream. + val awaitSupply: F2[Unit] = + Stream.exec(supply.acquire).interruptWhen(supplyEnded).compile.drain - // in order to ensure prompt termination on interruption even when the timeout has not - // kicked in yet or we haven't seen enough elements we need to max out the supply - val endSupply: F2[Unit] = - supplyEnded.set(true) *> supply.releaseN(Int.MaxValue.toLong + chunkSize) + // in order to ensure prompt termination on interruption when the timeout has not kicked + // in yet or if we haven't seen enough elements we need provide enough supply for 2 iterations + val endSupply: F2[Unit] = supplyEnded.set(true) *> supply.releaseN(chunkSize * 2L) - val awaitChunk: F2[Chunk[O]] = { - // we can't wait forever: at most we can wait until the supply has ended - val waitForOne = Stream.eval(supply.acquire).interruptWhen(supplyEnded).compile.drain - - // "subscribing" to the buffer blocking until the first chunk becomes available - val pullChunk = - Stream.fromQueueUnterminated(buffer, chunkSize).chunks.head.compile.lastOrError - - // we need to check if the supply has ended after waiting: (we might have just reached - // the end of the stream, with or without receiving any element: since we don't know - // whether or not we have elements to flush, we must emit a chunk without blocking) - waitForOne *> F.ifM(supplyEnded.get)(emitChunk, pullChunk) - } - - // emit immediately or wait before doing so, subsequently lowering the supply by however + // flush immediately or wait before doing so, subsequently lowering the supply by however // many elements have been flushed (excluding the element already awaited, if needed) - val emitWhenAvailableAndLowerSupply: F2[Chunk[O]] = for { - noSupply <- supplyUnavailable - flushed <- if (noSupply) awaitChunk else emitChunk + def flushOnSupplyReceived(noSupply: Boolean): F2[Vector[O]] = for { + flushed <- awaitSupply.whenA(noSupply) *> flush awaitedCount = if (noSupply) 1L else 0L _ <- supply.acquireN((flushed.size - awaitedCount).max(0)) } yield flushed // edge case: supply semaphore loses the race, but acquires the permits. In such scenario - // we emit the chunk without lowering the supply, since it has already been lowered - val onTimeout: F2[Chunk[O]] = { - val edgeCase = (supplyUnavailable, bufferFull).mapN(_ && _) - F.ifM(edgeCase)(emitChunk, emitWhenAvailableAndLowerSupply) - } + // we flush the buffer without lowering the supply, since it has already been lowered + val onTimeout: F2[Vector[O]] = for { + bufferFull <- buffer.get.map(_.size == chunkSize) + noSupply <- supply.available.map(_ == 0) + edgeCase = bufferFull && noSupply + flushed <- if (edgeCase) flush else flushOnSupplyReceived(noSupply) + } yield flushed val enqueue: F2[Unit] = - foreach(buffer.offer(_) *> supply.release).compile.drain.guarantee(endSupply) + foreach(push(_) *> supply.release).compile.drain.guarantee(endSupply) - val dequeue: F2[Chunk[O]] = + val dequeue: F2[Vector[O]] = F.race(supply.acquireN(chunkSize.toLong), F.sleep(timeout)).flatMap { - case Left(_) => emitChunk + case Left(_) => flush case Right(_) => onTimeout } Stream .repeatEval(dequeue) - .collectWhile { case os if os.nonEmpty => os } + .collectWhile { case os if os.nonEmpty => Chunk.vector(os) } .concurrently(Stream.eval(enqueue)) } } From bdce57385ef797da3c1d57a3f56674b5570cca2f Mon Sep 17 00:00:00 2001 From: Angelo Oparah <16281580+Angel-O@users.noreply.github.com> Date: Sat, 22 Apr 2023 11:22:43 +0100 Subject: [PATCH 16/16] replacing appended with symbolic method, fails on ci for some reason --- core/shared/src/main/scala/fs2/Stream.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/shared/src/main/scala/fs2/Stream.scala b/core/shared/src/main/scala/fs2/Stream.scala index d9d8ce2c48..09c4993d5e 100644 --- a/core/shared/src/main/scala/fs2/Stream.scala +++ b/core/shared/src/main/scala/fs2/Stream.scala @@ -1420,7 +1420,7 @@ final class Stream[+F[_], +O] private[fs2] (private[fs2] val underlying: Pull[F, } yield { def push(o: O): F2[Unit] = - backpressure.acquire *> buffer.update(_.appended(o)) + backpressure.acquire *> buffer.update(_ :+ o) val flush: F2[Vector[O]] = buffer.getAndSet(Vector.empty).flatTap(os => backpressure.releaseN(os.size.toLong))