Skip to content

Commit 367f4bb

Browse files
committed
Add pool.onQuiescent(cell)
Fixes #89
1 parent 227c93d commit 367f4bb

File tree

2 files changed

+96
-7
lines changed

2 files changed

+96
-7
lines changed

core/src/main/scala/com/phaller/rasync/HandlerPool.scala

Lines changed: 26 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -12,10 +12,17 @@ import lattice.{ DefaultKey, Key, Updater }
1212
import org.opalj.graphs._
1313

1414
import scala.collection.immutable.Queue
15+
import scala.util.{ Success, Try }
1516

1617
/* Need to have reference equality for CAS.
18+
*
19+
* quiescenceCellHandlers use NEXTCallbackRunnable, because (a) pool might reach quiescence
20+
* repeatedly and (b) cell might not be completed, when quiescence is reached.
1721
*/
18-
private class PoolState(val handlers: List[() => Unit] = List(), val submittedTasks: Int = 0) {
22+
private class PoolState(
23+
val quiescenceHandlers: List[() => Unit] = List(),
24+
val quiescenceCellHandlers: List[NextCallbackRunnable[_, _]] = List(),
25+
val submittedTasks: Int = 0) {
1926
def isQuiescent(): Boolean =
2027
submittedTasks == 0
2128
}
@@ -63,13 +70,27 @@ class HandlerPool(parallelism: Int = 8, unhandledExceptionHandler: Throwable =>
6370
if (state.isQuiescent) {
6471
execute(new Runnable { def run(): Unit = handler() })
6572
} else {
66-
val newState = new PoolState(handler :: state.handlers, state.submittedTasks)
73+
val newState = new PoolState(handler :: state.quiescenceHandlers, state.quiescenceCellHandlers, state.submittedTasks)
6774
val success = poolState.compareAndSet(state, newState)
6875
if (!success)
6976
onQuiescent(handler)
7077
}
7178
}
7279

80+
@tailrec
81+
final def onQuiescent[K <: Key[V], V](cell: Cell[K, V])(handler: Try[V] => Unit): Unit = {
82+
val state = poolState.get()
83+
if (state.isQuiescent) {
84+
execute(new Runnable { def run(): Unit = handler(Success(cell.getResult())) })
85+
} else {
86+
val runnable = new NextConcurrentCallbackRunnable(this, null, cell, handler)
87+
val newState = new PoolState(state.quiescenceHandlers, runnable :: state.quiescenceCellHandlers, state.submittedTasks)
88+
val success = poolState.compareAndSet(state, newState)
89+
if (!success)
90+
onQuiescent(cell)(handler)
91+
}
92+
}
93+
7394
/**
7495
* Register a cell with this HandlerPool.
7596
*
@@ -265,7 +286,7 @@ class HandlerPool(parallelism: Int = 8, unhandledExceptionHandler: Throwable =>
265286
var submitSuccess = false
266287
while (!submitSuccess) {
267288
val state = poolState.get()
268-
val newState = new PoolState(state.handlers, state.submittedTasks + 1)
289+
val newState = new PoolState(state.quiescenceHandlers, state.quiescenceCellHandlers, state.submittedTasks + 1)
269290
submitSuccess = poolState.compareAndSet(state, newState)
270291
}
271292
}
@@ -281,10 +302,10 @@ class HandlerPool(parallelism: Int = 8, unhandledExceptionHandler: Throwable =>
281302
val state = poolState.get()
282303
if (state.submittedTasks > i) {
283304
handlersToRun = None
284-
val newState = new PoolState(state.handlers, state.submittedTasks - i)
305+
val newState = new PoolState(state.quiescenceHandlers, state.quiescenceCellHandlers, state.submittedTasks - i)
285306
success = poolState.compareAndSet(state, newState)
286307
} else if (state.submittedTasks == 1) {
287-
handlersToRun = Some(state.handlers)
308+
handlersToRun = Some(state.quiescenceHandlers)
288309
val newState = new PoolState()
289310
success = poolState.compareAndSet(state, newState)
290311
} else {

core/src/test/scala/com/phaller/rasync/test/pool.scala

Lines changed: 70 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,15 @@
11
package com.phaller.rasync
22
package test
33

4-
import java.util.concurrent.ConcurrentHashMap
4+
import java.util.concurrent.{ ConcurrentHashMap, CountDownLatch }
55

66
import org.scalatest.FunSuite
77

88
import scala.concurrent.{ Await, Promise }
99
import scala.concurrent.duration._
10-
import lattice.{ Lattice, StringIntKey, StringIntUpdater, Updater }
10+
11+
import scala.util.{ Failure, Success }
12+
import lattice.{ StringIntKey, StringIntUpdater, Updater }
1113

1214
class PoolSuite extends FunSuite {
1315
test("onQuiescent") {
@@ -69,4 +71,70 @@ class PoolSuite extends FunSuite {
6971
assert(regCells.size === 1000)
7072
}
7173

74+
test("onQuiescent(cell): incomplete cell") {
75+
val latch = new CountDownLatch(1)
76+
77+
val pool = new HandlerPool
78+
val completer1 = CellCompleter[StringIntKey, Int]("somekey")(new StringIntUpdater, pool)
79+
80+
var i = 0
81+
while (i < 10000) {
82+
val p1 = Promise[Boolean]()
83+
val p2 = Promise[Boolean]()
84+
pool.execute { () => { p1.success(true); () } }
85+
pool.onQuiescent { () => p2.success(true) }
86+
try {
87+
Await.result(p2.future, 1.seconds)
88+
} catch {
89+
case t: Throwable =>
90+
assert(false, s"failure after $i iterations")
91+
}
92+
i += 1
93+
}
94+
95+
pool.onQuiescent(completer1.cell) {
96+
case Success(x) =>
97+
assert(x === 0)
98+
latch.countDown()
99+
case Failure(_) => assert(false)
100+
}
101+
102+
latch.await()
103+
104+
pool.shutdown()
105+
}
106+
107+
test("onQuiescent(cell): completed cell") {
108+
val latch = new CountDownLatch(1)
109+
110+
val pool = new HandlerPool
111+
val completer1 = CellCompleter.completed[Int](10)(new StringIntUpdater, pool)
112+
113+
var i = 0
114+
while (i < 10000) {
115+
val p1 = Promise[Boolean]()
116+
val p2 = Promise[Boolean]()
117+
pool.execute { () => { p1.success(true); () } }
118+
pool.onQuiescent { () => p2.success(true) }
119+
try {
120+
Await.result(p2.future, 1.seconds)
121+
} catch {
122+
case t: Throwable =>
123+
assert(false, s"failure after $i iterations")
124+
}
125+
i += 1
126+
}
127+
128+
pool.onQuiescent(completer1.cell) {
129+
case Success(x) =>
130+
assert(x === 10)
131+
latch.countDown()
132+
case Failure(_) => assert(false)
133+
}
134+
135+
latch.await()
136+
137+
pool.shutdown()
138+
}
139+
72140
}

0 commit comments

Comments
 (0)