diff --git a/akka-remote/src/main/scala/akka/remote/artery/Codecs.scala b/akka-remote/src/main/scala/akka/remote/artery/Codecs.scala index d3b6d616a29..84a8bf77b3e 100644 --- a/akka-remote/src/main/scala/akka/remote/artery/Codecs.scala +++ b/akka-remote/src/main/scala/akka/remote/artery/Codecs.scala @@ -242,7 +242,8 @@ private[remote] object Decoder { recipientPath: String, inboundEnvelope: InboundEnvelope) - private object Tick + private case object Tick + private case object EvictActorRefResolveCache /** Materialized value of [[Encoder]] which allows safely calling into the operator to interfact with compression tables. */ private[remote] trait InboundCompressionAccess { @@ -359,7 +360,8 @@ private[remote] class Decoder( inEnvelopePool: ObjectPool[ReusableInboundEnvelope]) extends GraphStageWithMaterializedValue[FlowShape[EnvelopeBuffer, InboundEnvelope], InboundCompressionAccess] { - import Decoder.Tick + import Decoder.{ EvictActorRefResolveCache, Tick } + val in: Inlet[EnvelopeBuffer] = Inlet("Artery.Decoder.in") val out: Outlet[InboundEnvelope] = Outlet("Artery.Decoder.out") val shape: FlowShape[EnvelopeBuffer, InboundEnvelope] = FlowShape(in, out) @@ -395,6 +397,9 @@ private[remote] class Decoder( val tickDelay = 1.seconds scheduleWithFixedDelay(Tick, tickDelay, tickDelay) + val evictDelay = 60.seconds // FIXME config + scheduleWithFixedDelay(EvictActorRefResolveCache, evictDelay, evictDelay) + if (settings.Advanced.Compression.ActorRefs.Enabled) { val d = settings.Advanced.Compression.ActorRefs.AdvertisementInterval scheduleWithFixedDelay(AdvertiseActorRefsCompressionTable, d, d) @@ -584,6 +589,9 @@ private[remote] class Decoder( tickMessageCount = messageCount tickTimestamp = now + case EvictActorRefResolveCache => + actorRefResolver.clearRemovedAssociations() + case AdvertiseActorRefsCompressionTable => compressions .runNextActorRefAdvertisement() // TODO: optimise these operations, otherwise they stall the hotpath diff --git a/akka-remote/src/main/scala/akka/remote/artery/LruBoundedCache.scala b/akka-remote/src/main/scala/akka/remote/artery/LruBoundedCache.scala index 59e8d4832e2..8a7f205d5ac 100644 --- a/akka-remote/src/main/scala/akka/remote/artery/LruBoundedCache.scala +++ b/akka-remote/src/main/scala/akka/remote/artery/LruBoundedCache.scala @@ -59,6 +59,9 @@ private[akka] abstract class LruBoundedCache[K: ClassTag, V <: AnyRef: ClassTag] find(position = h & Mask, probeDistance = 0) } + final def valuesIterator(): Iterator[V] = + values.iterator.filterNot(_ eq null) + final def stats: CacheStatistics = { var i = 0 var sum = 0 diff --git a/akka-remote/src/main/scala/akka/remote/serialization/ActorRefResolveCache.scala b/akka-remote/src/main/scala/akka/remote/serialization/ActorRefResolveCache.scala index 410938bb9d3..47df0a7adb3 100644 --- a/akka-remote/src/main/scala/akka/remote/serialization/ActorRefResolveCache.scala +++ b/akka-remote/src/main/scala/akka/remote/serialization/ActorRefResolveCache.scala @@ -4,6 +4,8 @@ package akka.remote.serialization +import scala.concurrent.duration.Deadline +import scala.concurrent.duration.FiniteDuration import scala.reflect.ClassTag import akka.actor.ActorRef @@ -35,12 +37,20 @@ private[akka] object ActorRefResolveThreadLocalCache override def createExtension(system: ExtendedActorSystem): ActorRefResolveThreadLocalCache = new ActorRefResolveThreadLocalCache(system) + + private final case class ActorRefResolveCacheHolder(cache: ActorRefResolveCache, evictDeadine: Deadline) } /** * INTERNAL API */ private[akka] class ActorRefResolveThreadLocalCache(val system: ExtendedActorSystem) extends Extension { + import ActorRefResolveThreadLocalCache.ActorRefResolveCacheHolder + + val evictInterval: FiniteDuration = { + import scala.concurrent.duration._ + 60.seconds // FIXME config + } private val provider = system.provider match { case r: RemoteActorRefProvider => r @@ -50,12 +60,21 @@ private[akka] class ActorRefResolveThreadLocalCache(val system: ExtendedActorSys s"not with ${system.provider.getClass}") } - private val current = new ThreadLocal[ActorRefResolveCache] { - override def initialValue: ActorRefResolveCache = new ActorRefResolveCache(provider) + private val current = new ThreadLocal[ActorRefResolveCacheHolder] { + override def initialValue: ActorRefResolveCacheHolder = { + ActorRefResolveCacheHolder(new ActorRefResolveCache(provider), Deadline.now + evictInterval) + } } - def threadLocalCache(@unused provider: RemoteActorRefProvider): ActorRefResolveCache = - current.get + def threadLocalCache(@unused provider: RemoteActorRefProvider): ActorRefResolveCache = { + val holder = current.get + val cache = holder.cache + if (holder.evictDeadine.isOverdue()) { + cache.clearRemovedAssociations() + current.set(ActorRefResolveCacheHolder(cache, Deadline.now + evictInterval)) + } + cache + } } @@ -91,6 +110,19 @@ private[akka] abstract class AbstractActorRefResolveCache[R <: ActorRef: ClassTa ref } + /** + * Invalidate cachedAssociation in all RemoteActorRef entries where the `Association` is removed. + */ + def clearRemovedAssociations(): Unit = { + valuesIterator().foreach { + case r: RemoteActorRef => + val cachedAssociation = r.cachedAssociation + if (cachedAssociation != null && cachedAssociation.isRemovedAfterQuarantined()) + r.cachedAssociation = null + case _ => + } + } + override protected def compute(k: String): R override protected def hash(k: String): Int = Unsafe.fastHash(k)