From 56072a027bbbc3d3da2d769057da5332ff5d0f71 Mon Sep 17 00:00:00 2001 From: Andreas Muttscheller Date: Mon, 7 May 2018 14:23:06 +0200 Subject: [PATCH] Add methods to suspend HandlerPool --- .../com/phaller/rasync/HandlerPool.scala | 25 ++- .../com/phaller/rasync/test/PsSuite.scala | 187 ++++++++++++++++++ 2 files changed, 210 insertions(+), 2 deletions(-) diff --git a/core/src/main/scala/com/phaller/rasync/HandlerPool.scala b/core/src/main/scala/com/phaller/rasync/HandlerPool.scala index 595f279..0db794f 100644 --- a/core/src/main/scala/com/phaller/rasync/HandlerPool.scala +++ b/core/src/main/scala/com/phaller/rasync/HandlerPool.scala @@ -1,13 +1,12 @@ package com.phaller.rasync -import java.util.concurrent.ForkJoinPool +import java.util.concurrent.{ CountDownLatch, ForkJoinPool } import java.util.concurrent.atomic.AtomicReference import scala.annotation.tailrec import scala.util.control.NonFatal import scala.concurrent.{ Await, Future, Promise } import scala.concurrent.duration._ - import lattice.{ DefaultKey, Key, Updater } import org.opalj.graphs._ @@ -28,6 +27,9 @@ class HandlerPool(parallelism: Int = 8, unhandledExceptionHandler: Throwable => private val cellsNotDone = new AtomicReference[Map[Cell[_, _], Queue[SequentialCallbackRunnable[_, _]]]](Map()) // use `values` to store all pending sequential triggers + @volatile private var suspendLatch = new CountDownLatch(1) + @volatile private var isSuspended = false + /** * Returns a new cell in this HandlerPool. * @@ -315,6 +317,9 @@ class HandlerPool(parallelism: Int = 8, unhandledExceptionHandler: Throwable => pool.execute(new Runnable { def run(): Unit = { try { + if (isSuspended) { + suspendLatch.await() + } task.run() } catch { case NonFatal(e) => @@ -444,4 +449,20 @@ class HandlerPool(parallelism: Int = 8, unhandledExceptionHandler: Throwable => def reportFailure(t: Throwable): Unit = t.printStackTrace() + + /** + * Suspend the computation of cells. This handler pool can be resumed using the `resume` method. + */ + def suspend(): Unit = { + isSuspended = true + } + + /** + * Resume the computation if the execution was suspended. Don't do anything if the execution was not suspended. + */ + def resume(): Unit = { + isSuspended = false + suspendLatch.countDown() + suspendLatch = new CountDownLatch(1) + } } diff --git a/core/src/test/scala/com/phaller/rasync/test/PsSuite.scala b/core/src/test/scala/com/phaller/rasync/test/PsSuite.scala index 9388be5..4ff4df8 100644 --- a/core/src/test/scala/com/phaller/rasync/test/PsSuite.scala +++ b/core/src/test/scala/com/phaller/rasync/test/PsSuite.scala @@ -1,6 +1,8 @@ package com.phaller.rasync package test +import java.util.concurrent.CountDownLatch + import lattice._ import org.scalatest.FunSuite @@ -103,4 +105,189 @@ class PsSuite extends FunSuite { Await.ready(fut, 2.seconds) } + test("HandlerPool must be able to suspend - concurrent") { + implicit val pool = new HandlerPool(parallelism = 8) + val completer1 = CellCompleter[ReactivePropertyStoreKey, Int](new ReactivePropertyStoreKey()) + val completer2 = CellCompleter[ReactivePropertyStoreKey, Int](new ReactivePropertyStoreKey()) + val cell1 = completer1.cell + val cell2 = completer2.cell + + cell2.whenNext(cell1, v => { + NextOutcome(v) + }) + + pool.suspend() + Thread.sleep(200) + completer1.putNext(10) + + assert(cell2.getResult() == 0) + + pool.resume() + + val fut = pool.quiescentResolveDefaults + Await.ready(fut, 2.seconds) + + assert(cell2.getResult() == 10) + } + + test("HandlerPool must be able to suspend - sequential") { + implicit val pool = new HandlerPool(parallelism = 8) + val completer1 = CellCompleter[ReactivePropertyStoreKey, Int](new ReactivePropertyStoreKey()) + val completer2 = CellCompleter[ReactivePropertyStoreKey, Int](new ReactivePropertyStoreKey()) + val cell1 = completer1.cell + val cell2 = completer2.cell + + cell2.whenNextSequential(cell1, v => { + NextOutcome(v) + }) + + pool.suspend() + Thread.sleep(200) + completer1.putNext(10) + + assert(cell2.getResult() == 0) + + pool.resume() + + val fut = pool.quiescentResolveDefaults + Await.ready(fut, 2.seconds) + + assert(cell2.getResult() == 10) + } + + test("HandlerPool must be able to suspend multiple tasks - concurrent") { + implicit val pool = new HandlerPool(parallelism = 8) + val completer1 = CellCompleter[ReactivePropertyStoreKey, Int](new ReactivePropertyStoreKey()) + val completer2 = CellCompleter[ReactivePropertyStoreKey, Int](new ReactivePropertyStoreKey()) + val completer3 = CellCompleter[ReactivePropertyStoreKey, Int](new ReactivePropertyStoreKey()) + val cell1 = completer1.cell + val cell2 = completer2.cell + val cell3 = completer3.cell + + cell2.whenNext(cell1, v => { + NextOutcome(v) + }) + cell3.whenNext(cell1, v => { + NextOutcome(v) + }) + + pool.suspend() + Thread.sleep(200) + completer1.putNext(10) + + assert(cell2.getResult() == 0) + assert(cell3.getResult() == 0) + + pool.resume() + + val fut = pool.quiescentResolveDefaults + Await.ready(fut, 2.seconds) + + assert(cell2.getResult() == 10) + assert(cell3.getResult() == 10) + } + + // TODO Include test once performance/pull is merged. Currently no new task is scheduled for sequential callbacks + // when `completer1.putNext(10)` is called. It is executed in the same thread without scheduling a new task. + ignore("HandlerPool must be able to suspend multiple tasks - sequential") { + implicit val pool = new HandlerPool(parallelism = 8) + val completer1 = CellCompleter[ReactivePropertyStoreKey, Int](new ReactivePropertyStoreKey()) + val completer2 = CellCompleter[ReactivePropertyStoreKey, Int](new ReactivePropertyStoreKey()) + val completer3 = CellCompleter[ReactivePropertyStoreKey, Int](new ReactivePropertyStoreKey()) + val cell1 = completer1.cell + val cell2 = completer2.cell + val cell3 = completer3.cell + + cell2.whenNextSequential(cell1, v => { + NextOutcome(v) + }) + cell3.whenNextSequential(cell1, v => { + NextOutcome(v) + }) + + pool.suspend() + Thread.sleep(200) + completer1.putNext(10) + + assert(cell2.getResult() == 0) + assert(cell3.getResult() == 0) + + pool.resume() + + val fut = pool.quiescentResolveDefaults + Await.ready(fut, 2.seconds) + + assert(cell2.getResult() == 10) + assert(cell3.getResult() == 10) + } + + test("HandlerPool must be able to suspend, but finish executing running tasks - concurrent") { + implicit val pool = new HandlerPool(parallelism = 8) + val completer1 = CellCompleter[ReactivePropertyStoreKey, Int](new ReactivePropertyStoreKey()) + val completer2 = CellCompleter[ReactivePropertyStoreKey, Int](new ReactivePropertyStoreKey()) + val cell1 = completer1.cell + val cell2 = completer2.cell + + val latch1 = new CountDownLatch(1) + val latch2 = new CountDownLatch(1) + + cell2.whenNext(cell1, v => { + latch1.countDown() + latch2.await() + NextOutcome(v) + }) + + completer1.putNext(10) + latch1.await() + pool.suspend() + latch2.countDown() + + Thread.sleep(50) // Wait for thread to save NextOutcome + + assert(cell2.getResult() == 10) + + completer1.putNext(20) + + pool.resume() + + val fut = pool.quiescentResolveDefaults + Await.ready(fut, 2.seconds) + + assert(cell2.getResult() == 20) + } + + test("HandlerPool must be able to suspend, but finish executing running tasks - sequential") { + implicit val pool = new HandlerPool(parallelism = 8) + val completer1 = CellCompleter[ReactivePropertyStoreKey, Int](new ReactivePropertyStoreKey()) + val completer2 = CellCompleter[ReactivePropertyStoreKey, Int](new ReactivePropertyStoreKey()) + val cell1 = completer1.cell + val cell2 = completer2.cell + + val latch1 = new CountDownLatch(1) + val latch2 = new CountDownLatch(1) + + cell2.whenNextSequential(cell1, v => { + latch1.countDown() + latch2.await() + NextOutcome(v) + }) + + completer1.putNext(10) + latch1.await() + pool.suspend() + latch2.countDown() + + Thread.sleep(50) // Wait for thread to save NextOutcome + + assert(cell2.getResult() == 10) + + completer1.putNext(20) + + pool.resume() + + val fut = pool.quiescentResolveDefaults + Await.ready(fut, 2.seconds) + + assert(cell2.getResult() == 20) + } }