Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 23 additions & 2 deletions core/src/main/scala/com/phaller/rasync/HandlerPool.scala
Original file line number Diff line number Diff line change
@@ -1,13 +1,12 @@
package com.phaller.rasync

import java.util.concurrent.ForkJoinPool
import java.util.concurrent.{ CountDownLatch, ForkJoinPool }
import java.util.concurrent.atomic.AtomicReference

import scala.annotation.tailrec
import scala.util.control.NonFatal
import scala.concurrent.{ Await, Future, Promise }
import scala.concurrent.duration._

import lattice.{ DefaultKey, Key, Updater }
import org.opalj.graphs._

Expand All @@ -28,6 +27,9 @@ class HandlerPool(parallelism: Int = 8, unhandledExceptionHandler: Throwable =>

private val cellsNotDone = new AtomicReference[Map[Cell[_, _], Queue[SequentialCallbackRunnable[_, _]]]](Map()) // use `values` to store all pending sequential triggers

@volatile private var suspendLatch = new CountDownLatch(1)
@volatile private var isSuspended = false

/**
* Returns a new cell in this HandlerPool.
*
Expand Down Expand Up @@ -315,6 +317,9 @@ class HandlerPool(parallelism: Int = 8, unhandledExceptionHandler: Throwable =>
pool.execute(new Runnable {
def run(): Unit = {
try {
if (isSuspended) {
suspendLatch.await()
}
task.run()
} catch {
case NonFatal(e) =>
Expand Down Expand Up @@ -444,4 +449,20 @@ class HandlerPool(parallelism: Int = 8, unhandledExceptionHandler: Throwable =>

def reportFailure(t: Throwable): Unit =
t.printStackTrace()

/**
* Suspend the computation of cells. This handler pool can be resumed using the `resume` method.
*/
def suspend(): Unit = {
isSuspended = true
}

/**
* Resume the computation if the execution was suspended. Don't do anything if the execution was not suspended.
*/
def resume(): Unit = {
isSuspended = false
suspendLatch.countDown()
suspendLatch = new CountDownLatch(1)
}
}
187 changes: 187 additions & 0 deletions core/src/test/scala/com/phaller/rasync/test/PsSuite.scala
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
package com.phaller.rasync
package test

import java.util.concurrent.CountDownLatch

import lattice._
import org.scalatest.FunSuite

Expand Down Expand Up @@ -103,4 +105,189 @@ class PsSuite extends FunSuite {
Await.ready(fut, 2.seconds)
}

test("HandlerPool must be able to suspend - concurrent") {
implicit val pool = new HandlerPool(parallelism = 8)
val completer1 = CellCompleter[ReactivePropertyStoreKey, Int](new ReactivePropertyStoreKey())
val completer2 = CellCompleter[ReactivePropertyStoreKey, Int](new ReactivePropertyStoreKey())
val cell1 = completer1.cell
val cell2 = completer2.cell

cell2.whenNext(cell1, v => {
NextOutcome(v)
})

pool.suspend()
Thread.sleep(200)
completer1.putNext(10)

assert(cell2.getResult() == 0)

pool.resume()

val fut = pool.quiescentResolveDefaults
Await.ready(fut, 2.seconds)

assert(cell2.getResult() == 10)
}

test("HandlerPool must be able to suspend - sequential") {
implicit val pool = new HandlerPool(parallelism = 8)
val completer1 = CellCompleter[ReactivePropertyStoreKey, Int](new ReactivePropertyStoreKey())
val completer2 = CellCompleter[ReactivePropertyStoreKey, Int](new ReactivePropertyStoreKey())
val cell1 = completer1.cell
val cell2 = completer2.cell

cell2.whenNextSequential(cell1, v => {
NextOutcome(v)
})

pool.suspend()
Thread.sleep(200)
completer1.putNext(10)

assert(cell2.getResult() == 0)

pool.resume()

val fut = pool.quiescentResolveDefaults
Await.ready(fut, 2.seconds)

assert(cell2.getResult() == 10)
}

