diff --git a/core/src/main/scala/com/phaller/rasync/Cell.scala b/core/src/main/scala/com/phaller/rasync/Cell.scala index e86d087..2eed0a0 100644 --- a/core/src/main/scala/com/phaller/rasync/Cell.scala +++ b/core/src/main/scala/com/phaller/rasync/Cell.scala @@ -83,6 +83,9 @@ trait Cell[K <: Key[V], V] { def when(other: Cell[K, V], valueCallback: (V, Boolean) => Outcome[V]): Unit def whenSequential(other: Cell[K, V], valueCallback: (V, Boolean) => Outcome[V]): Unit + def whenSequentialMulti(other: List[Cell[K, V]], valueCallback: () => Outcome[V]): Unit + def whenMulti(other: List[Cell[K, V]], valueCallback: () => Outcome[V]): Unit + def zipFinal(that: Cell[K, V]): Cell[DefaultKey[(V, V)], (V, V)] // internal API @@ -257,7 +260,7 @@ private class IntermediateState[K <: Key[V], V]( /** A list of cells that `this` cell depends on mapped to the callbacks to call, if those cells change. */ val nextCallbacks: Map[Cell[K, V], NextCallbackRunnable[K, V]], /** A list of cells that `this` cell depends on mapped to the callbacks to call, if those cells change. */ - val combinedCallbacks: Map[Cell[K, V], CombinedCallbackRunnable[K, V]]) extends State[V] + val combinedCallbacks: Map[Cell[K, V], CallbackRunnable[K, V]]) extends State[V] private object IntermediateState { def empty[K <: Key[V], V](updater: Updater[V]): IntermediateState[K, V] = @@ -481,6 +484,44 @@ private class CellImpl[K <: Key[V], V](pool: HandlerPool, val key: K, updater: U } } } + override def whenSequentialMulti(other: List[Cell[K, V]], valueCallback: () => Outcome[V]): Unit = + whenMulti(other, valueCallback, true) + + override def whenMulti(other: List[Cell[K, V]], valueCallback: () => Outcome[V]): Unit = + whenMulti(other, valueCallback, false) + + + private def whenMulti(other: List[Cell[K, V]], valueCallback: () => Outcome[V], sequential: Boolean): Unit = { + var success = false + while (!success) { // repeat until compareAndSet succeeded (or the dependency is outdated) + state.get() match { + case _: FinalState[K, V] => // completed with final result + // do not add dependency + // in fact, do nothing + success = true + + case raw: IntermediateState[_, _] => // not completed + val current = raw.asInstanceOf[IntermediateState[K, V]] + val toRegister = other.diff[Cell[K, V]](current.combinedCallbacks.keys.toSeq) + if (toRegister.isEmpty) + success = true // another combined dependency has been registered already. Ignore the new (duplicate) one. + else { + val newCallback: MultiCallbackRunnable[K, V] = + if (sequential) new MultiSequentialCallbackRunnable(pool, this, other, valueCallback) + else new MultiConcurrentCallbackRunnable(pool, this, other, valueCallback) + + val newState = new IntermediateState(current.res, current.tasksActive, current.completeDependentCells, current.completeCallbacks, current.nextDependentCells, current.nextCallbacks, current.combinedCallbacks ++ other.map((_, newCallback))) + if (state.compareAndSet(current, newState)) { + success = true + // Inform `other` that this cell depends on its updates. + other.foreach(_.addCombinedDependentCell(this)) + // start calculations on `other` so that we eventually get its updates. + other.foreach(pool.triggerExecution) + } + } + } + } + } override def whenNext(other: Cell[K, V], valueCallback: V => Outcome[V]): Unit = { this.whenNext(other, valueCallback, sequential = false) diff --git a/core/src/main/scala/com/phaller/rasync/callbackRunnable.scala b/core/src/main/scala/com/phaller/rasync/callbackRunnable.scala index 4e5c115..658d4d7 100644 --- a/core/src/main/scala/com/phaller/rasync/callbackRunnable.scala +++ b/core/src/main/scala/com/phaller/rasync/callbackRunnable.scala @@ -24,15 +24,34 @@ private[rasync] trait CallbackRunnable[K <: Key[V], V] extends Runnable with OnC /** The cell that awaits this callback. */ val dependentCompleter: CellCompleter[K, V] - /** The cell that triggers the callback. */ - val otherCell: Cell[K, V] - protected val completeDep: Boolean protected val sequential: Boolean /** The callback to be called. It retrieves an updated value of otherCell and returns an Outcome for dependentCompleter. */ val callback: Any // TODO Is there a better supertype for (a) (V, Bool)=>Outcome[V] and (b) V=>Outcome[V]. Nothing=>Outcome[V] does not work. + + /** Call the callback and use update dependentCompleter according to the callback's result. */ + override def run(): Unit + +} + +private[rasync] trait SingleDepCallbackRunnable[K <: Key[V], V] extends CallbackRunnable[K, V] { //extends Runnable with OnCompleteRunnable { + /** The handler pool that runs the callback function. */ + override val pool: HandlerPool + + /** The cell that awaits this callback. */ + override val dependentCompleter: CellCompleter[K, V] + + /** The cell that triggers the callback. */ + val otherCell: Cell[K, V] + + override protected val completeDep: Boolean + override protected val sequential: Boolean + + /** The callback to be called. It retrieves an updated value of otherCell and returns an Outcome for dependentCompleter. */ + override val callback: Any // TODO Is there a better supertype for (a) (V, Bool)=>Outcome[V] and (b) V=>Outcome[V]. Nothing=>Outcome[V] does not work. + /** Add this CallbackRunnable to its handler pool. */ // This method is not needed currently, as all callbackRunnables for one update are run in the same thread. // Also, this is the only use of the peekFor method, so if tests show that the current implementation is @@ -74,7 +93,7 @@ private[rasync] abstract class CompleteCallbackRunnable[K <: Key[V], V]( override val dependentCompleter: CellCompleter[K, V], // needed to not call whenNext callback, if whenComplete callback exists. override val otherCell: Cell[K, V], override val callback: V => Outcome[V]) - extends CallbackRunnable[K, V] { + extends SingleDepCallbackRunnable[K, V] { override protected final val completeDep = true // must be filled in before running it @@ -135,7 +154,7 @@ private[rasync] abstract class NextCallbackRunnable[K <: Key[V], V]( override val dependentCompleter: CellCompleter[K, V], // needed to not call whenNext callback, if whenComplete callback exists. override val otherCell: Cell[K, V], override val callback: V => Outcome[V]) - extends CallbackRunnable[K, V] { + extends SingleDepCallbackRunnable[K, V] { override protected final val completeDep = false @@ -190,7 +209,7 @@ private[rasync] abstract class CombinedCallbackRunnable[K <: Key[V], V]( override val dependentCompleter: CellCompleter[K, V], // needed to not call whenNext callback, if whenComplete callback exists. override val otherCell: Cell[K, V], override val callback: (V, Boolean) => Outcome[V]) - extends CallbackRunnable[K, V] { + extends SingleDepCallbackRunnable[K, V] { override protected final val completeDep = false @@ -232,3 +251,51 @@ private[rasync] class CombinedConcurrentCallbackRunnable[K <: Key[V], V](overrid private[rasync] class CombinedSequentialCallbackRunnable[K <: Key[V], V](override val pool: HandlerPool, override val dependentCompleter: CellCompleter[K, V], override val otherCell: Cell[K, V], override val callback: (V, Boolean) => Outcome[V]) extends CombinedCallbackRunnable[K, V](pool, dependentCompleter, otherCell, callback) with SequentialCallbackRunnable[K, V] + +/* To be run when `otherCell` gets an update. +* @param pool The handler pool that runs the callback function +* @param dependentCompleter The cell, that depends on `otherCell`. +* @param otherCell Cell that triggers this callback. +* @param callback Callback function that is triggered on an onNext event +*/ +private[rasync] abstract class MultiCallbackRunnable[K <: Key[V], V]( + override val pool: HandlerPool, + override val dependentCompleter: CellCompleter[K, V], // needed to not call whenNext callback, if whenComplete callback exists. + val otherCells: List[Cell[K, V]], + override val callback: () => Outcome[V]) + extends CallbackRunnable[K, V] { + + override protected final val completeDep = false + + def run(): Unit = { + if (sequential) { + dependentCompleter.sequential { + callCallback() + } + } else { + callCallback() + } + } + + protected def callCallback(): Unit = { + if (dependentCompleter.cell.isComplete) { + return ; + } + + callback() match { + case NextOutcome(v) => + dependentCompleter.putNext(v) + case FinalOutcome(v) => + dependentCompleter.putFinal(v) + case _ => /* do nothing, the value of */ + } + + dependentCompleter.cell.removeAllCallbacks(otherCells.filter(_.isComplete)) + } +} + +private[rasync] class MultiConcurrentCallbackRunnable[K <: Key[V], V](override val pool: HandlerPool, override val dependentCompleter: CellCompleter[K, V], override val otherCells: List[Cell[K, V]], override val callback: () => Outcome[V]) + extends MultiCallbackRunnable[K, V](pool, dependentCompleter, otherCells, callback) with ConcurrentCallbackRunnable[K, V] + +private[rasync] class MultiSequentialCallbackRunnable[K <: Key[V], V](override val pool: HandlerPool, override val dependentCompleter: CellCompleter[K, V], override val otherCells: List[Cell[K, V]], override val callback: () => Outcome[V]) + extends MultiCallbackRunnable[K, V](pool, dependentCompleter, otherCells, callback) with SequentialCallbackRunnable[K, V] diff --git a/core/src/test/scala/com/phaller/rasync/test/WhenMultiSuite.scala b/core/src/test/scala/com/phaller/rasync/test/WhenMultiSuite.scala new file mode 100644 index 0000000..721b7b6 --- /dev/null +++ b/core/src/test/scala/com/phaller/rasync/test/WhenMultiSuite.scala @@ -0,0 +1,341 @@ +package com.phaller.rasync +package test + +import java.util.concurrent.atomic.AtomicInteger +import java.util.concurrent.{CountDownLatch, TimeUnit} + +import com.phaller.rasync.lattice._ +import com.phaller.rasync.lattice.lattices.{NaturalNumberKey, NaturalNumberLattice} +import com.phaller.rasync.test.lattice._ +import org.scalatest.FunSuite + +import scala.concurrent.duration._ +import scala.concurrent.{Await, Promise} +import scala.util.{Failure, Success} + +class WhenMultiSuite extends FunSuite { + + implicit val stringIntUpdater: Updater[Int] = new StringIntUpdater + + test("when: values passed to callback") { + val latch1 = new CountDownLatch(1) + val latch2 = new CountDownLatch(1) + + implicit val pool = new HandlerPool + val completer1 = CellCompleter[StringIntKey, Int]("somekey") + val completer2 = CellCompleter[StringIntKey, Int]("someotherkey") + + val cell1 = completer1.cell + cell1.whenMulti(List(completer2.cell), () => { + Outcome(completer2.cell.getResult(), completer2.cell.isComplete) // complete, if completer2 is completed + }) + + assert(cell1.numNextDependencies == 1) + assert(cell1.numTotalDependencies == 1) + + cell1.onNext { + case Success(x) => + assert((x === 8 && !cell1.isComplete) || x === 10) + latch1.countDown() + case Failure(e) => + assert(false) + latch1.countDown() + } + + cell1.onComplete { + case Success(x) => + assert(x === 10) + latch2.countDown() + case Failure(e) => + assert(false) + latch2.countDown() + } + + completer1.putNext(8) + latch1.await() + + assert(!cell1.isComplete) + + completer2.putFinal(10) + latch2.await() + + assert(cell1.isComplete) + + pool.onQuiescenceShutdown() + } + + test("whenSequential: values passed to callback") { + val latch1 = new CountDownLatch(1) + val latch2 = new CountDownLatch(1) + + implicit val pool = new HandlerPool + val completer1 = CellCompleter[StringIntKey, Int]("somekey") + val completer2 = CellCompleter[StringIntKey, Int]("someotherkey") + + val cell1 = completer1.cell + cell1.whenSequentialMulti(List(completer2.cell), () => { + Outcome(completer2.cell.getResult(), completer2.cell.isComplete) // complete, if completer2 is completed + }) + + assert(cell1.numNextDependencies == 1) + assert(cell1.numTotalDependencies == 1) + + cell1.onNext { + case Success(x) => + assert((x === 8 && !cell1.isComplete) || x === 10) + latch1.countDown() + case Failure(e) => + assert(false) + latch1.countDown() + } + + cell1.onComplete { + case Success(x) => + assert(x === 10) + latch2.countDown() + case Failure(e) => + assert(false) + latch2.countDown() + } + + completer1.putNext(8) + latch1.await() + + assert(!cell1.isComplete) + + completer2.putFinal(10) + latch2.await() + + assert(cell1.isComplete) + + pool.onQuiescenceShutdown() + } + + + test("DefaultKey.resolve") { + implicit val pool = new HandlerPool + val k = new DefaultKey[Int] + val completer1 = CellCompleter[DefaultKey[Int], Int](k) + val completer2 = CellCompleter[DefaultKey[Int], Int](k) + completer1.cell.whenMulti(List(completer2.cell), () => NextOutcome(completer2.cell.getResult())) + completer2.cell.whenMulti(List(completer1.cell), () => NextOutcome(completer1.cell.getResult())) + completer1.putNext(5) + Await.ready(pool.quiescentResolveCycles, 2.seconds) + assert(completer1.cell.isComplete) + assert(completer2.cell.isComplete) + assert(completer1.cell.getResult() == 5) + assert(completer2.cell.getResult() == 5) + pool.shutdown() + } + + test("quiescent incomplete cells") { + implicit val pool = new HandlerPool + val completer1 = CellCompleter[StringIntKey, Int]("key1") + val completer2 = CellCompleter[StringIntKey, Int]("key2") + val cell1 = completer1.cell + val cell2 = completer2.cell + cell1.whenMulti(List(cell2), () => NoOutcome) + cell2.whenMulti(List(cell1), () => NoOutcome) + val incompleteFut = pool.quiescentIncompleteCells + val cells = Await.result(incompleteFut, 2.seconds) + assert(cells.map(_.key).toList.toString == "List(key1, key2)") + } + + test("quiescent resolve cycle") { + implicit val pool = new HandlerPool + val completer1 = CellCompleter[StringIntKey, Int]("key1") + val completer2 = CellCompleter[StringIntKey, Int]("key2") + val cell1 = completer1.cell + val cell2 = completer2.cell + cell1.whenMulti(List(cell2), () => NoOutcome) + cell2.whenMulti(List(cell1), () => NoOutcome) + val qfut = pool.quiescentResolveCell + Await.ready(qfut, 2.seconds) + val incompleteFut = pool.quiescentIncompleteCells + val cells = Await.result(incompleteFut, 2.seconds) + assert(cells.size == 0) + } + + test("whenComplete: cycle with additional incoming dep") { + sealed trait Value + case object Bottom extends Value + case object Resolved extends Value + case object Fallback extends Value + case object OK extends Value + case object ShouldNotHappen extends Value + + implicit object ValueUpdater extends Updater[Value] { + override def update(v1: Value, v2: Value): Value = v2 + override val initial: Value = Bottom + } + + object TheKey extends DefaultKey[Value] { + override def resolve[K <: Key[Value]](cells: Iterable[Cell[K, Value]]): Iterable[(Cell[K, Value], Value)] = { + cells.map(cell => (cell, Resolved)) + } + override def fallback[K <: Key[Value]](cells: Iterable[Cell[K, Value]]): Iterable[(Cell[K, Value], Value)] = { + cells.map(cell => (cell, Fallback)) + } + } + + implicit val pool = new HandlerPool + val completer1 = CellCompleter[TheKey.type, Value](TheKey) + val completer2 = CellCompleter[TheKey.type, Value](TheKey) + val cell1 = completer1.cell + val cell2 = completer2.cell + val in = CellCompleter[TheKey.type, Value](TheKey) + + // let `cell1` and `cell2` form a cycle + cell1.whenMulti(List(cell2), () => NextOutcome(ShouldNotHappen)) + cell2.whenMulti(List(cell1), () => NextOutcome(ShouldNotHappen)) + + // the cycle is dependent on incoming information from `in` + cell2.whenComplete(in.cell, v => { NextOutcome(ShouldNotHappen) }) + + // resolve the independent cell `in` and the cycle + val fut = pool.quiescentResolveCell + Await.ready(fut, 1.minutes) + + pool.onQuiescenceShutdown() + + assert(cell1.getResult() != ShouldNotHappen) + assert(cell2.getResult() != ShouldNotHappen) + assert(in.cell.getResult() == Fallback) + } + + test("whenSequential: calling sequentially") { + val n = 1000 + + val runningCallbacks = new AtomicInteger(0) + val latch = new CountDownLatch(1) + val random = new scala.util.Random() + + implicit val pool = new HandlerPool + val completer1 = CellCompleter[NaturalNumberKey.type, Int](NaturalNumberKey)(Updater.latticeToUpdater(new NaturalNumberLattice), pool) + val completer2 = CellCompleter[NaturalNumberKey.type, Int](NaturalNumberKey)(Updater.latticeToUpdater(new NaturalNumberLattice), pool) + + val cell1 = completer1.cell + cell1.whenSequentialMulti(List(completer2.cell), () => { + assert(runningCallbacks.incrementAndGet() == 1) + val x = completer2.cell.getResult() + Thread.`yield`() + try { + Thread.sleep(random.nextInt(3)) + } catch { + case _: InterruptedException => /* ignore */ + } + assert(runningCallbacks.decrementAndGet() == 0) + Outcome(x * n, x == n) + }) + + cell1.onComplete(_ => { + latch.countDown() + }) + + for (i <- 1 to n) + pool.execute(() => completer2.putNext(i)) + + latch.await() + + assert(cell1.getResult() == n * n) + assert(completer2.cell.getResult() == n) + + pool.onQuiescenceShutdown() + } + + test("whenSequential: state") { + // cell1 has deps to 1000 cells. All callbacks + // share a counter (i.e. state) that must not be + // incremented concurrently + val n = 1000 + var count = Set[Int]() + + class PowerSetLattice[T] extends Lattice[Set[T]] { + + def join(left: Set[T], right: Set[T]): Set[T] = + left ++ right + + val bottom: Set[T] = + Set[T]() + + } + + val theUpdater = Updater.latticeToUpdater(new PowerSetLattice[Int]) + + val latch = new CountDownLatch(1) + val random = new scala.util.Random() + + implicit val pool = new HandlerPool + val theKey = new DefaultKey[Set[Int]] + val completer1 = CellCompleter[DefaultKey[Set[Int]], Set[Int]](theKey)(theUpdater, pool) + val cell1 = completer1.cell + + cell1.onComplete(_ => { + latch.countDown() + }) + + for (i <- 1 to n) { + val completer2 = CellCompleter[DefaultKey[Set[Int]], Set[Int]](theKey)(theUpdater, pool) + val completer3 = CellCompleter[DefaultKey[Set[Int]], Set[Int]](theKey)(theUpdater, pool) + cell1.whenSequentialMulti(List(completer2.cell, completer3.cell), () => { + count = count ++ Set(count.size) + Thread.`yield`() + try { + Thread.sleep(random.nextInt(3)) + } catch { + case _: InterruptedException => /* ignore */ + } + Outcome(count, count.size == 2*n) + }) + pool.execute(() => completer2.putNext(Set(2*i))) + pool.execute(() => completer3.putNext(Set(2*i+1))) + } + + latch.await() + + assert(cell1.getResult().size == 2*n) + + pool.onQuiescenceShutdown() + } + + test("whenCompleteSequential: discard callbacks on completion") { + val latch1 = new CountDownLatch(1) + val latch2 = new CountDownLatch(1) + + implicit val pool = new HandlerPool + + val completer1 = CellCompleter[NaturalNumberKey.type, Int](NaturalNumberKey) + val completer2 = CellCompleter[NaturalNumberKey.type, Int](NaturalNumberKey) + val completer3 = CellCompleter[NaturalNumberKey.type, Int](NaturalNumberKey) + val cell1 = completer1.cell + val cell2 = completer2.cell + val cell3 = completer3.cell + cell1.trigger() + + cell1.whenSequentialMulti(List(cell2), () => { + latch1.await() // wait for some puts/triggers + FinalOutcome(10) + }) + cell1.whenSequentialMulti(List(cell3), () => NextOutcome(cell3.getResult())) + + completer2.putFinal(3) + completer3.putNext(2) + completer3.putNext(3) + latch1.countDown() + + pool.onQuiescent(() => { + pool.onQuiescenceShutdown() + latch2.countDown() + }) + // pool needs to reach quiescence, even if cell1 is completed early: + latch2.await() + + assert(cell1.getResult() == 10) + assert(cell2.getResult() == 3) + assert(cell3.getResult() == 3) + assert(cell1.isComplete) + assert(cell2.isComplete) + assert(!cell3.isComplete) + } + +}