Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
235 changes: 145 additions & 90 deletions core/src/main/scala/com/phaller/rasync/Cell.scala
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
package com.phaller.rasync

import java.util.concurrent.atomic.AtomicReference
import java.util.concurrent.atomic.{ AtomicInteger, AtomicReference }
import java.util.concurrent.{ CountDownLatch, ExecutionException }

import scala.annotation.tailrec
Expand All @@ -26,7 +26,7 @@ trait Cell[K <: Key[V], V] {
def isComplete: Boolean

/**
* Adds a dependency on some `other` cell.
* Adds a dependency on some `other` cell, if such dependency does not exist yet.
*
* Example:
* {{{
Expand All @@ -38,12 +38,13 @@ trait Cell[K <: Key[V], V] {
*
* @param other Cell that `this` Cell depends on.
* @param valueCallback Callback that receives the final value of `other` and returns an `Outcome` for `this` cell.
* @return Returns true, iff the dependency was registered successfully.
*/
def whenComplete(other: Cell[K, V], valueCallback: V => Outcome[V]): Unit
def whenCompleteSequential(other: Cell[K, V], valueCallback: V => Outcome[V]): Unit
def whenComplete(other: Cell[K, V], valueCallback: V => Outcome[V]): Boolean
def whenCompleteSequential(other: Cell[K, V], valueCallback: V => Outcome[V]): Boolean

/**
* Adds a dependency on some `other` cell.
* Adds a dependency on some `other` cell, if such dependency does not exist yet.
*
* Example:
* {{{
Expand All @@ -55,9 +56,10 @@ trait Cell[K <: Key[V], V] {
*
* @param other Cell that `this` Cell depends on.
* @param valueCallback Callback that receives the new value of `other` and returns an `Outcome` for `this` cell.
* @return Returns true, iff the dependency was registered successfully.
*/
def whenNext(other: Cell[K, V], valueCallback: V => Outcome[V]): Unit
def whenNextSequential(other: Cell[K, V], valueCallback: V => Outcome[V]): Unit
def whenNext(other: Cell[K, V], valueCallback: V => Outcome[V]): Boolean
def whenNextSequential(other: Cell[K, V], valueCallback: V => Outcome[V]): Boolean

/**
* Adds a dependency on some `other` cell.
Expand All @@ -73,9 +75,10 @@ trait Cell[K <: Key[V], V] {
*
* @param other Cell that `this` Cell depends on.
* @param valueCallback Callback that receives the new value of `other` and returns an `Outcome` for `this` cell.
* @return Returns true, iff the dependency was registered successfully.
*/
def when(other: Cell[K, V], valueCallback: (V, Boolean) => Outcome[V]): Unit
def whenSequential(other: Cell[K, V], valueCallback: (V, Boolean) => Outcome[V]): Unit
def when(other: Cell[K, V], valueCallback: (V, Boolean) => Outcome[V]): Boolean
def whenSequential(other: Cell[K, V], valueCallback: (V, Boolean) => Outcome[V]): Boolean

def zipFinal(that: Cell[K, V]): Cell[DefaultKey[(V, V)], (V, V)]

Expand Down Expand Up @@ -118,6 +121,9 @@ trait Cell[K <: Key[V], V] {
private[rasync] def removeAllCallbacks(cells: Seq[Cell[K, V]]): Unit

def isADependee(): Boolean

private[rasync] def incIncomingCallbacks(): Unit
private[rasync] def decIncomingCallbacks(): Unit
}

object Cell {
Expand Down Expand Up @@ -197,6 +203,11 @@ private class CellImpl[K <: Key[V], V](pool: HandlerPool, val key: K, updater: U
*/
private val state = new AtomicReference[AnyRef](State.empty[K, V](updater))

/* first element is the number of incoming callbacks,
* second element is the value that has been propagated to dependent cells (or initially bottom)
*/
private val numIncomingCallbacks = new AtomicReference[(Int, V)]((0, updater.bottom))

// `CellCompleter` and corresponding `Cell` are the same run-time object.
override def cell: Cell[K, V] = this

Expand Down Expand Up @@ -333,90 +344,87 @@ private class CellImpl[K <: Key[V], V](pool: HandlerPool, val key: K, updater: U
this.putFinal(value)
}

override def when(other: Cell[K, V], valueCallback: (V, Boolean) => Outcome[V]): Unit = {
this.whenNext(other, valueCallback(_, false))
this.whenComplete(other, valueCallback(_, true))
override def when(other: Cell[K, V], valueCallback: (V, Boolean) => Outcome[V]): Boolean = {
this.whenNext(other, valueCallback(_, false)) && this.whenComplete(other, valueCallback(_, true))
}

override def whenSequential(other: Cell[K, V], valueCallback: (V, Boolean) => Outcome[V]): Unit = {
this.whenNextSequential(other, valueCallback(_, false))
this.whenCompleteSequential(other, valueCallback(_, true))
override def whenSequential(other: Cell[K, V], valueCallback: (V, Boolean) => Outcome[V]): Boolean = {
this.whenNextSequential(other, valueCallback(_, false)) && this.whenCompleteSequential(other, valueCallback(_, true))
}

override def whenNext(other: Cell[K, V], valueCallback: V => Outcome[V]): Unit = {
override def whenNext(other: Cell[K, V], valueCallback: V => Outcome[V]): Boolean = {
this.whenNext(other, valueCallback, sequential = false)
}

override def whenNextSequential(other: Cell[K, V], valueCallback: V => Outcome[V]): Unit = {
override def whenNextSequential(other: Cell[K, V], valueCallback: V => Outcome[V]): Boolean = {
this.whenNext(other, valueCallback, sequential = true)
}

private def whenNext(other: Cell[K, V], valueCallback: V => Outcome[V], sequential: Boolean): Unit = {
var success = false
while (!success) {
state.get() match {
case finalRes: Try[_] => // completed with final result
// do not add dependency
// in fact, do nothing
success = true

case raw: State[_, _] => // not completed
val newDep: NextDepRunnable[K, V] =
if (sequential) new NextSequentialDepRunnable(pool, this, other, valueCallback)
else new NextConcurrentDepRunnable(pool, this, other, valueCallback)

val current = raw.asInstanceOf[State[K, V]]
val depRegistered =
if (current.nextDeps.contains(other)) true
else {
val newState = new State(current.res, current.tasksActive, current.completeDeps, current.completeCallbacks, current.nextDeps + other, current.nextCallbacks)
state.compareAndSet(current, newState)
}
if (depRegistered) {
success = true
other.addNextCallback(newDep, this)
pool.triggerExecution(other)
}
@tailrec
private def whenNext(other: Cell[K, V], valueCallback: V => Outcome[V], sequential: Boolean): Boolean = state.get() match {
case finalRes: Try[_] => // completed with final result
// do not add dependency
// in fact, do nothing
false

case raw: State[_, _] => // not completed
val current = raw.asInstanceOf[State[K, V]]

if (current.nextDeps.contains(other))
// Do not register a dependency on the same cell twice
false
else {
val newDep: NextDepRunnable[K, V] =
if (sequential) new NextSequentialDepRunnable(pool, this, other, valueCallback)
else new NextConcurrentDepRunnable(pool, this, other, valueCallback)

val newState = new State(current.res, current.tasksActive, current.completeDeps, current.completeCallbacks, current.nextDeps + other, current.nextCallbacks)

if (state.compareAndSet(current, newState)) {
other.addNextCallback(newDep, this)
pool.triggerExecution(other)
true
} else {
whenNext(other, valueCallback, sequential)
}
}
}
}

override def whenComplete(other: Cell[K, V], valueCallback: V => Outcome[V]): Unit = {
override def whenComplete(other: Cell[K, V], valueCallback: V => Outcome[V]): Boolean = {
this.whenComplete(other, valueCallback, false)
}

override def whenCompleteSequential(other: Cell[K, V], valueCallback: V => Outcome[V]): Unit = {
override def whenCompleteSequential(other: Cell[K, V], valueCallback: V => Outcome[V]): Boolean = {
this.whenComplete(other, valueCallback, true)
}

private def whenComplete(other: Cell[K, V], valueCallback: V => Outcome[V], sequential: Boolean): Unit = {
var success = false
while (!success) {
state.get() match {
case finalRes: Try[_] => // completed with final result
// do not add dependency
// in fact, do nothing
success = true

case raw: State[_, _] => // not completed
val newDep: CompleteDepRunnable[K, V] =
if (sequential) new CompleteSequentialDepRunnable(pool, this, other, valueCallback)
else new CompleteConcurrentDepRunnable(pool, this, other, valueCallback)

val current = raw.asInstanceOf[State[K, V]]
val depRegistered =
if (current.completeDeps.contains(other)) true
else {
val newState = new State(current.res, current.tasksActive, current.completeDeps + other, current.completeCallbacks, current.nextDeps, current.nextCallbacks)
state.compareAndSet(current, newState)
}
if (depRegistered) {
success = true
other.addCompleteCallback(newDep, this)
pool.triggerExecution(other)
}
@tailrec
private def whenComplete(other: Cell[K, V], valueCallback: V => Outcome[V], sequential: Boolean): Boolean = state.get() match {
case finalRes: Try[_] => // completed with final result
// do not add dependency
// in fact, do nothing
false

case raw: State[_, _] => // not completed
val current = raw.asInstanceOf[State[K, V]]

if (current.completeDeps.contains(other))
// Do not register a dependency on the same cell twice
false
else {
val newDep: CompleteDepRunnable[K, V] =
if (sequential) new CompleteSequentialDepRunnable(pool, this, other, valueCallback)
else new CompleteConcurrentDepRunnable(pool, this, other, valueCallback)
val newState = new State(current.res, current.tasksActive, current.completeDeps + other, current.completeCallbacks, current.nextDeps, current.nextCallbacks)

if (state.compareAndSet(current, newState)) {
other.addCompleteCallback(newDep, this)
pool.triggerExecution(other)
true
} else {
whenComplete(other, valueCallback, sequential)
}
}
}
}

override private[rasync] def addCompleteCallback(callback: CompleteCallbackRunnable[K, V], cell: Cell[K, V]): Unit = {
Expand Down Expand Up @@ -461,10 +469,11 @@ private class CellImpl[K <: Key[V], V](pool: HandlerPool, val key: K, updater: U
if (!state.compareAndSet(current, newState)) {
tryNewState(value)
} else {
// CAS was successful, so there was a point in time where `newVal` was in the cell
current.nextCallbacks.values.foreach { callbacks =>
callbacks.foreach(callback => callback.execute())
}
// If we came here via a direct putNext (instead of Outcome of a whenNextCallback)
// this incoming change has not been counted. So we need to manually start outgoing callbacks.
// (This might lead to duplicate invocation.)
if (numIncomingCallbacks.get()._1 <= 0)
triggerNextCallbacks()
true
}
} else true
Expand Down Expand Up @@ -503,21 +512,12 @@ private class CellImpl[K <: Key[V], V](pool: HandlerPool, val key: K, updater: U
res

case (pre: State[K, V], newVal: Try[V]) =>
val nextCallbacks = pre.nextCallbacks
val completeCallbacks = pre.completeCallbacks

if (nextCallbacks.nonEmpty)
nextCallbacks.values.foreach { callbacks =>
callbacks.foreach(callback => callback.execute())
}
if (completeCallbacks.nonEmpty)
completeCallbacks.values.foreach { callbacks =>
callbacks.foreach(callback => callback.execute())
}
triggerCompleteCallbacks(pre)

val depsCells = pre.completeDeps
val nextDepsCells = pre.nextDeps

// Other cells do not need to call us any more
if (depsCells.nonEmpty)
depsCells.foreach(_.removeCompleteCallbacks(this))
if (nextDepsCells.nonEmpty)
Expand All @@ -531,6 +531,29 @@ private class CellImpl[K <: Key[V], V](pool: HandlerPool, val key: K, updater: U
res
}

// We need to pass the list of callbacks here, because they
// might have already been deleted in `state` while the cell has
// been set to its final value.
private def triggerCompleteCallbacks(pre: State[K, V]): Unit = {
pre.completeCallbacks.values.foreach { callbacks =>
callbacks.foreach(callback => callback.execute())
}
pre.nextCallbacks.values.foreach { callbacks =>
callbacks.foreach(callback => callback.execute())
}
}

private def triggerNextCallbacks(): Unit = {
state.get() match {
case _: Try[_] => /* Meanwhile, cell has been completed. No need to trigger `next` callbacks any more. */
case raw: State[_, _] =>
val current = raw.asInstanceOf[State[K, V]]
current.nextCallbacks.values.foreach { callbacks =>
callbacks.foreach(callback => callback.execute())
}
}
}

@tailrec
override private[rasync] final def removeDep(cell: Cell[K, V]): Unit = {
state.get() match {
Expand All @@ -544,7 +567,7 @@ private class CellImpl[K <: Key[V], V](pool: HandlerPool, val key: K, updater: U
else if (newDeps.isEmpty)
nodepslatch.countDown()

case _ => /* do nothing */
case _ => /* `this` has been completed and therefore does not have any deps stored. */
}
}

Expand All @@ -561,7 +584,7 @@ private class CellImpl[K <: Key[V], V](pool: HandlerPool, val key: K, updater: U
else if (newNextDeps.isEmpty)
nonextdepslatch.countDown()

case _ => /* do nothing */
case _ => /* `this` has been completed and therefore does not have any deps stored. */
}
}

Expand Down Expand Up @@ -735,4 +758,36 @@ private class CellImpl[K <: Key[V], V](pool: HandlerPool, val key: K, updater: U
numCompleteCallbacks > 0 || numNextCallbacks > 0
}

/** Called, when a CallbackRunnable r with r.dependentCell == this has been started. */
@tailrec
override final private[rasync] def incIncomingCallbacks(): Unit = {
val current = numIncomingCallbacks.get()
val next = (current._1 + 1, current._2)

if (!numIncomingCallbacks.compareAndSet(current, next))
incIncomingCallbacks()
}

/**
* Called, when a CallbackRunnable r with r.dependentCell == this has been completed.
* Triggers all outgoing callbacks, if no more incoming callbacks are running.
*/
@tailrec
override final private[rasync] def decIncomingCallbacks(): Unit = {
val current = numIncomingCallbacks.get()

// If we drop to zero, store the current result as the latest propagated value
val next =
if (current._1 == 1)
(0, getResult())
else
(current._1 - 1, current._2)

if (numIncomingCallbacks.compareAndSet(current, next)) {
// CAS was successfull. Call dependent cells, if we dropped to zero and
// have new information to propagated
if (current._1 == 1 && next._2 != current._2)
triggerNextCallbacks()
} else decIncomingCallbacks()
}
}
2 changes: 2 additions & 0 deletions core/src/main/scala/com/phaller/rasync/HandlerPool.scala
Original file line number Diff line number Diff line change
Expand Up @@ -311,6 +311,8 @@ class HandlerPool(parallelism: Int = 8, unhandledExceptionHandler: Throwable =>
// Submit task to the pool
incSubmittedTasks()

// println(s"added task $task")

// Run the task
pool.execute(new Runnable {
def run(): Unit = {
Expand Down
Loading