test("HandlerPool must be able to suspend multiple tasks - concurrent") {
implicit val pool = new HandlerPool(parallelism = 8)
val completer1 = CellCompleter[ReactivePropertyStoreKey, Int](new ReactivePropertyStoreKey())
val completer2 = CellCompleter[ReactivePropertyStoreKey, Int](new ReactivePropertyStoreKey())
val completer3 = CellCompleter[ReactivePropertyStoreKey, Int](new ReactivePropertyStoreKey())
val cell1 = completer1.cell
val cell2 = completer2.cell
val cell3 = completer3.cell

cell2.whenNext(cell1, v => {
NextOutcome(v)
})
cell3.whenNext(cell1, v => {
NextOutcome(v)
})

pool.suspend()
Thread.sleep(200)
completer1.putNext(10)

assert(cell2.getResult() == 0)
assert(cell3.getResult() == 0)

pool.resume()

val fut = pool.quiescentResolveDefaults
Await.ready(fut, 2.seconds)

assert(cell2.getResult() == 10)
assert(cell3.getResult() == 10)
}

// TODO Include test once performance/pull is merged. Currently no new task is scheduled for sequential callbacks
// when `completer1.putNext(10)` is called. It is executed in the same thread without scheduling a new task.
ignore("HandlerPool must be able to suspend multiple tasks - sequential") {
implicit val pool = new HandlerPool(parallelism = 8)
val completer1 = CellCompleter[ReactivePropertyStoreKey, Int](new ReactivePropertyStoreKey())
val completer2 = CellCompleter[ReactivePropertyStoreKey, Int](new ReactivePropertyStoreKey())
val completer3 = CellCompleter[ReactivePropertyStoreKey, Int](new ReactivePropertyStoreKey())
val cell1 = completer1.cell
val cell2 = completer2.cell
val cell3 = completer3.cell

cell2.whenNextSequential(cell1, v => {
NextOutcome(v)
})
cell3.whenNextSequential(cell1, v => {
NextOutcome(v)
})

pool.suspend()
Thread.sleep(200)
completer1.putNext(10)

assert(cell2.getResult() == 0)
assert(cell3.getResult() == 0)

pool.resume()
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think it would be good to add a test case where more than one task is interrupted. Also, there should be a test case, where only one of two tasks is interrupted, because the interrupt is too late for the first task.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added more tests


val fut = pool.quiescentResolveDefaults
Await.ready(fut, 2.seconds)

assert(cell2.getResult() == 10)
assert(cell3.getResult() == 10)
}

test("HandlerPool must be able to suspend, but finish executing running tasks - concurrent") {
implicit val pool = new HandlerPool(parallelism = 8)
val completer1 = CellCompleter[ReactivePropertyStoreKey, Int](new ReactivePropertyStoreKey())
val completer2 = CellCompleter[ReactivePropertyStoreKey, Int](new ReactivePropertyStoreKey())
val cell1 = completer1.cell
val cell2 = completer2.cell

val latch1 = new CountDownLatch(1)
val latch2 = new CountDownLatch(1)

cell2.whenNext(cell1, v => {
latch1.countDown()
latch2.await()
NextOutcome(v)
})

completer1.putNext(10)
latch1.await()
pool.suspend()
latch2.countDown()

Thread.sleep(50) // Wait for thread to save NextOutcome

assert(cell2.getResult() == 10)

completer1.putNext(20)

pool.resume()

val fut = pool.quiescentResolveDefaults
Await.ready(fut, 2.seconds)

assert(cell2.getResult() == 20)
}

test("HandlerPool must be able to suspend, but finish executing running tasks - sequential") {
implicit val pool = new HandlerPool(parallelism = 8)
val completer1 = CellCompleter[ReactivePropertyStoreKey, Int](new ReactivePropertyStoreKey())
val completer2 = CellCompleter[ReactivePropertyStoreKey, Int](new ReactivePropertyStoreKey())
val cell1 = completer1.cell
val cell2 = completer2.cell

val latch1 = new CountDownLatch(1)
val latch2 = new CountDownLatch(1)

cell2.whenNextSequential(cell1, v => {
latch1.countDown()
latch2.await()
NextOutcome(v)
})

completer1.putNext(10)
latch1.await()
pool.suspend()
latch2.countDown()

Thread.sleep(50) // Wait for thread to save NextOutcome

assert(cell2.getResult() == 10)

completer1.putNext(20)

pool.resume()

val fut = pool.quiescentResolveDefaults
Await.ready(fut, 2.seconds)

assert(cell2.getResult() == 20)
}
}