-
Couldn't load subscription status.
- Fork 19
Updating values
Hazelcast supports a number of ways to update existing entries in an IMap. The most naive approach, is to simply get an entry, update the value, and set it back:
/** Increment and return number. */
def updateNumber(key: String, delta: Int): Option[Int] = {
Option(map.get(key)) map { current =>
val updated = current.copy(number = current.number + delta)
map.set(key, updated)
updated.number
}
}This obviously isn't thread-safe, so we could fix that by using the java.util.concurrent.ConcurrentMap methods and a CAS loop (recursive):
def updateNumber(key: String, delta: Int): Option[Int] = {
Option(map.get(key)) map { current =>
val updated = current.copy(number = current.number + delta)
val success = map.replace(key, current, updated)
if (success) updated.number
else updateNumber(key, delta)
}
}This works AND is thread-safe. However, it still consists of moving the entire object across the network at least three times, once on get and at least twice on replace (old and new both must be passed). If the objects are small and the updates infrequent, this may not be an issue.
But then again, why go through all that when we have a much better way that is both lock and CAS free. The Scala API for Hazelcast supports the following delta update functions:
updateupsertupsertAndGetupdateAndGetupdateAndGetIfupdateIfgetAndUpsertgetAndUpdategetAndUpdateIf
This can simplify our code to this:
def updateNumber(key: String, delta: Int): Option[Int] = {
map.updateAndGet(key) { current =>
current.copy(number = current.number + delta)
} map (_.number)
}Thread-safe and no CAS retries as the update happens on the partition thread, ensuring serial access. Network cost is the delta value itself, and the full object going back. If using update, nothing would be returned.
This is still inefficient, because we're clearly not interested in the full value, only the updated number. In that case, let's grab the more flexible execute method, which gives us much more freedom. Using execute is generally a batch operation, and will go through all entries that match the filter. However if the filter is a single distinct key, you will always get a single callback of Entry[K, Option[V]], where the optional value indicates existence of the entry.
So, in this example, it would look like this:
def updateNumber(key: String, delta: Int): Option[Int] = {
map.execute(OnKey(key)) { entry =>
entry.value = entry.value map { current => current.copy(number = current.number + delta) }
entry.value map (_.number)
}
}This is slightly more verbose than the last example, but it achieves our purpose, namely only passing Int values over the network, first the delta, going to the partition, and then the updated number returned.
Are we not interested in any return value, it becomes much simpler and we can just use the update method:
def updateNumber(key: String, delta: Int): Boolean = {
map.update(key) { current =>
current.copy(number = current.number + delta)
}
}All the examples shown use the blocking API. To use a non-blocking approach replace map with map.async, e.g.
def updateNumber(key: String, delta: Int): Future[Boolean] = {
map.async.update(key) { current =>
current.copy(number = current.number + delta)
}
}Sometimes we only want to update, if a given condition is met. And often that condition depends on the data we want to update. Does that mean we are back to using CAS replacement?
/** Update number, if status is active. */
def updateNumber(key: String, delta: Int): Future[Boolean] = {
map.async.get(key) flatMap {
case None => Future successful false
case Some(current) =>
if (current.isActive) {
val updated = current.copy(number = current.number + delta)
val success = map.replace(key, current, updated)
if (success) Future successful true
else updateNumber(key, delta)
} else {
Future successful false
}
}
}No, we can use one of the three conditional update methods updateIf, updateAndGetIf, or getAndUpdateIf:
def updateNumber(key: String, delta: Int): Future[Boolean] = {
map.async.updateIf(_.isActive, key) { active =>
active.copy(number = active.number + delta)
}
}