11package cell
22
3- import java .util .concurrent .atomic .AtomicReference
3+ import java .util .concurrent .atomic .{ AtomicInteger , AtomicReference }
44import java .util .concurrent .{ CountDownLatch , ExecutionException }
55
66import scala .annotation .tailrec
@@ -106,6 +106,9 @@ trait Cell[K <: Key[V], V] {
106106
107107 private [cell] def removeCompleteCallbacks (cell : Cell [K , V ]): Unit
108108 private [cell] def removeNextCallbacks (cell : Cell [K , V ]): Unit
109+
110+ private [cell] def incIncomingCallbacks (): Int
111+ private [cell] def decIncomingCallbacks (): Int
109112}
110113
111114object Cell {
@@ -181,6 +184,7 @@ private class CellImpl[K <: Key[V], V](pool: HandlerPool, val key: K, lattice: L
181184 * Assumes that dependencies need to be kept until a final result is known.
182185 */
183186 private val state = new AtomicReference [AnyRef ](State .empty[K , V ](lattice))
187+ private val numIncomingCallbacks = new AtomicInteger (0 ) // Should this be included into `state`?
184188
185189 // `CellCompleter` and corresponding `Cell` are the same run-time object.
186190 override def cell : Cell [K , V ] = this
@@ -428,10 +432,10 @@ private class CellImpl[K <: Key[V], V](pool: HandlerPool, val key: K, lattice: L
428432 if (! state.compareAndSet(current, newState)) {
429433 tryNewState(value)
430434 } else {
431- // CAS was successful, so there was a point in time where `newVal` was in the cell
432- current.nextCallbacks.values.foreach { callbacks =>
433- callbacks.foreach(callback => callback.execute() )
434- }
435+ // If we came here via a direct putNext (instead of Outcome of a whenNextCallback)
436+ // this incoming change has not been counted. So we need to manually start outgoing callbacks.
437+ // (This might lead to duplicate invocation. )
438+ if (numIncomingCallbacks.get() == 0 ) triggerNextCallbacks() //
435439 true
436440 }
437441 } else true
@@ -468,28 +472,12 @@ private class CellImpl[K <: Key[V], V](pool: HandlerPool, val key: K, lattice: L
468472 res
469473
470474 case (pre : State [K , V ], newVal : Try [V ]) =>
475+ triggerCompleteCallbacks(pre)
476+
471477 val depsCells = pre.completeDeps
472478 val nextDepsCells = pre.nextDeps
473- if (pre.completeCallbacks.isEmpty) {
474- // call callbacks
475- pre.nextCallbacks.values.foreach { callbacks =>
476- callbacks.foreach(callback => callback.execute())
477- }
478- } else {
479- // onNext callbacks with these cells should not be triggered, because they have
480- // onComplete callbacks which are triggered instead.
481- pre.completeCallbacks.values.foreach { callbacks =>
482- callbacks.foreach(callback => callback.execute())
483- }
484- // call (concurrent) callbacks
485- pre.nextCallbacks.values.foreach { callbacks =>
486- callbacks.foreach { callback =>
487- if (! pre.completeCallbacks.contains(callback.dependentCell))
488- callback.execute()
489- }
490- }
491- }
492479
480+ // Other cells do not need to call as any more
493481 if (depsCells.nonEmpty)
494482 depsCells.foreach(_.removeCompleteCallbacks(this ))
495483 if (nextDepsCells.nonEmpty)
@@ -503,6 +491,42 @@ private class CellImpl[K <: Key[V], V](pool: HandlerPool, val key: K, lattice: L
503491 res
504492 }
505493
494+ private def triggerNextCallbacks (): Unit = {
495+ state.get() match {
496+ case _ : Try [_] => /* Meanwhile, cell has been completed. No need to trigger `next` callbacks any more. */
497+ case raw : State [_, _] =>
498+ val current = raw.asInstanceOf [State [K , V ]]
499+ current.nextCallbacks.values.foreach { callbacks =>
500+ callbacks.foreach(callback => callback.execute())
501+ }
502+ }
503+ }
504+
505+ // We need to pass the list of callbacks here, because they
506+ // might have already been deleted in `state` while the cell has
507+ // been set to its final value.
508+ private def triggerCompleteCallbacks (pre : State [K , V ]): Unit = {
509+ if (pre.completeCallbacks.isEmpty) {
510+ // call callbacks
511+ pre.nextCallbacks.values.foreach { callbacks =>
512+ callbacks.foreach(callback => callback.execute())
513+ }
514+ } else {
515+ // onNext callbacks with these cells should not be triggered, because they have
516+ // onComplete callbacks which are triggered instead.
517+ pre.completeCallbacks.values.foreach { callbacks =>
518+ callbacks.foreach(callback => callback.execute())
519+ }
520+ // call (concurrent) callbacks
521+ pre.nextCallbacks.values.foreach { callbacks =>
522+ callbacks.foreach { callback =>
523+ if (! pre.completeCallbacks.contains(callback.dependentCell))
524+ callback.execute()
525+ }
526+ }
527+ }
528+ }
529+
506530 @ tailrec
507531 override private [cell] final def removeDep (cell : Cell [K , V ]): Unit = {
508532 state.get() match {
@@ -516,7 +540,7 @@ private class CellImpl[K <: Key[V], V](pool: HandlerPool, val key: K, lattice: L
516540 else if (newDeps.isEmpty)
517541 nodepslatch.countDown()
518542
519- case _ => /* do nothing */
543+ case _ => /* `this` has been completed and therefore does not have any deps stored. */
520544 }
521545 }
522546
@@ -533,7 +557,7 @@ private class CellImpl[K <: Key[V], V](pool: HandlerPool, val key: K, lattice: L
533557 else if (newNextDeps.isEmpty)
534558 nonextdepslatch.countDown()
535559
536- case _ => /* do nothing */
560+ case _ => /* `this` has been completed and therefore does not have any deps stored. */
537561 }
538562 }
539563
@@ -643,4 +667,17 @@ private class CellImpl[K <: Key[V], V](pool: HandlerPool, val key: K, lattice: L
643667 case t => Failure (t)
644668 }
645669
670+ /** Called, when a CallbackRunnable r with r.dependentCell == this has been started. */
671+ override private [cell] def incIncomingCallbacks (): Int = {
672+ numIncomingCallbacks.incrementAndGet()
673+ }
674+
675+ /** Called, when a CallbackRunnable r with r.dependentCell == this has been completed.
676+ * Triggers all outgoing callbacks, if no more incoming callbacks are running.
677+ */
678+ override private [cell] def decIncomingCallbacks (): Int = {
679+ val newValue = numIncomingCallbacks.decrementAndGet()
680+ if (newValue == 0 ) triggerNextCallbacks()
681+ newValue
682+ }
646683}
0 commit comments