Skip to content

Commit 02f03bc

Browse files
committed
Add smart propagation prevention
Do not inform dependent cells about updates, if another update is in sight.
1 parent 15891d4 commit 02f03bc

File tree

2 files changed

+60
-13
lines changed

2 files changed

+60
-13
lines changed

core/src/main/scala/cell/Cell.scala

Lines changed: 50 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
package cell
22

3-
import java.util.concurrent.atomic.AtomicReference
3+
import java.util.concurrent.atomic.{ AtomicInteger, AtomicReference }
44
import java.util.concurrent.{ CountDownLatch, ExecutionException }
55

66
import scala.annotation.tailrec
@@ -111,6 +111,9 @@ trait Cell[K <: Key[V], V] {
111111

112112
def removeCompleteCallbacks(cell: Cell[K, V]): Unit
113113
def removeNextCallbacks(cell: Cell[K, V]): Unit
114+
115+
private[cell] def incIncomingCallbacks(): Int
116+
private[cell] def decIncomingCallbacks(): Int
114117
}
115118

116119
object Cell {
@@ -189,6 +192,7 @@ private class CellImpl[K <: Key[V], V](pool: HandlerPool, val key: K, updater: U
189192
* Assumes that dependencies need to be kept until a final result is known.
190193
*/
191194
private val state = new AtomicReference[AnyRef](State.empty[K, V](updater))
195+
private val numIncomingCallbacks = new AtomicInteger(0) // Should this be included into `state`?
192196

193197
// `CellCompleter` and corresponding `Cell` are the same run-time object.
194198
override def cell: Cell[K, V] = this
@@ -439,10 +443,10 @@ private class CellImpl[K <: Key[V], V](pool: HandlerPool, val key: K, updater: U
439443
if (!state.compareAndSet(current, newState)) {
440444
tryNewState(value)
441445
} else {
442-
// CAS was successful, so there was a point in time where `newVal` was in the cell
443-
current.nextCallbacks.values.foreach { callbacks =>
444-
callbacks.foreach(callback => callback.execute())
445-
}
446+
// If we came here via a direct putNext (instead of Outcome of a whenNextCallback)
447+
// this incoming change has not been counted. So we need to manually start outgoing callbacks.
448+
// (This might lead to duplicate invocation.)
449+
if (numIncomingCallbacks.get() == 0) triggerNextCallbacks()
446450
true
447451
}
448452
} else true
@@ -479,16 +483,12 @@ private class CellImpl[K <: Key[V], V](pool: HandlerPool, val key: K, updater: U
479483
res
480484

481485
case (pre: State[K, V], newVal: Try[V]) =>
482-
pre.nextCallbacks.values.foreach { callbacks =>
483-
callbacks.foreach(callback => callback.execute())
484-
}
485-
pre.completeCallbacks.values.foreach { callbacks =>
486-
callbacks.foreach(callback => callback.execute())
487-
}
486+
triggerCompleteCallbacks(pre)
488487

489488
val depsCells = pre.completeDeps
490489
val nextDepsCells = pre.nextDeps
491490

491+
// Other cells do not need to call us any more
492492
if (depsCells.nonEmpty)
493493
depsCells.foreach(_.removeCompleteCallbacks(this))
494494
if (nextDepsCells.nonEmpty)
@@ -502,6 +502,29 @@ private class CellImpl[K <: Key[V], V](pool: HandlerPool, val key: K, updater: U
502502
res
503503
}
504504

505+
// We need to pass the list of callbacks here, because they
506+
// might have already been deleted in `state` while the cell has
507+
// been set to its final value.
508+
private def triggerCompleteCallbacks(pre: State[K, V]): Unit = {
509+
pre.nextCallbacks.values.foreach { callbacks =>
510+
callbacks.foreach(callback => callback.execute())
511+
}
512+
pre.completeCallbacks.values.foreach { callbacks =>
513+
callbacks.foreach(callback => callback.execute())
514+
}
515+
}
516+
517+
private def triggerNextCallbacks(): Unit = {
518+
state.get() match {
519+
case _: Try[_] => /* Meanwhile, cell has been completed. No need to trigger `next` callbacks any more. */
520+
case raw: State[_, _] =>
521+
val current = raw.asInstanceOf[State[K, V]]
522+
current.nextCallbacks.values.foreach { callbacks =>
523+
callbacks.foreach(callback => callback.execute())
524+
}
525+
}
526+
}
527+
505528
@tailrec
506529
override private[cell] final def removeDep(cell: Cell[K, V]): Unit = {
507530
state.get() match {
@@ -515,7 +538,7 @@ private class CellImpl[K <: Key[V], V](pool: HandlerPool, val key: K, updater: U
515538
else if (newDeps.isEmpty)
516539
nodepslatch.countDown()
517540

518-
case _ => /* do nothing */
541+
case _ => /* `this` has been completed and therefore does not have any deps stored. */
519542
}
520543
}
521544

@@ -532,7 +555,7 @@ private class CellImpl[K <: Key[V], V](pool: HandlerPool, val key: K, updater: U
532555
else if (newNextDeps.isEmpty)
533556
nonextdepslatch.countDown()
534557

535-
case _ => /* do nothing */
558+
case _ => /* `this` has been completed and therefore does not have any deps stored. */
536559
}
537560
}
538561

@@ -666,4 +689,18 @@ private class CellImpl[K <: Key[V], V](pool: HandlerPool, val key: K, updater: U
666689
case t => Failure(t)
667690
}
668691

692+
/** Called, when a CallbackRunnable r with r.dependentCell == this has been started. */
693+
override private[cell] def incIncomingCallbacks(): Int = {
694+
numIncomingCallbacks.incrementAndGet()
695+
}
696+
697+
/**
698+
* Called, when a CallbackRunnable r with r.dependentCell == this has been completed.
699+
* Triggers all outgoing callbacks, if no more incoming callbacks are running.
700+
*/
701+
override private[cell] def decIncomingCallbacks(): Int = {
702+
val newValue = numIncomingCallbacks.decrementAndGet()
703+
if (newValue == 0) triggerNextCallbacks()
704+
newValue
705+
}
669706
}

core/src/main/scala/cell/callbackRunnable.scala

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -65,6 +65,7 @@ private[cell] trait Dependency[K <: Key[V], V] {
6565

6666
/**
6767
* To be run when `otherCell` gets its final update.
68+
*
6869
* @param pool The handler pool that runs the callback function
6970
* @param dependentCell The cell, that depends on `otherCell`.
7071
* @param otherCell Cell that triggers this callback.
@@ -83,7 +84,12 @@ private[cell] abstract class CompleteCallbackRunnable[K <: Key[V], V](
8384
def run(): Unit = {
8485
require(!started) // can't complete it twice
8586
started = true
87+
88+
if (dependentCell != null) // dependentCell == null for oncomplete callbacks
89+
dependentCell.incIncomingCallbacks()
8690
callback(Success(otherCell.getResult()))
91+
if (dependentCell != null)
92+
dependentCell.decIncomingCallbacks()
8793
}
8894
}
8995

@@ -166,7 +172,11 @@ private[cell] abstract class NextCallbackRunnable[K <: Key[V], V](
166172
extends CallbackRunnable[K, V] {
167173

168174
def run(): Unit = {
175+
if (dependentCell != null) // dependetCell == null for onnext callbacks
176+
dependentCell.incIncomingCallbacks()
169177
callback(Success(otherCell.getResult()))
178+
if (dependentCell != null)
179+
dependentCell.decIncomingCallbacks()
170180
}
171181
}
172182

0 commit comments

Comments
 (0)