From 227323ae535744709449674e6667261be6c57db6 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jan=20K=C3=B6lzer?= Date: Fri, 9 Mar 2018 22:30:08 +0100 Subject: [PATCH] Avoid repeated invocation of callbacks with same value. Store (approximation) of last propagated value and compare. --- .../com/phaller/rasync/HandlerPool.scala | 54 +++++++++++-------- .../com/phaller/rasync/callbackRunnable.scala | 23 +++++++- .../scala/com/phaller/rasync/test/base.scala | 23 ++++---- 3 files changed, 64 insertions(+), 36 deletions(-) diff --git a/core/src/main/scala/com/phaller/rasync/HandlerPool.scala b/core/src/main/scala/com/phaller/rasync/HandlerPool.scala index 595f279..f1caec9 100644 --- a/core/src/main/scala/com/phaller/rasync/HandlerPool.scala +++ b/core/src/main/scala/com/phaller/rasync/HandlerPool.scala @@ -380,32 +380,40 @@ class HandlerPool(parallelism: Int = 8, unhandledExceptionHandler: Throwable => } private def callSequentialCallback[K <: Key[V], V](dependentCell: Cell[K, V]): Unit = { - pool.execute(() => { - val registered = cellsNotDone.get() - - // only call the callback, if the cell has not been completed - if (registered.contains(dependentCell)) { - val tasks = registered(dependentCell) - /* - Pop an element from the queue only if it is completely done! - That way, one can always start running sequential callbacks, if the list has been empty. - */ - val task = tasks.head // The queue must not be empty! Caller has to assert this. + val registered = cellsNotDone.get() - try { - task.run() - } catch { - case NonFatal(e) => - unhandledExceptionHandler(e) - } finally { - decSubmittedTasks() + // only call the callback, if the cell has not been completed + if (registered.contains(dependentCell)) { + val tasks = registered(dependentCell) + /* + Pop an element from the queue only if it is completely done! + That way, one can always start running sequential callbacks, if the list has been empty. + */ + val task = tasks.head // The queue must not be empty! Caller has to assert this. + + if (task.compareAndSetPropagatedValue()) + pool.execute(() => { + try { + task.run() + } catch { + case NonFatal(e) => + unhandledExceptionHandler(e) + } finally { + decSubmittedTasks() + + // The task has been run. Remove it. If the new list is not empty, callSequentialCallback(cell) + if (dequeueSequentialCallback(dependentCell).nonEmpty) + callSequentialCallback(dependentCell) + } + }) + else { + decSubmittedTasks() - // The task has been run. Remove it. If the new list is not empty, callSequentialCallback(cell) - if (dequeueSequentialCallback(dependentCell).nonEmpty) - callSequentialCallback(dependentCell) - } + // The task has been run. Remove it. If the new list is not empty, callSequentialCallback(cell) + if (dequeueSequentialCallback(dependentCell).nonEmpty) + callSequentialCallback(dependentCell) } - }) + } } /** diff --git a/core/src/main/scala/com/phaller/rasync/callbackRunnable.scala b/core/src/main/scala/com/phaller/rasync/callbackRunnable.scala index bf1646a..0dc44ad 100644 --- a/core/src/main/scala/com/phaller/rasync/callbackRunnable.scala +++ b/core/src/main/scala/com/phaller/rasync/callbackRunnable.scala @@ -1,5 +1,7 @@ package com.phaller.rasync +import java.util.concurrent.atomic.AtomicReference + import lattice.Key import scala.concurrent.OnCompleteRunnable @@ -25,6 +27,22 @@ private[rasync] trait CallbackRunnable[K <: Key[V], V] extends Runnable with OnC /** Essentially, call the callback. */ override def run(): Unit + + private val lastPropagatedValue: AtomicReference[Option[V]] = new AtomicReference[Option[V]](None) + + /** Returns TRUE, if `otherCell`'s value has changed since the last invocation. */ + private[rasync] def compareAndSetPropagatedValue(): Boolean = { + // Use CAS to store (anapproximation of) the value that will be propagated. + // (otherCell.getResult() might change in the meantime, but then this method + // returns TRUE anyway, so the callback is going to be called. + var oldV: Option[V] = null // we could use bottom here, if otherCell.updater was accessible. + do { + oldV = lastPropagatedValue.get() + } while (!lastPropagatedValue.compareAndSet(oldV, Some(otherCell.getResult()))) + + // Is the value that would be propagated an improvement? + !oldV.contains(otherCell.getResult()) + } } /** @@ -34,8 +52,9 @@ private[rasync] trait CallbackRunnable[K <: Key[V], V] extends Runnable with OnC private[rasync] trait ConcurrentCallbackRunnable[K <: Key[V], V] extends CallbackRunnable[K, V] { /** Add this CallbackRunnable to its handler pool such that it is run concurrently. */ def execute(): Unit = - try pool.execute(this) - catch { case NonFatal(t) => pool reportFailure t } + if (compareAndSetPropagatedValue()) + try pool.execute(this) + catch { case NonFatal(t) => pool reportFailure t } } /** diff --git a/core/src/test/scala/com/phaller/rasync/test/base.scala b/core/src/test/scala/com/phaller/rasync/test/base.scala index 4abf9fb..faece98 100644 --- a/core/src/test/scala/com/phaller/rasync/test/base.scala +++ b/core/src/test/scala/com/phaller/rasync/test/base.scala @@ -2293,20 +2293,20 @@ class BaseSuite extends FunSuite { test("whenNextSequential: state") { val n = 1000 - var count = 0 + var max = 0 val runningCallbacks = new AtomicInteger(0) val latch = new CountDownLatch(1) val random = new scala.util.Random() implicit val pool = new HandlerPool - val completer1 = CellCompleter[StringIntKey, Int]("somekey") - val completer2 = CellCompleter[StringIntKey, Int]("someotherkey") + val completer1 = CellCompleter[lattice.NaturalNumberKey.type, Int](lattice.NaturalNumberKey)(Updater.latticeToUpdater(new NaturalNumberLattice), pool) + val completer2 = CellCompleter[lattice.NaturalNumberKey.type, Int](lattice.NaturalNumberKey)(Updater.latticeToUpdater(new NaturalNumberLattice), pool) val cell1 = completer1.cell cell1.whenNextSequential(completer2.cell, (x: Int) => { assert(runningCallbacks.incrementAndGet() == 1) - count += 1 + max = Math.max(max, x) Thread.`yield`() try { Thread.sleep(random.nextInt(3)) @@ -2314,7 +2314,7 @@ class BaseSuite extends FunSuite { case _: InterruptedException => /* ignore */ } assert(runningCallbacks.decrementAndGet() == 0) - Outcome(count, count == n) + Outcome(max, max == n) }) cell1.onComplete(_ => { @@ -2327,13 +2327,14 @@ class BaseSuite extends FunSuite { latch.await() assert(cell1.getResult() == n) + assert(max == n) - pool.onQuiescenceShutdown() + pool.onQuiescent(() => pool.onQuiescenceShutdown()) } test("whenCompleteSequential: state") { val n = 1000 - var count = 0 + var max = 0 val runningCallbacks = new AtomicInteger(0) val latch = new CountDownLatch(1) @@ -2341,11 +2342,11 @@ class BaseSuite extends FunSuite { val random = new scala.util.Random() implicit val pool = new HandlerPool - val completer1 = CellCompleter[StringIntKey, Int]("somekey") + val completer1 = CellCompleter[lattice.NaturalNumberKey.type, Int](lattice.NaturalNumberKey)(Updater.latticeToUpdater(new NaturalNumberLattice), pool) val cell1 = completer1.cell for (i <- 1 to n) { - val completer2 = CellCompleter[StringIntKey, Int]("someotherkey") + val completer2 = CellCompleter[lattice.NaturalNumberKey.type, Int](lattice.NaturalNumberKey)(Updater.latticeToUpdater(new NaturalNumberLattice), pool) val latch2 = new CountDownLatch(1) otherLatches = otherLatches + latch2 @@ -2356,7 +2357,7 @@ class BaseSuite extends FunSuite { cell1.whenNextSequential(completer2.cell, (x: Int) => { assert(runningCallbacks.incrementAndGet() == 1) - count += 1 + max = Math.max(max, x) Thread.`yield`() try { Thread.sleep(random.nextInt(3)) @@ -2364,7 +2365,7 @@ class BaseSuite extends FunSuite { case _: InterruptedException => /* ignore */ } assert(runningCallbacks.decrementAndGet() == 0) - Outcome(count, count == n) + Outcome(max, max == n) }) cell1.onComplete(_ => {