Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
54 changes: 31 additions & 23 deletions core/src/main/scala/com/phaller/rasync/HandlerPool.scala
Original file line number Diff line number Diff line change
Expand Up @@ -380,32 +380,40 @@ class HandlerPool(parallelism: Int = 8, unhandledExceptionHandler: Throwable =>
}

private def callSequentialCallback[K <: Key[V], V](dependentCell: Cell[K, V]): Unit = {
pool.execute(() => {
val registered = cellsNotDone.get()

// only call the callback, if the cell has not been completed
if (registered.contains(dependentCell)) {
val tasks = registered(dependentCell)
/*
Pop an element from the queue only if it is completely done!
That way, one can always start running sequential callbacks, if the list has been empty.
*/
val task = tasks.head // The queue must not be empty! Caller has to assert this.
val registered = cellsNotDone.get()

try {
task.run()
} catch {
case NonFatal(e) =>
unhandledExceptionHandler(e)
} finally {
decSubmittedTasks()
// only call the callback, if the cell has not been completed
if (registered.contains(dependentCell)) {
val tasks = registered(dependentCell)
/*
Pop an element from the queue only if it is completely done!
That way, one can always start running sequential callbacks, if the list has been empty.
*/
val task = tasks.head // The queue must not be empty! Caller has to assert this.

if (task.compareAndSetPropagatedValue())
pool.execute(() => {
try {
task.run()
} catch {
case NonFatal(e) =>
unhandledExceptionHandler(e)
} finally {
decSubmittedTasks()

// The task has been run. Remove it. If the new list is not empty, callSequentialCallback(cell)
if (dequeueSequentialCallback(dependentCell).nonEmpty)
callSequentialCallback(dependentCell)
}
})
else {
decSubmittedTasks()

// The task has been run. Remove it. If the new list is not empty, callSequentialCallback(cell)
if (dequeueSequentialCallback(dependentCell).nonEmpty)
callSequentialCallback(dependentCell)
}
// The task has been run. Remove it. If the new list is not empty, callSequentialCallback(cell)
if (dequeueSequentialCallback(dependentCell).nonEmpty)
callSequentialCallback(dependentCell)
}
})
}
}

/**
Expand Down
23 changes: 21 additions & 2 deletions core/src/main/scala/com/phaller/rasync/callbackRunnable.scala
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
package com.phaller.rasync

import java.util.concurrent.atomic.AtomicReference

import lattice.Key

import scala.concurrent.OnCompleteRunnable
Expand All @@ -25,6 +27,22 @@ private[rasync] trait CallbackRunnable[K <: Key[V], V] extends Runnable with OnC

/** Essentially, call the callback. */
override def run(): Unit

private val lastPropagatedValue: AtomicReference[Option[V]] = new AtomicReference[Option[V]](None)

/** Returns TRUE, if `otherCell`'s value has changed since the last invocation. */
private[rasync] def compareAndSetPropagatedValue(): Boolean = {
// Use CAS to store (anapproximation of) the value that will be propagated.
// (otherCell.getResult() might change in the meantime, but then this method
// returns TRUE anyway, so the callback is going to be called.
var oldV: Option[V] = null // we could use bottom here, if otherCell.updater was accessible.
do {
oldV = lastPropagatedValue.get()
} while (!lastPropagatedValue.compareAndSet(oldV, Some(otherCell.getResult())))

// Is the value that would be propagated an improvement?
!oldV.contains(otherCell.getResult())
}
}

/**
Expand All @@ -34,8 +52,9 @@ private[rasync] trait CallbackRunnable[K <: Key[V], V] extends Runnable with OnC
private[rasync] trait ConcurrentCallbackRunnable[K <: Key[V], V] extends CallbackRunnable[K, V] {
/** Add this CallbackRunnable to its handler pool such that it is run concurrently. */
def execute(): Unit =
try pool.execute(this)
catch { case NonFatal(t) => pool reportFailure t }
if (compareAndSetPropagatedValue())
try pool.execute(this)
catch { case NonFatal(t) => pool reportFailure t }
}

/**
Expand Down
23 changes: 12 additions & 11 deletions core/src/test/scala/com/phaller/rasync/test/base.scala
Original file line number Diff line number Diff line change
Expand Up @@ -2293,28 +2293,28 @@ class BaseSuite extends FunSuite {

test("whenNextSequential: state") {
val n = 1000
var count = 0
var max = 0

val runningCallbacks = new AtomicInteger(0)
val latch = new CountDownLatch(1)
val random = new scala.util.Random()

implicit val pool = new HandlerPool
val completer1 = CellCompleter[StringIntKey, Int]("somekey")
val completer2 = CellCompleter[StringIntKey, Int]("someotherkey")
val completer1 = CellCompleter[lattice.NaturalNumberKey.type, Int](lattice.NaturalNumberKey)(Updater.latticeToUpdater(new NaturalNumberLattice), pool)
val completer2 = CellCompleter[lattice.NaturalNumberKey.type, Int](lattice.NaturalNumberKey)(Updater.latticeToUpdater(new NaturalNumberLattice), pool)

val cell1 = completer1.cell
cell1.whenNextSequential(completer2.cell, (x: Int) => {
assert(runningCallbacks.incrementAndGet() == 1)
count += 1
max = Math.max(max, x)
Thread.`yield`()
try {
Thread.sleep(random.nextInt(3))
} catch {
case _: InterruptedException => /* ignore */
}
assert(runningCallbacks.decrementAndGet() == 0)
Outcome(count, count == n)
Outcome(max, max == n)
})

cell1.onComplete(_ => {
Expand All @@ -2327,25 +2327,26 @@ class BaseSuite extends FunSuite {
latch.await()

assert(cell1.getResult() == n)
assert(max == n)

pool.onQuiescenceShutdown()
pool.onQuiescent(() => pool.onQuiescenceShutdown())
}

test("whenCompleteSequential: state") {
val n = 1000
var count = 0
var max = 0

val runningCallbacks = new AtomicInteger(0)
val latch = new CountDownLatch(1)
var otherLatches: Set[CountDownLatch] = Set.empty
val random = new scala.util.Random()

implicit val pool = new HandlerPool
val completer1 = CellCompleter[StringIntKey, Int]("somekey")
val completer1 = CellCompleter[lattice.NaturalNumberKey.type, Int](lattice.NaturalNumberKey)(Updater.latticeToUpdater(new NaturalNumberLattice), pool)
val cell1 = completer1.cell

for (i <- 1 to n) {
val completer2 = CellCompleter[StringIntKey, Int]("someotherkey")
val completer2 = CellCompleter[lattice.NaturalNumberKey.type, Int](lattice.NaturalNumberKey)(Updater.latticeToUpdater(new NaturalNumberLattice), pool)

val latch2 = new CountDownLatch(1)
otherLatches = otherLatches + latch2
Expand All @@ -2356,15 +2357,15 @@ class BaseSuite extends FunSuite {

cell1.whenNextSequential(completer2.cell, (x: Int) => {
assert(runningCallbacks.incrementAndGet() == 1)
count += 1
max = Math.max(max, x)
Thread.`yield`()
try {
Thread.sleep(random.nextInt(3))
} catch {
case _: InterruptedException => /* ignore */
}
assert(runningCallbacks.decrementAndGet() == 0)
Outcome(count, count == n)
Outcome(max, max == n)
})

cell1.onComplete(_ => {
Expand Down