Skip to content

Commit 5808dbd

Browse files
committed
Register set of deps
1 parent 6102d22 commit 5808dbd

File tree

1 file changed

+341
-0
lines changed

1 file changed

+341
-0
lines changed
Lines changed: 341 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,341 @@
1+
package com.phaller.rasync
2+
package test
3+
4+
import java.util.concurrent.atomic.AtomicInteger
5+
import java.util.concurrent.{CountDownLatch, TimeUnit}
6+
7+
import com.phaller.rasync.lattice._
8+
import com.phaller.rasync.lattice.lattices.{NaturalNumberKey, NaturalNumberLattice}
9+
import com.phaller.rasync.test.lattice._
10+
import org.scalatest.FunSuite
11+
12+
import scala.concurrent.duration._
13+
import scala.concurrent.{Await, Promise}
14+
import scala.util.{Failure, Success}
15+
16+
class WhenMultiSuite extends FunSuite {
17+
18+
implicit val stringIntUpdater: Updater[Int] = new StringIntUpdater
19+
20+
test("when: values passed to callback") {
21+
val latch1 = new CountDownLatch(1)
22+
val latch2 = new CountDownLatch(1)
23+
24+
implicit val pool = new HandlerPool
25+
val completer1 = CellCompleter[StringIntKey, Int]("somekey")
26+
val completer2 = CellCompleter[StringIntKey, Int]("someotherkey")
27+
28+
val cell1 = completer1.cell
29+
cell1.whenMulti(List(completer2.cell), () => {
30+
Outcome(completer2.cell.getResult(), completer2.cell.isComplete) // complete, if completer2 is completed
31+
})
32+
33+
assert(cell1.numNextDependencies == 1)
34+
assert(cell1.numTotalDependencies == 1)
35+
36+
cell1.onNext {
37+
case Success(x) =>
38+
assert((x === 8 && !cell1.isComplete) || x === 10)
39+
latch1.countDown()
40+
case Failure(e) =>
41+
assert(false)
42+
latch1.countDown()
43+
}
44+
45+
cell1.onComplete {
46+
case Success(x) =>
47+
assert(x === 10)
48+
latch2.countDown()
49+
case Failure(e) =>
50+
assert(false)
51+
latch2.countDown()
52+
}
53+
54+
completer1.putNext(8)
55+
latch1.await()
56+
57+
assert(!cell1.isComplete)
58+
59+
completer2.putFinal(10)
60+
latch2.await()
61+
62+
assert(cell1.isComplete)
63+
64+
pool.onQuiescenceShutdown()
65+
}
66+
67+
test("whenSequential: values passed to callback") {
68+
val latch1 = new CountDownLatch(1)
69+
val latch2 = new CountDownLatch(1)
70+
71+
implicit val pool = new HandlerPool
72+
val completer1 = CellCompleter[StringIntKey, Int]("somekey")
73+
val completer2 = CellCompleter[StringIntKey, Int]("someotherkey")
74+
75+
val cell1 = completer1.cell
76+
cell1.whenSequentialMulti(List(completer2.cell), () => {
77+
Outcome(completer2.cell.getResult(), completer2.cell.isComplete) // complete, if completer2 is completed
78+
})
79+
80+
assert(cell1.numNextDependencies == 1)
81+
assert(cell1.numTotalDependencies == 1)
82+
83+
cell1.onNext {
84+
case Success(x) =>
85+
assert((x === 8 && !cell1.isComplete) || x === 10)
86+
latch1.countDown()
87+
case Failure(e) =>
88+
assert(false)
89+
latch1.countDown()
90+
}
91+
92+
cell1.onComplete {
93+
case Success(x) =>
94+
assert(x === 10)
95+
latch2.countDown()
96+
case Failure(e) =>
97+
assert(false)
98+
latch2.countDown()
99+
}
100+
101+
completer1.putNext(8)
102+
latch1.await()
103+
104+
assert(!cell1.isComplete)
105+
106+
completer2.putFinal(10)
107+
latch2.await()
108+
109+
assert(cell1.isComplete)
110+
111+
pool.onQuiescenceShutdown()
112+
}
113+
114+
115+
test("DefaultKey.resolve") {
116+
implicit val pool = new HandlerPool
117+
val k = new DefaultKey[Int]
118+
val completer1 = CellCompleter[DefaultKey[Int], Int](k)
119+
val completer2 = CellCompleter[DefaultKey[Int], Int](k)
120+
completer1.cell.whenMulti(List(completer2.cell), () => NextOutcome(completer2.cell.getResult()))
121+
completer2.cell.whenMulti(List(completer1.cell), () => NextOutcome(completer1.cell.getResult()))
122+
completer1.putNext(5)
123+
Await.ready(pool.quiescentResolveCycles, 2.seconds)
124+
assert(completer1.cell.isComplete)
125+
assert(completer2.cell.isComplete)
126+
assert(completer1.cell.getResult() == 5)
127+
assert(completer2.cell.getResult() == 5)
128+
pool.shutdown()
129+
}
130+
131+
test("quiescent incomplete cells") {
132+
implicit val pool = new HandlerPool
133+
val completer1 = CellCompleter[StringIntKey, Int]("key1")
134+
val completer2 = CellCompleter[StringIntKey, Int]("key2")
135+
val cell1 = completer1.cell
136+
val cell2 = completer2.cell
137+
cell1.whenMulti(List(cell2), () => NoOutcome)
138+
cell2.whenMulti(List(cell1), () => NoOutcome)
139+
val incompleteFut = pool.quiescentIncompleteCells
140+
val cells = Await.result(incompleteFut, 2.seconds)
141+
assert(cells.map(_.key).toList.toString == "List(key1, key2)")
142+
}
143+
144+
test("quiescent resolve cycle") {
145+
implicit val pool = new HandlerPool
146+
val completer1 = CellCompleter[StringIntKey, Int]("key1")
147+
val completer2 = CellCompleter[StringIntKey, Int]("key2")
148+
val cell1 = completer1.cell
149+
val cell2 = completer2.cell
150+
cell1.whenMulti(List(cell2), () => NoOutcome)
151+
cell2.whenMulti(List(cell1), () => NoOutcome)
152+
val qfut = pool.quiescentResolveCell
153+
Await.ready(qfut, 2.seconds)
154+
val incompleteFut = pool.quiescentIncompleteCells
155+
val cells = Await.result(incompleteFut, 2.seconds)
156+
assert(cells.size == 0)
157+
}
158+
159+
test("whenComplete: cycle with additional incoming dep") {
160+
sealed trait Value
161+
case object Bottom extends Value
162+
case object Resolved extends Value
163+
case object Fallback extends Value
164+
case object OK extends Value
165+
case object ShouldNotHappen extends Value
166+
167+
implicit object ValueUpdater extends Updater[Value] {
168+
override def update(v1: Value, v2: Value): Value = v2
169+
override val initial: Value = Bottom
170+
}
171+
172+
object TheKey extends DefaultKey[Value] {
173+
override def resolve[K <: Key[Value]](cells: Iterable[Cell[K, Value]]): Iterable[(Cell[K, Value], Value)] = {
174+
cells.map(cell => (cell, Resolved))
175+
}
176+
override def fallback[K <: Key[Value]](cells: Iterable[Cell[K, Value]]): Iterable[(Cell[K, Value], Value)] = {
177+
cells.map(cell => (cell, Fallback))
178+
}
179+
}
180+
181+
implicit val pool = new HandlerPool
182+
val completer1 = CellCompleter[TheKey.type, Value](TheKey)
183+
val completer2 = CellCompleter[TheKey.type, Value](TheKey)
184+
val cell1 = completer1.cell
185+
val cell2 = completer2.cell
186+
val in = CellCompleter[TheKey.type, Value](TheKey)
187+
188+
// let `cell1` and `cell2` form a cycle
189+
cell1.whenMulti(List(cell2), () => NextOutcome(ShouldNotHappen))
190+
cell2.whenMulti(List(cell1), () => NextOutcome(ShouldNotHappen))
191+
192+
// the cycle is dependent on incoming information from `in`
193+
cell2.whenComplete(in.cell, v => { NextOutcome(ShouldNotHappen) })
194+
195+
// resolve the independent cell `in` and the cycle
196+
val fut = pool.quiescentResolveCell
197+
Await.ready(fut, 1.minutes)
198+
199+
pool.onQuiescenceShutdown()
200+
201+
assert(cell1.getResult() != ShouldNotHappen)
202+
assert(cell2.getResult() != ShouldNotHappen)
203+
assert(in.cell.getResult() == Fallback)
204+
}
205+
206+
test("whenSequential: calling sequentially") {
207+
val n = 1000
208+
209+
val runningCallbacks = new AtomicInteger(0)
210+
val latch = new CountDownLatch(1)
211+
val random = new scala.util.Random()
212+
213+
implicit val pool = new HandlerPool
214+
val completer1 = CellCompleter[NaturalNumberKey.type, Int](NaturalNumberKey)(Updater.latticeToUpdater(new NaturalNumberLattice), pool)
215+
val completer2 = CellCompleter[NaturalNumberKey.type, Int](NaturalNumberKey)(Updater.latticeToUpdater(new NaturalNumberLattice), pool)
216+
217+
val cell1 = completer1.cell
218+
cell1.whenSequentialMulti(List(completer2.cell), () => {
219+
assert(runningCallbacks.incrementAndGet() == 1)
220+
val x = completer2.cell.getResult()
221+
Thread.`yield`()
222+
try {
223+
Thread.sleep(random.nextInt(3))
224+
} catch {
225+
case _: InterruptedException => /* ignore */
226+
}
227+
assert(runningCallbacks.decrementAndGet() == 0)
228+
Outcome(x * n, x == n)
229+
})
230+
231+
cell1.onComplete(_ => {
232+
latch.countDown()
233+
})
234+
235+
for (i <- 1 to n)
236+
pool.execute(() => completer2.putNext(i))
237+
238+
latch.await()
239+
240+
assert(cell1.getResult() == n * n)
241+
assert(completer2.cell.getResult() == n)
242+
243+
pool.onQuiescenceShutdown()
244+
}
245+
246+
test("whenSequential: state") {
247+
// cell1 has deps to 1000 cells. All callbacks
248+
// share a counter (i.e. state) that must not be
249+
// incremented concurrently
250+
val n = 1000
251+
var count = Set[Int]()
252+
253+
class PowerSetLattice[T] extends Lattice[Set[T]] {
254+
255+
def join(left: Set[T], right: Set[T]): Set[T] =
256+
left ++ right
257+
258+
val bottom: Set[T] =
259+
Set[T]()
260+
261+
}
262+
263+
val theUpdater = Updater.latticeToUpdater(new PowerSetLattice[Int])
264+
265+
val latch = new CountDownLatch(1)
266+
val random = new scala.util.Random()
267+
268+
implicit val pool = new HandlerPool
269+
val theKey = new DefaultKey[Set[Int]]
270+
val completer1 = CellCompleter[DefaultKey[Set[Int]], Set[Int]](theKey)(theUpdater, pool)
271+
val cell1 = completer1.cell
272+
273+
cell1.onComplete(_ => {
274+
latch.countDown()
275+
})
276+
277+
for (i <- 1 to n) {
278+
val completer2 = CellCompleter[DefaultKey[Set[Int]], Set[Int]](theKey)(theUpdater, pool)
279+
val completer3 = CellCompleter[DefaultKey[Set[Int]], Set[Int]](theKey)(theUpdater, pool)
280+
cell1.whenSequentialMulti(List(completer2.cell, completer3.cell), () => {
281+
count = count ++ Set(count.size)
282+
Thread.`yield`()
283+
try {
284+
Thread.sleep(random.nextInt(3))
285+
} catch {
286+
case _: InterruptedException => /* ignore */
287+
}
288+
Outcome(count, count.size == 2*n)
289+
})
290+
pool.execute(() => completer2.putNext(Set(2*i)))
291+
pool.execute(() => completer3.putNext(Set(2*i+1)))
292+
}
293+
294+
latch.await()
295+
296+
assert(cell1.getResult().size == 2*n)
297+
298+
pool.onQuiescenceShutdown()
299+
}
300+
301+
test("whenCompleteSequential: discard callbacks on completion") {
302+
val latch1 = new CountDownLatch(1)
303+
val latch2 = new CountDownLatch(1)
304+
305+
implicit val pool = new HandlerPool
306+
307+
val completer1 = CellCompleter[NaturalNumberKey.type, Int](NaturalNumberKey)
308+
val completer2 = CellCompleter[NaturalNumberKey.type, Int](NaturalNumberKey)
309+
val completer3 = CellCompleter[NaturalNumberKey.type, Int](NaturalNumberKey)
310+
val cell1 = completer1.cell
311+
val cell2 = completer2.cell
312+
val cell3 = completer3.cell
313+
cell1.trigger()
314+
315+
cell1.whenSequentialMulti(List(cell2), () => {
316+
latch1.await() // wait for some puts/triggers
317+
FinalOutcome(10)
318+
})
319+
cell1.whenSequentialMulti(List(cell3), () => NextOutcome(cell3.getResult()))
320+
321+
completer2.putFinal(3)
322+
completer3.putNext(2)
323+
completer3.putNext(3)
324+
latch1.countDown()
325+
326+
pool.onQuiescent(() => {
327+
pool.onQuiescenceShutdown()
328+
latch2.countDown()
329+
})
330+
// pool needs to reach quiescence, even if cell1 is completed early:
331+
latch2.await()
332+
333+
assert(cell1.getResult() == 10)
334+
assert(cell2.getResult() == 3)
335+
assert(cell3.getResult() == 3)
336+
assert(cell1.isComplete)
337+
assert(cell2.isComplete)
338+
assert(!cell3.isComplete)
339+
}
340+
341+
}

0 commit comments

Comments
 (0)