From 2cb1eb487e7a44e24c9219db23c313a53fc8e990 Mon Sep 17 00:00:00 2001 From: PJ Fanning Date: Thu, 30 Jan 2025 01:21:28 +0100 Subject: [PATCH 01/12] init work on RandomNumberGenerator --- .../typed/internal/StubbedActorContext.scala | 9 ++-- .../typed/javadsl/BehaviorTestKit.scala | 6 +-- .../testkit/typed/javadsl/TestInbox.scala | 7 ++- .../typed/scaladsl/BehaviorTestKit.scala | 5 ++- .../testkit/typed/scaladsl/TestInbox.scala | 5 +-- .../WorkPullingProducerControllerImpl.scala | 5 +-- .../actor/typed/internal/Supervision.scala | 6 +-- .../typed/internal/routing/RoutingLogic.scala | 5 +-- .../org/apache/pekko/actor/ActorCell.scala | 4 +- .../org/apache/pekko/actor/ActorSystem.scala | 2 +- .../org/apache/pekko/io/dns/IdGenerator.scala | 15 +++++-- .../pekko/pattern/BackoffSupervisor.scala | 4 +- .../apache/pekko/pattern/CircuitBreaker.scala | 2 +- .../routing/OptimalSizeExploringResizer.scala | 4 +- .../org/apache/pekko/routing/Random.scala | 5 +-- .../pekko/routing/SmallestMailbox.scala | 5 +-- .../pekko/util/RandomNumberGenerator.scala | 44 +++++++++++++++++++ .../metrics/ClusterMetricsCollector.scala | 5 +-- .../metrics/ClusterMetricsRouting.scala | 6 +-- .../typed/ReplicatedShardingTest.java | 2 +- .../pubsub/DistributedPubSubMediator.scala | 4 +- .../pekko/cluster/MembershipState.scala | 9 ++-- .../pekko/cluster/ddata/Replicator.scala | 7 ++- .../FailureInjectorTransportAdapter.scala | 5 +-- .../stream/impl/fusing/GraphInterpreter.scala | 5 +-- 25 files changed, 107 insertions(+), 69 deletions(-) create mode 100644 actor/src/main/scala/org/apache/pekko/util/RandomNumberGenerator.scala diff --git a/actor-testkit-typed/src/main/scala/org/apache/pekko/actor/testkit/typed/internal/StubbedActorContext.scala b/actor-testkit-typed/src/main/scala/org/apache/pekko/actor/testkit/typed/internal/StubbedActorContext.scala index 51314acafcb..cb1325e2a01 100644 --- a/actor-testkit-typed/src/main/scala/org/apache/pekko/actor/testkit/typed/internal/StubbedActorContext.scala +++ b/actor-testkit-typed/src/main/scala/org/apache/pekko/actor/testkit/typed/internal/StubbedActorContext.scala @@ -19,12 +19,11 @@ import pekko.actor.typed._ import pekko.actor.typed.internal._ import pekko.actor.{ ActorPath, ActorRefProvider, InvalidMessageException } import pekko.annotation.InternalApi -import pekko.util.Helpers +import pekko.util.{ Helpers, RandomNumberGenerator} import pekko.{ actor => classic } import org.slf4j.{ Logger, Marker } import org.slf4j.helpers.{ MessageFormatter, SubstituteLoggerFactory } -import java.util.concurrent.ThreadLocalRandom.{ current => rnd } import scala.collection.immutable.TreeMap import scala.concurrent.ExecutionContextExecutor import scala.concurrent.duration.FiniteDuration @@ -75,7 +74,7 @@ private[pekko] final class FunctionRef[-T](override val path: ActorPath, send: ( extends ActorContextImpl[T] { def this(system: ActorSystemStub, name: String, currentBehaviorProvider: () => Behavior[T]) = { - this(system, (system.path / name).withUid(rnd().nextInt()), currentBehaviorProvider) + this(system, (system.path / name).withUid(RandomNumberGenerator.get().nextInt()), currentBehaviorProvider) } def this(name: String, currentBehaviorProvider: () => Behavior[T]) = { @@ -111,7 +110,7 @@ private[pekko] final class FunctionRef[-T](override val path: ActorPath, send: ( override def spawnAnonymous[U](behavior: Behavior[U], props: Props = Props.empty): ActorRef[U] = { checkCurrentActorThread() - val btk = new BehaviorTestKitImpl[U](system, (path / childName.next()).withUid(rnd().nextInt()), behavior) + val btk = new BehaviorTestKitImpl[U](system, (path / childName.next()).withUid(RandomNumberGenerator.get().nextInt()), behavior) _children += btk.context.self.path.name -> btk btk.context.self } @@ -120,7 +119,7 @@ private[pekko] final class FunctionRef[-T](override val path: ActorPath, send: ( _children.get(name) match { case Some(_) => throw classic.InvalidActorNameException(s"actor name $name is already taken") case None => - val btk = new BehaviorTestKitImpl[U](system, (path / name).withUid(rnd().nextInt()), behavior) + val btk = new BehaviorTestKitImpl[U](system, (path / name).withUid(RandomNumberGenerator.get().nextInt()), behavior) _children += name -> btk btk.context.self } diff --git a/actor-testkit-typed/src/main/scala/org/apache/pekko/actor/testkit/typed/javadsl/BehaviorTestKit.scala b/actor-testkit-typed/src/main/scala/org/apache/pekko/actor/testkit/typed/javadsl/BehaviorTestKit.scala index 385b6549e2f..b875148c967 100644 --- a/actor-testkit-typed/src/main/scala/org/apache/pekko/actor/testkit/typed/javadsl/BehaviorTestKit.scala +++ b/actor-testkit-typed/src/main/scala/org/apache/pekko/actor/testkit/typed/javadsl/BehaviorTestKit.scala @@ -19,9 +19,9 @@ import pekko.actor.testkit.typed.{ CapturedLogEvent, Effect } import pekko.actor.typed.receptionist.Receptionist import pekko.actor.typed.{ ActorRef, Behavior, Signal } import pekko.annotation.{ ApiMayChange, DoNotInherit } -import com.typesafe.config.Config +import pekko.util.RandomNumberGenerator -import java.util.concurrent.ThreadLocalRandom +import com.typesafe.config.Config object BehaviorTestKit { @@ -37,7 +37,7 @@ object BehaviorTestKit { @ApiMayChange def create[T](initialBehavior: Behavior[T], name: String, config: Config): BehaviorTestKit[T] = { val system = new ActorSystemStub("StubbedActorContext", config) - val uid = ThreadLocalRandom.current().nextInt() + val uid = RandomNumberGenerator.get().nextInt() new BehaviorTestKitImpl(system, (system.path / name).withUid(uid), initialBehavior) } diff --git a/actor-testkit-typed/src/main/scala/org/apache/pekko/actor/testkit/typed/javadsl/TestInbox.scala b/actor-testkit-typed/src/main/scala/org/apache/pekko/actor/testkit/typed/javadsl/TestInbox.scala index 6bd0a30f5af..3b5da4ffc83 100644 --- a/actor-testkit-typed/src/main/scala/org/apache/pekko/actor/testkit/typed/javadsl/TestInbox.scala +++ b/actor-testkit-typed/src/main/scala/org/apache/pekko/actor/testkit/typed/javadsl/TestInbox.scala @@ -13,8 +13,6 @@ package org.apache.pekko.actor.testkit.typed.javadsl -import java.util.concurrent.ThreadLocalRandom - import scala.collection.immutable import org.apache.pekko @@ -22,16 +20,17 @@ import pekko.actor.testkit.typed.internal.TestInboxImpl import pekko.actor.typed.ActorRef import pekko.annotation.DoNotInherit import pekko.util.ccompat.JavaConverters._ +import pekko.util.RandomNumberGenerator object TestInbox { import pekko.actor.testkit.typed.scaladsl.TestInbox.address def create[T](name: String): TestInbox[T] = { - val uid = ThreadLocalRandom.current().nextInt() + val uid = RandomNumberGenerator.get().nextInt() new TestInboxImpl((address / name).withUid(uid)) } def create[T](): TestInbox[T] = { - val uid = ThreadLocalRandom.current().nextInt() + val uid = RandomNumberGenerator.get().nextInt() new TestInboxImpl((address / "inbox").withUid(uid)) } } diff --git a/actor-testkit-typed/src/main/scala/org/apache/pekko/actor/testkit/typed/scaladsl/BehaviorTestKit.scala b/actor-testkit-typed/src/main/scala/org/apache/pekko/actor/testkit/typed/scaladsl/BehaviorTestKit.scala index a69b05778b6..1a8ffc23ac5 100644 --- a/actor-testkit-typed/src/main/scala/org/apache/pekko/actor/testkit/typed/scaladsl/BehaviorTestKit.scala +++ b/actor-testkit-typed/src/main/scala/org/apache/pekko/actor/testkit/typed/scaladsl/BehaviorTestKit.scala @@ -19,9 +19,10 @@ import pekko.actor.testkit.typed.{ CapturedLogEvent, Effect } import pekko.actor.typed.receptionist.Receptionist import pekko.actor.typed.{ ActorRef, Behavior, Signal, TypedActorContext } import pekko.annotation.{ ApiMayChange, DoNotInherit } +import pekko.util.RandomNumberGenerator + import com.typesafe.config.Config -import java.util.concurrent.ThreadLocalRandom import scala.collection.immutable import scala.reflect.ClassTag @@ -32,7 +33,7 @@ object BehaviorTestKit { def apply[T](initialBehavior: Behavior[T], name: String, config: Config): BehaviorTestKit[T] = { val system = new ActorSystemStub("StubbedActorContext", config) - val uid = ThreadLocalRandom.current().nextInt() + val uid = RandomNumberGenerator.get().nextInt() new BehaviorTestKitImpl(system, (system.path / name).withUid(uid), initialBehavior) } def apply[T](initialBehavior: Behavior[T], name: String): BehaviorTestKit[T] = { diff --git a/actor-testkit-typed/src/main/scala/org/apache/pekko/actor/testkit/typed/scaladsl/TestInbox.scala b/actor-testkit-typed/src/main/scala/org/apache/pekko/actor/testkit/typed/scaladsl/TestInbox.scala index 9f66b97a43a..9e0a767c557 100644 --- a/actor-testkit-typed/src/main/scala/org/apache/pekko/actor/testkit/typed/scaladsl/TestInbox.scala +++ b/actor-testkit-typed/src/main/scala/org/apache/pekko/actor/testkit/typed/scaladsl/TestInbox.scala @@ -13,8 +13,6 @@ package org.apache.pekko.actor.testkit.typed.scaladsl -import java.util.concurrent.ThreadLocalRandom - import scala.collection.immutable import org.apache.pekko @@ -22,11 +20,12 @@ import pekko.actor.{ Address, RootActorPath } import pekko.actor.testkit.typed.internal.TestInboxImpl import pekko.actor.typed.ActorRef import pekko.annotation.{ ApiMayChange, DoNotInherit } +import pekko.util.RandomNumberGenerator @ApiMayChange object TestInbox { def apply[T](name: String = "inbox"): TestInbox[T] = { - val uid = ThreadLocalRandom.current().nextInt() + val uid = RandomNumberGenerator.get().nextInt() new TestInboxImpl((address / name).withUid(uid)) } diff --git a/actor-typed/src/main/scala/org/apache/pekko/actor/typed/delivery/internal/WorkPullingProducerControllerImpl.scala b/actor-typed/src/main/scala/org/apache/pekko/actor/typed/delivery/internal/WorkPullingProducerControllerImpl.scala index 9b2c05034bd..23069bedb27 100644 --- a/actor-typed/src/main/scala/org/apache/pekko/actor/typed/delivery/internal/WorkPullingProducerControllerImpl.scala +++ b/actor-typed/src/main/scala/org/apache/pekko/actor/typed/delivery/internal/WorkPullingProducerControllerImpl.scala @@ -14,7 +14,6 @@ package org.apache.pekko.actor.typed.delivery.internal import java.util.UUID -import java.util.concurrent.ThreadLocalRandom import java.util.concurrent.TimeoutException import scala.reflect.ClassTag @@ -39,7 +38,7 @@ import pekko.actor.typed.scaladsl.Behaviors import pekko.actor.typed.scaladsl.LoggerOps import pekko.actor.typed.scaladsl.StashBuffer import pekko.annotation.InternalApi -import pekko.util.Timeout +import pekko.util.{ RandomNumberGenerator, Timeout } /** * INTERNAL API @@ -404,7 +403,7 @@ private class WorkPullingProducerControllerImpl[A: ClassTag]( if (workers.isEmpty) { None } else { - val i = ThreadLocalRandom.current().nextInt(workers.size) + val i = RandomNumberGenerator.get().nextInt(workers.size) Some(workers(i)) } } diff --git a/actor-typed/src/main/scala/org/apache/pekko/actor/typed/internal/Supervision.scala b/actor-typed/src/main/scala/org/apache/pekko/actor/typed/internal/Supervision.scala index bbc4366cb26..ce7803cf12e 100644 --- a/actor-typed/src/main/scala/org/apache/pekko/actor/typed/internal/Supervision.scala +++ b/actor-typed/src/main/scala/org/apache/pekko/actor/typed/internal/Supervision.scala @@ -14,8 +14,6 @@ package org.apache.pekko.actor.typed package internal -import java.util.concurrent.ThreadLocalRandom - import scala.concurrent.duration.Deadline import scala.concurrent.duration.FiniteDuration import scala.reflect.ClassTag @@ -33,7 +31,7 @@ import pekko.actor.typed.scaladsl.Behaviors import pekko.actor.typed.scaladsl.StashBuffer import pekko.annotation.InternalApi import pekko.event.Logging -import pekko.util.OptionVal +import pekko.util.{ OptionVal, RandomNumberGenerator } import pekko.util.unused import scala.util.Try @@ -187,7 +185,7 @@ private object RestartSupervisor { minBackoff: FiniteDuration, maxBackoff: FiniteDuration, randomFactor: Double): FiniteDuration = { - val rnd = 1.0 + ThreadLocalRandom.current().nextDouble() * randomFactor + val rnd = 1.0 + RandomNumberGenerator.get().nextDouble() * randomFactor val calculatedDuration = Try(maxBackoff.min(minBackoff * math.pow(2, restartCount)) * rnd).getOrElse(maxBackoff) calculatedDuration match { case f: FiniteDuration => f diff --git a/actor-typed/src/main/scala/org/apache/pekko/actor/typed/internal/routing/RoutingLogic.scala b/actor-typed/src/main/scala/org/apache/pekko/actor/typed/internal/routing/RoutingLogic.scala index e06b1e598ec..397f51bb8df 100644 --- a/actor-typed/src/main/scala/org/apache/pekko/actor/typed/internal/routing/RoutingLogic.scala +++ b/actor-typed/src/main/scala/org/apache/pekko/actor/typed/internal/routing/RoutingLogic.scala @@ -13,13 +13,12 @@ package org.apache.pekko.actor.typed.internal.routing -import java.util.concurrent.ThreadLocalRandom - import org.apache.pekko import pekko.actor.Address import pekko.actor.typed.ActorRef import pekko.annotation.InternalApi import pekko.routing.ConsistentHash +import pekko.util.RandomNumberGenerator /** * Kept in the behavior, not shared between instances, meant to be stateful. @@ -89,7 +88,7 @@ private[pekko] object RoutingLogics { private var currentRoutees: Array[ActorRef[T]] = _ override def selectRoutee(msg: T): ActorRef[T] = { - val selectedIdx = ThreadLocalRandom.current().nextInt(currentRoutees.length) + val selectedIdx = RandomNumberGenerator.get().nextInt(currentRoutees.length) currentRoutees(selectedIdx) } diff --git a/actor/src/main/scala/org/apache/pekko/actor/ActorCell.scala b/actor/src/main/scala/org/apache/pekko/actor/ActorCell.scala index 26131a979cc..33b86cd336c 100644 --- a/actor/src/main/scala/org/apache/pekko/actor/ActorCell.scala +++ b/actor/src/main/scala/org/apache/pekko/actor/ActorCell.scala @@ -14,7 +14,6 @@ package org.apache.pekko.actor import java.io.{ NotSerializableException, ObjectOutputStream } -import java.util.concurrent.ThreadLocalRandom import scala.annotation.{ switch, tailrec } import scala.annotation.nowarn @@ -31,6 +30,7 @@ import pekko.dispatch.sysmsg._ import pekko.event.Logging.{ Debug, Error, LogEvent } import pekko.japi.Procedure import pekko.util.unused +import pekko.util.RandomNumberGenerator /** * The actor context - the view of the actor cell from the actor. @@ -392,7 +392,7 @@ private[pekko] object ActorCell { @tailrec final def newUid(): Int = { // Note that this uid is also used as hashCode in ActorRef, so be careful // to not break hashing if you change the way uid is generated - val uid = ThreadLocalRandom.current.nextInt() + val uid = RandomNumberGenerator.get().nextInt() if (uid == undefinedUid) newUid() else uid } diff --git a/actor/src/main/scala/org/apache/pekko/actor/ActorSystem.scala b/actor/src/main/scala/org/apache/pekko/actor/ActorSystem.scala index c879ff116e9..97ed54eaa8d 100644 --- a/actor/src/main/scala/org/apache/pekko/actor/ActorSystem.scala +++ b/actor/src/main/scala/org/apache/pekko/actor/ActorSystem.scala @@ -814,7 +814,7 @@ private[pekko] class ActorSystemImpl( setup: ActorSystemSetup) extends ExtendedActorSystem { - val uid: Long = ThreadLocalRandom.current.nextLong() + val uid: Long = RandomNumberGenerator.get().nextLong() if (!name.matches("""^[a-zA-Z0-9][a-zA-Z0-9-_]*$""")) throw new IllegalArgumentException( diff --git a/actor/src/main/scala/org/apache/pekko/io/dns/IdGenerator.scala b/actor/src/main/scala/org/apache/pekko/io/dns/IdGenerator.scala index 68856aa40b8..dfa9f64f129 100644 --- a/actor/src/main/scala/org/apache/pekko/io/dns/IdGenerator.scala +++ b/actor/src/main/scala/org/apache/pekko/io/dns/IdGenerator.scala @@ -17,11 +17,12 @@ package org.apache.pekko.io.dns -import org.apache.pekko.annotation.InternalApi +import org.apache.pekko +import pekko.annotation.InternalApi +import pekko.util.RandomNumberGenerator import java.security.SecureRandom import java.util.Random -import java.util.concurrent.ThreadLocalRandom /** * INTERNAL API @@ -52,12 +53,12 @@ private[pekko] object IdGenerator { } def apply(policy: Policy): IdGenerator = policy match { - case Policy.ThreadLocalRandom => random(ThreadLocalRandom.current()) + case Policy.ThreadLocalRandom => random(RandomNumberGenerator.get()) case Policy.SecureRandom => random(new SecureRandom()) case Policy.EnhancedDoubleHashRandom => new EnhancedDoubleHashGenerator(new SecureRandom()) } - def apply(): IdGenerator = random(ThreadLocalRandom.current()) + def apply(): IdGenerator = random(RandomNumberGenerator.get()) /** * @return a random sequence of ids for production @@ -65,6 +66,12 @@ private[pekko] object IdGenerator { def random(rand: java.util.Random): IdGenerator = () => (rand.nextInt(UnsignedShortBound) + Short.MinValue).toShort + /** + * @return a random sequence of ids for production + */ + private def random(rand: RandomNumberGenerator): IdGenerator = + () => (rand.nextInt(UnsignedShortBound) + Short.MinValue).toShort + private[pekko] class EnhancedDoubleHashGenerator(seed: Random) extends IdGenerator { /** diff --git a/actor/src/main/scala/org/apache/pekko/pattern/BackoffSupervisor.scala b/actor/src/main/scala/org/apache/pekko/pattern/BackoffSupervisor.scala index d8525ca9903..0debcd992ad 100644 --- a/actor/src/main/scala/org/apache/pekko/pattern/BackoffSupervisor.scala +++ b/actor/src/main/scala/org/apache/pekko/pattern/BackoffSupervisor.scala @@ -14,7 +14,6 @@ package org.apache.pekko.pattern import java.util.Optional -import java.util.concurrent.ThreadLocalRandom import scala.concurrent.duration.{ Duration, FiniteDuration } import scala.util.Try @@ -24,6 +23,7 @@ import pekko.actor.{ ActorRef, DeadLetterSuppression, OneForOneStrategy, Props, import pekko.annotation.InternalApi import pekko.pattern.internal.BackoffOnStopSupervisor import pekko.util.JavaDurationConverters._ +import pekko.util.RandomNumberGenerator object BackoffSupervisor { @@ -324,7 +324,7 @@ object BackoffSupervisor { minBackoff: FiniteDuration, maxBackoff: FiniteDuration, randomFactor: Double): FiniteDuration = { - val rnd = 1.0 + ThreadLocalRandom.current().nextDouble() * randomFactor + val rnd = 1.0 + RandomNumberGenerator.get().nextDouble() * randomFactor val calculatedDuration = Try(maxBackoff.min(minBackoff * math.pow(2, restartCount)) * rnd).getOrElse(maxBackoff) calculatedDuration match { case f: FiniteDuration => f diff --git a/actor/src/main/scala/org/apache/pekko/pattern/CircuitBreaker.scala b/actor/src/main/scala/org/apache/pekko/pattern/CircuitBreaker.scala index 00dcbeb2609..f80d3f51738 100644 --- a/actor/src/main/scala/org/apache/pekko/pattern/CircuitBreaker.scala +++ b/actor/src/main/scala/org/apache/pekko/pattern/CircuitBreaker.scala @@ -1107,7 +1107,7 @@ class CircuitBreaker( scheduler.scheduleOnce(currentResetTimeout) { attemptReset() } - val rnd = 1.0 + ThreadLocalRandom.current().nextDouble() * randomFactor + val rnd = 1.0 + RandomNumberGenerator.get().nextDouble() * randomFactor val nextResetTimeout = currentResetTimeout * exponentialBackoffFactor * rnd match { case f: FiniteDuration => f case _ => currentResetTimeout diff --git a/actor/src/main/scala/org/apache/pekko/routing/OptimalSizeExploringResizer.scala b/actor/src/main/scala/org/apache/pekko/routing/OptimalSizeExploringResizer.scala index 6d366122423..220d7e6f498 100644 --- a/actor/src/main/scala/org/apache/pekko/routing/OptimalSizeExploringResizer.scala +++ b/actor/src/main/scala/org/apache/pekko/routing/OptimalSizeExploringResizer.scala @@ -14,7 +14,6 @@ package org.apache.pekko.routing import java.time.LocalDateTime -import java.util.concurrent.ThreadLocalRandom import scala.collection.immutable import scala.concurrent.duration._ @@ -26,6 +25,7 @@ import org.apache.pekko import pekko.actor._ import pekko.annotation.InternalApi import pekko.util.JavaDurationConverters._ +import pekko.util.RandomNumberGenerator trait OptimalSizeExploringResizer extends Resizer { @@ -163,7 +163,7 @@ case class DefaultOptimalSizeExploringResizer( @InternalApi private[routing] var stopExploring = false - private def random = ThreadLocalRandom.current() + private def random = RandomNumberGenerator.get() private def checkParamAsProbability(value: Double, paramName: String): Unit = if (value < 0 || value > 1) diff --git a/actor/src/main/scala/org/apache/pekko/routing/Random.scala b/actor/src/main/scala/org/apache/pekko/routing/Random.scala index 9da3b228d01..6ad7adbda39 100644 --- a/actor/src/main/scala/org/apache/pekko/routing/Random.scala +++ b/actor/src/main/scala/org/apache/pekko/routing/Random.scala @@ -13,8 +13,6 @@ package org.apache.pekko.routing -import java.util.concurrent.ThreadLocalRandom - import scala.collection.immutable import scala.annotation.nowarn @@ -25,6 +23,7 @@ import pekko.actor.ActorSystem import pekko.actor.SupervisorStrategy import pekko.dispatch.Dispatchers import pekko.japi.Util.immutableSeq +import pekko.util.RandomNumberGenerator object RandomRoutingLogic { def apply(): RandomRoutingLogic = new RandomRoutingLogic @@ -38,7 +37,7 @@ object RandomRoutingLogic { final class RandomRoutingLogic extends RoutingLogic { override def select(message: Any, routees: immutable.IndexedSeq[Routee]): Routee = if (routees.isEmpty) NoRoutee - else routees(ThreadLocalRandom.current.nextInt(routees.size)) + else routees(RandomNumberGenerator.get().nextInt(routees.size)) } /** diff --git a/actor/src/main/scala/org/apache/pekko/routing/SmallestMailbox.scala b/actor/src/main/scala/org/apache/pekko/routing/SmallestMailbox.scala index a549d66bd5a..8aa7d6c87c5 100644 --- a/actor/src/main/scala/org/apache/pekko/routing/SmallestMailbox.scala +++ b/actor/src/main/scala/org/apache/pekko/routing/SmallestMailbox.scala @@ -13,8 +13,6 @@ package org.apache.pekko.routing -import java.util.concurrent.ThreadLocalRandom - import scala.annotation.tailrec import scala.collection.immutable @@ -27,6 +25,7 @@ import pekko.actor.ActorRefWithCell import pekko.actor.ActorSystem import pekko.actor.SupervisorStrategy import pekko.dispatch.Dispatchers +import pekko.util.RandomNumberGenerator object SmallestMailboxRoutingLogic { def apply(): SmallestMailboxRoutingLogic = new SmallestMailboxRoutingLogic @@ -71,7 +70,7 @@ class SmallestMailboxRoutingLogic extends RoutingLogic { NoRoutee else if (at >= targets.size) { if (deep) { - if (isTerminated(proposedTarget)) targets(ThreadLocalRandom.current.nextInt(targets.size)) else proposedTarget + if (isTerminated(proposedTarget)) targets(RandomNumberGenerator.get().nextInt(targets.size)) else proposedTarget } else selectNext(targets, proposedTarget, currentScore, 0, deep = true) } else { val target = targets(at) diff --git a/actor/src/main/scala/org/apache/pekko/util/RandomNumberGenerator.scala b/actor/src/main/scala/org/apache/pekko/util/RandomNumberGenerator.scala new file mode 100644 index 00000000000..334ad1308c6 --- /dev/null +++ b/actor/src/main/scala/org/apache/pekko/util/RandomNumberGenerator.scala @@ -0,0 +1,44 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.pekko.util + +import org.apache.pekko +import pekko.annotation.InternalApi + +import java.util.concurrent.ThreadLocalRandom + +@InternalApi +private[pekko] trait RandomNumberGenerator { + def nextInt(): Int + def nextInt(n: Int): Int + def nextLong(): Long + def nextDouble(): Double +} + +@InternalApi +private[pekko] object ThreadLocalRandomNumberGenerator extends RandomNumberGenerator { + override def nextInt(): Int = ThreadLocalRandom.current().nextInt() + override def nextInt(bound: Int): Int = ThreadLocalRandom.current().nextInt(bound) + override def nextLong(): Long = ThreadLocalRandom.current().nextLong() + override def nextDouble(): Double = ThreadLocalRandom.current().nextDouble() +} + +@InternalApi +private[pekko] object RandomNumberGenerator { + def get(): RandomNumberGenerator = ThreadLocalRandomNumberGenerator +} diff --git a/cluster-metrics/src/main/scala/org/apache/pekko/cluster/metrics/ClusterMetricsCollector.scala b/cluster-metrics/src/main/scala/org/apache/pekko/cluster/metrics/ClusterMetricsCollector.scala index 8f85fa884ef..837c87cea4d 100644 --- a/cluster-metrics/src/main/scala/org/apache/pekko/cluster/metrics/ClusterMetricsCollector.scala +++ b/cluster-metrics/src/main/scala/org/apache/pekko/cluster/metrics/ClusterMetricsCollector.scala @@ -13,8 +13,6 @@ package org.apache.pekko.cluster.metrics -import java.util.concurrent.ThreadLocalRandom - import scala.collection.immutable import scala.annotation.nowarn @@ -29,6 +27,7 @@ import pekko.cluster.Cluster import pekko.cluster.ClusterEvent import pekko.cluster.Member import pekko.cluster.MemberStatus +import pekko.util.RandomNumberGenerator /** * Runtime collection management commands. @@ -280,7 +279,7 @@ private[metrics] class ClusterMetricsCollector extends Actor with ActorLogging { context.actorSelection(self.path.toStringWithAddress(address)) ! envelope def selectRandomNode(addresses: immutable.IndexedSeq[Address]): Option[Address] = - if (addresses.isEmpty) None else Some(addresses(ThreadLocalRandom.current.nextInt(addresses.size))) + if (addresses.isEmpty) None else Some(addresses(RandomNumberGenerator.get().nextInt(addresses.size))) /** * Publishes to the event stream. diff --git a/cluster-metrics/src/main/scala/org/apache/pekko/cluster/metrics/ClusterMetricsRouting.scala b/cluster-metrics/src/main/scala/org/apache/pekko/cluster/metrics/ClusterMetricsRouting.scala index d40b9871db0..7f1371d4ed7 100644 --- a/cluster-metrics/src/main/scala/org/apache/pekko/cluster/metrics/ClusterMetricsRouting.scala +++ b/cluster-metrics/src/main/scala/org/apache/pekko/cluster/metrics/ClusterMetricsRouting.scala @@ -14,7 +14,6 @@ package org.apache.pekko.cluster.metrics import java.util.Arrays -import java.util.concurrent.ThreadLocalRandom import java.util.concurrent.atomic.AtomicReference import scala.annotation.tailrec @@ -36,6 +35,7 @@ import pekko.cluster.routing.ClusterRouterSettingsBase import pekko.dispatch.Dispatchers import pekko.japi.Util.immutableSeq import pekko.routing._ +import pekko.util.RandomNumberGenerator /** * Load balancing of messages to cluster nodes based on cluster metric data. @@ -93,9 +93,9 @@ final case class AdaptiveLoadBalancingRoutingLogic( updateWeightedRoutees() match { case Some(weighted) => if (weighted.isEmpty) NoRoutee - else weighted(ThreadLocalRandom.current.nextInt(weighted.total) + 1) + else weighted(RandomNumberGenerator.get().nextInt(weighted.total) + 1) case None => - routees(ThreadLocalRandom.current.nextInt(routees.size)) + routees(RandomNumberGenerator.get().nextInt(routees.size)) } } diff --git a/cluster-sharding-typed/src/test/java/org/apache/pekko/cluster/sharding/typed/ReplicatedShardingTest.java b/cluster-sharding-typed/src/test/java/org/apache/pekko/cluster/sharding/typed/ReplicatedShardingTest.java index 25599dd4d1d..817c738c871 100644 --- a/cluster-sharding-typed/src/test/java/org/apache/pekko/cluster/sharding/typed/ReplicatedShardingTest.java +++ b/cluster-sharding-typed/src/test/java/org/apache/pekko/cluster/sharding/typed/ReplicatedShardingTest.java @@ -201,7 +201,7 @@ public Receive createReceive() { private Behavior onForwardToRandom(ForwardToRandom forwardToRandom) { Map> refs = replicatedSharding.getEntityRefsFor(forwardToRandom.entityId); - int chosenIdx = ThreadLocalRandom.current().nextInt(refs.size()); + int chosenIdx = RandomNumberGenerator.get().nextInt(refs.size()); new ArrayList<>(refs.values()).get(chosenIdx).tell(forwardToRandom.message); return this; } diff --git a/cluster-tools/src/main/scala/org/apache/pekko/cluster/pubsub/DistributedPubSubMediator.scala b/cluster-tools/src/main/scala/org/apache/pekko/cluster/pubsub/DistributedPubSubMediator.scala index e3dd635f202..9e0cfe5ffd1 100644 --- a/cluster-tools/src/main/scala/org/apache/pekko/cluster/pubsub/DistributedPubSubMediator.scala +++ b/cluster-tools/src/main/scala/org/apache/pekko/cluster/pubsub/DistributedPubSubMediator.scala @@ -15,7 +15,6 @@ package org.apache.pekko.cluster.pubsub import java.net.URLDecoder import java.net.URLEncoder -import java.util.concurrent.ThreadLocalRandom import scala.collection.immutable import scala.collection.immutable.Set @@ -40,6 +39,7 @@ import pekko.routing.Routee import pekko.routing.Router import pekko.routing.RouterEnvelope import pekko.routing.RoutingLogic +import pekko.util.RandomNumberGenerator object DistributedPubSubSettings { @@ -911,7 +911,7 @@ class DistributedPubSubMediator(settings: DistributedPubSubSettings) } def selectRandomNode(addresses: immutable.IndexedSeq[Address]): Option[Address] = - if (addresses.isEmpty) None else Some(addresses(ThreadLocalRandom.current.nextInt(addresses.size))) + if (addresses.isEmpty) None else Some(addresses(RandomNumberGenerator.get().nextInt(addresses.size))) def prune(): Unit = { registry.foreach { diff --git a/cluster/src/main/scala/org/apache/pekko/cluster/MembershipState.scala b/cluster/src/main/scala/org/apache/pekko/cluster/MembershipState.scala index ccedacf7624..e36872f260b 100644 --- a/cluster/src/main/scala/org/apache/pekko/cluster/MembershipState.scala +++ b/cluster/src/main/scala/org/apache/pekko/cluster/MembershipState.scala @@ -13,8 +13,6 @@ package org.apache.pekko.cluster -import java.util.concurrent.ThreadLocalRandom - import scala.annotation.tailrec import scala.collection.SortedSet import scala.collection.immutable @@ -25,6 +23,7 @@ import pekko.annotation.InternalApi import pekko.cluster.ClusterSettings.DataCenter import pekko.cluster.MemberStatus._ import pekko.util.ccompat._ +import pekko.util.RandomNumberGenerator /** * INTERNAL API @@ -405,16 +404,16 @@ import pekko.util.ccompat._ // don't go below the configured probability math.max((5 - localMembers) * 0.25, crossDcGossipProbability) } - ThreadLocalRandom.current.nextDouble() > probability + RandomNumberGenerator.get().nextDouble() > probability } protected def preferNodesWithDifferentView(state: MembershipState): Boolean = - ThreadLocalRandom.current.nextDouble() < adjustedGossipDifferentViewProbability(state.latestGossip.members.size) + RandomNumberGenerator.get().nextDouble() < adjustedGossipDifferentViewProbability(state.latestGossip.members.size) protected def dcsInRandomOrder(dcs: List[DataCenter]): List[DataCenter] = Random.shuffle(dcs) protected def selectRandomNode(nodes: IndexedSeq[UniqueAddress]): Option[UniqueAddress] = if (nodes.isEmpty) None - else Some(nodes(ThreadLocalRandom.current.nextInt(nodes.size))) + else Some(nodes(RandomNumberGenerator.get().nextInt(nodes.size))) } diff --git a/distributed-data/src/main/scala/org/apache/pekko/cluster/ddata/Replicator.scala b/distributed-data/src/main/scala/org/apache/pekko/cluster/ddata/Replicator.scala index 8b64e33ec66..ad28a0e25c7 100644 --- a/distributed-data/src/main/scala/org/apache/pekko/cluster/ddata/Replicator.scala +++ b/distributed-data/src/main/scala/org/apache/pekko/cluster/ddata/Replicator.scala @@ -15,7 +15,6 @@ package org.apache.pekko.cluster.ddata import java.security.MessageDigest import java.util.Optional -import java.util.concurrent.ThreadLocalRandom import java.util.concurrent.TimeUnit import java.util.function.{ Function => JFunction } @@ -66,7 +65,7 @@ import pekko.dispatch.Dispatchers import pekko.event.Logging import pekko.remote.RARP import pekko.serialization.SerializationExtension -import pekko.util.ByteString +import pekko.util.{ ByteString, RandomNumberGenerator } import pekko.util.Helpers.toRootLowerCase import pekko.util.JavaDurationConverters._ import pekko.util.ccompat._ @@ -2154,7 +2153,7 @@ final class Replicator(settings: ReplicatorSettings) extends Actor with ActorLog if (totChunks == statusTotChunks) statusCount += 1 else { - statusCount = ThreadLocalRandom.current.nextInt(0, totChunks) + statusCount = RandomNumberGenerator.get().nextInt(0, totChunks) statusTotChunks = totChunks } val chunk = (statusCount % totChunks).toInt @@ -2167,7 +2166,7 @@ final class Replicator(settings: ReplicatorSettings) extends Actor with ActorLog } def selectRandomNode(addresses: immutable.IndexedSeq[UniqueAddress]): Option[UniqueAddress] = - if (addresses.isEmpty) None else Some(addresses(ThreadLocalRandom.current.nextInt(addresses.size))) + if (addresses.isEmpty) None else Some(addresses(RandomNumberGenerator.get().nextInt(addresses.size))) def replica(node: UniqueAddress): ActorSelection = context.actorSelection(self.path.toStringWithAddress(node.address)) diff --git a/remote/src/main/scala/org/apache/pekko/remote/transport/FailureInjectorTransportAdapter.scala b/remote/src/main/scala/org/apache/pekko/remote/transport/FailureInjectorTransportAdapter.scala index cd3f730b7b3..7e257ea65e4 100644 --- a/remote/src/main/scala/org/apache/pekko/remote/transport/FailureInjectorTransportAdapter.scala +++ b/remote/src/main/scala/org/apache/pekko/remote/transport/FailureInjectorTransportAdapter.scala @@ -14,7 +14,6 @@ package org.apache.pekko.remote.transport import java.util.concurrent.ConcurrentHashMap -import java.util.concurrent.ThreadLocalRandom import scala.concurrent.{ Future, Promise } import scala.util.control.NoStackTrace @@ -28,7 +27,7 @@ import pekko.actor.{ Address, ExtendedActorSystem } import pekko.event.{ Logging, LoggingAdapter } import pekko.remote.transport.AssociationHandle.{ HandleEvent, HandleEventListener } import pekko.remote.transport.Transport._ -import pekko.util.ByteString +import pekko.util.{ ByteString, RandomNumberGenerator } @SerialVersionUID(1L) @deprecated("Classic remoting is deprecated, use Artery", "Akka 2.6.0") @@ -78,7 +77,7 @@ private[remote] class FailureInjectorTransportAdapter( extends AbstractTransportAdapter(wrappedTransport)(extendedSystem.dispatchers.internalDispatcher) with AssociationEventListener { - private def rng = ThreadLocalRandom.current() + private val rng = RandomNumberGenerator.get() private val log = Logging(extendedSystem, classOf[FailureInjectorTransportAdapter]) private val shouldDebugLog: Boolean = extendedSystem.settings.config.getBoolean("pekko.remote.classic.gremlin.debug") diff --git a/stream/src/main/scala/org/apache/pekko/stream/impl/fusing/GraphInterpreter.scala b/stream/src/main/scala/org/apache/pekko/stream/impl/fusing/GraphInterpreter.scala index 4442cc61698..cbb3f406360 100644 --- a/stream/src/main/scala/org/apache/pekko/stream/impl/fusing/GraphInterpreter.scala +++ b/stream/src/main/scala/org/apache/pekko/stream/impl/fusing/GraphInterpreter.scala @@ -13,8 +13,6 @@ package org.apache.pekko.stream.impl.fusing -import java.util.concurrent.ThreadLocalRandom - import scala.concurrent.Promise import scala.util.control.NonFatal @@ -27,6 +25,7 @@ import pekko.stream._ import pekko.stream.Attributes.LogLevels import pekko.stream.snapshot._ import pekko.stream.stage._ +import pekko.util.RandomNumberGenerator /** * INTERNAL API @@ -576,7 +575,7 @@ import pekko.stream.stage._ private def dequeue(): Connection = { val idx = queueHead & mask if (fuzzingMode) { - val swapWith = (ThreadLocalRandom.current.nextInt(queueTail - queueHead) + queueHead) & mask + val swapWith = (RandomNumberGenerator.get().nextInt(queueTail - queueHead) + queueHead) & mask val ev = eventQueue(swapWith) eventQueue(swapWith) = eventQueue(idx) eventQueue(idx) = ev From 1e345b0c2c150a5e74c55eabe60bc3c409caddbc Mon Sep 17 00:00:00 2001 From: PJ Fanning Date: Thu, 30 Jan 2025 01:27:44 +0100 Subject: [PATCH 02/12] scalafmt --- .../testkit/typed/internal/StubbedActorContext.scala | 8 +++++--- .../transport/FailureInjectorTransportAdapter.scala | 2 +- 2 files changed, 6 insertions(+), 4 deletions(-) diff --git a/actor-testkit-typed/src/main/scala/org/apache/pekko/actor/testkit/typed/internal/StubbedActorContext.scala b/actor-testkit-typed/src/main/scala/org/apache/pekko/actor/testkit/typed/internal/StubbedActorContext.scala index cb1325e2a01..124c10d2c14 100644 --- a/actor-testkit-typed/src/main/scala/org/apache/pekko/actor/testkit/typed/internal/StubbedActorContext.scala +++ b/actor-testkit-typed/src/main/scala/org/apache/pekko/actor/testkit/typed/internal/StubbedActorContext.scala @@ -19,7 +19,7 @@ import pekko.actor.typed._ import pekko.actor.typed.internal._ import pekko.actor.{ ActorPath, ActorRefProvider, InvalidMessageException } import pekko.annotation.InternalApi -import pekko.util.{ Helpers, RandomNumberGenerator} +import pekko.util.{ Helpers, RandomNumberGenerator } import pekko.{ actor => classic } import org.slf4j.{ Logger, Marker } import org.slf4j.helpers.{ MessageFormatter, SubstituteLoggerFactory } @@ -110,7 +110,8 @@ private[pekko] final class FunctionRef[-T](override val path: ActorPath, send: ( override def spawnAnonymous[U](behavior: Behavior[U], props: Props = Props.empty): ActorRef[U] = { checkCurrentActorThread() - val btk = new BehaviorTestKitImpl[U](system, (path / childName.next()).withUid(RandomNumberGenerator.get().nextInt()), behavior) + val btk = new BehaviorTestKitImpl[U](system, + (path / childName.next()).withUid(RandomNumberGenerator.get().nextInt()), behavior) _children += btk.context.self.path.name -> btk btk.context.self } @@ -119,7 +120,8 @@ private[pekko] final class FunctionRef[-T](override val path: ActorPath, send: ( _children.get(name) match { case Some(_) => throw classic.InvalidActorNameException(s"actor name $name is already taken") case None => - val btk = new BehaviorTestKitImpl[U](system, (path / name).withUid(RandomNumberGenerator.get().nextInt()), behavior) + val btk = + new BehaviorTestKitImpl[U](system, (path / name).withUid(RandomNumberGenerator.get().nextInt()), behavior) _children += name -> btk btk.context.self } diff --git a/remote/src/main/scala/org/apache/pekko/remote/transport/FailureInjectorTransportAdapter.scala b/remote/src/main/scala/org/apache/pekko/remote/transport/FailureInjectorTransportAdapter.scala index 7e257ea65e4..83aa9a5c2ad 100644 --- a/remote/src/main/scala/org/apache/pekko/remote/transport/FailureInjectorTransportAdapter.scala +++ b/remote/src/main/scala/org/apache/pekko/remote/transport/FailureInjectorTransportAdapter.scala @@ -77,7 +77,7 @@ private[remote] class FailureInjectorTransportAdapter( extends AbstractTransportAdapter(wrappedTransport)(extendedSystem.dispatchers.internalDispatcher) with AssociationEventListener { - private val rng = RandomNumberGenerator.get() + private def rng = RandomNumberGenerator.get() private val log = Logging(extendedSystem, classOf[FailureInjectorTransportAdapter]) private val shouldDebugLog: Boolean = extendedSystem.settings.config.getBoolean("pekko.remote.classic.gremlin.debug") From bf76e89638292c81ec5993ba8cdfd836cbbed665 Mon Sep 17 00:00:00 2001 From: PJ Fanning Date: Thu, 30 Jan 2025 20:33:45 +0100 Subject: [PATCH 03/12] some compile issues --- .../actor/testkit/typed/internal/StubbedActorContext.scala | 2 +- .../main/scala/org/apache/pekko/pattern/CircuitBreaker.scala | 4 ++-- .../scala/org/apache/pekko/util/RandomNumberGenerator.scala | 2 ++ 3 files changed, 5 insertions(+), 3 deletions(-) diff --git a/actor-testkit-typed/src/main/scala/org/apache/pekko/actor/testkit/typed/internal/StubbedActorContext.scala b/actor-testkit-typed/src/main/scala/org/apache/pekko/actor/testkit/typed/internal/StubbedActorContext.scala index 124c10d2c14..9d87f58f5a4 100644 --- a/actor-testkit-typed/src/main/scala/org/apache/pekko/actor/testkit/typed/internal/StubbedActorContext.scala +++ b/actor-testkit-typed/src/main/scala/org/apache/pekko/actor/testkit/typed/internal/StubbedActorContext.scala @@ -173,7 +173,7 @@ private[pekko] final class FunctionRef[-T](override val path: ActorPath, send: ( @InternalApi private[pekko] def internalSpawnMessageAdapter[U](f: U => T, name: String): ActorRef[U] = { val n = if (name != "") s"${childName.next()}-$name" else childName.next() - val p = (path / n).withUid(rnd().nextInt()) + val p = (path / n).withUid(RandomNumberGenerator.get().nextInt()) val i = new BehaviorTestKitImpl[U](system, p, BehaviorImpl.ignore) _children += p.name -> i diff --git a/actor/src/main/scala/org/apache/pekko/pattern/CircuitBreaker.scala b/actor/src/main/scala/org/apache/pekko/pattern/CircuitBreaker.scala index f80d3f51738..5e6eb94289c 100644 --- a/actor/src/main/scala/org/apache/pekko/pattern/CircuitBreaker.scala +++ b/actor/src/main/scala/org/apache/pekko/pattern/CircuitBreaker.scala @@ -14,7 +14,7 @@ package org.apache.pekko.pattern import java.util.Optional -import java.util.concurrent.{ Callable, CompletionException, CompletionStage, CopyOnWriteArrayList, ThreadLocalRandom } +import java.util.concurrent.{ Callable, CompletionException, CompletionStage, CopyOnWriteArrayList } import java.util.concurrent.atomic.{ AtomicBoolean, AtomicInteger, AtomicLong } import java.util.function.BiFunction import java.util.function.Consumer @@ -32,7 +32,7 @@ import pekko.dispatch.ExecutionContexts.parasitic import pekko.pattern.internal.{ CircuitBreakerNoopTelemetry, CircuitBreakerTelemetry } import pekko.util.FutureConverters._ import pekko.util.JavaDurationConverters._ -import pekko.util.Unsafe +import pekko.util.{ RandomNumberGenerator, Unsafe } /** * Companion object providing factory methods for Circuit Breaker which runs callbacks in caller's thread diff --git a/actor/src/main/scala/org/apache/pekko/util/RandomNumberGenerator.scala b/actor/src/main/scala/org/apache/pekko/util/RandomNumberGenerator.scala index 334ad1308c6..be84de1d568 100644 --- a/actor/src/main/scala/org/apache/pekko/util/RandomNumberGenerator.scala +++ b/actor/src/main/scala/org/apache/pekko/util/RandomNumberGenerator.scala @@ -26,6 +26,7 @@ import java.util.concurrent.ThreadLocalRandom private[pekko] trait RandomNumberGenerator { def nextInt(): Int def nextInt(n: Int): Int + def nextInt(origin: Int, n: Int): Int def nextLong(): Long def nextDouble(): Double } @@ -34,6 +35,7 @@ private[pekko] trait RandomNumberGenerator { private[pekko] object ThreadLocalRandomNumberGenerator extends RandomNumberGenerator { override def nextInt(): Int = ThreadLocalRandom.current().nextInt() override def nextInt(bound: Int): Int = ThreadLocalRandom.current().nextInt(bound) + override def nextInt(origin: Int, bound: Int): Int = ThreadLocalRandom.current().nextInt(origin, bound) override def nextLong(): Long = ThreadLocalRandom.current().nextLong() override def nextDouble(): Double = ThreadLocalRandom.current().nextDouble() } From e4fda5f728f608321ce6a759eb9a0122721fda72 Mon Sep 17 00:00:00 2001 From: PJ Fanning Date: Thu, 30 Jan 2025 20:47:45 +0100 Subject: [PATCH 04/12] Update ReplicatedShardingTest.java --- .../pekko/cluster/sharding/typed/ReplicatedShardingTest.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/cluster-sharding-typed/src/test/java/org/apache/pekko/cluster/sharding/typed/ReplicatedShardingTest.java b/cluster-sharding-typed/src/test/java/org/apache/pekko/cluster/sharding/typed/ReplicatedShardingTest.java index 817c738c871..25599dd4d1d 100644 --- a/cluster-sharding-typed/src/test/java/org/apache/pekko/cluster/sharding/typed/ReplicatedShardingTest.java +++ b/cluster-sharding-typed/src/test/java/org/apache/pekko/cluster/sharding/typed/ReplicatedShardingTest.java @@ -201,7 +201,7 @@ public Receive createReceive() { private Behavior onForwardToRandom(ForwardToRandom forwardToRandom) { Map> refs = replicatedSharding.getEntityRefsFor(forwardToRandom.entityId); - int chosenIdx = RandomNumberGenerator.get().nextInt(refs.size()); + int chosenIdx = ThreadLocalRandom.current().nextInt(refs.size()); new ArrayList<>(refs.values()).get(chosenIdx).tell(forwardToRandom.message); return this; } From a51d31ba308be9f2b2017ef3741fc3398158f251 Mon Sep 17 00:00:00 2001 From: PJ Fanning Date: Thu, 30 Jan 2025 22:24:18 +0100 Subject: [PATCH 05/12] stub of JEP 356 generator --- actor/src/main/resources/reference.conf | 8 ++++ .../pekko/util/RandomNumberGenerator.scala | 39 ++++++++++++++++++- 2 files changed, 46 insertions(+), 1 deletion(-) diff --git a/actor/src/main/resources/reference.conf b/actor/src/main/resources/reference.conf index 2cab15a2945..f7850706a83 100644 --- a/actor/src/main/resources/reference.conf +++ b/actor/src/main/resources/reference.conf @@ -1396,4 +1396,12 @@ pekko { } #//#circuit-breaker-default + random { + # The default random number generator used by the Pekko library. + # This setting does not affect SecureRandom use cases. + # Valid options are listed in + # https://docs.oracle.com/en/java/javase/17/docs/api/java.base/java/util/random/package-summary.html#algorithms + generator-implementation = "ThreadLocalRandom" + } + } diff --git a/actor/src/main/scala/org/apache/pekko/util/RandomNumberGenerator.scala b/actor/src/main/scala/org/apache/pekko/util/RandomNumberGenerator.scala index be84de1d568..194e6ae8b14 100644 --- a/actor/src/main/scala/org/apache/pekko/util/RandomNumberGenerator.scala +++ b/actor/src/main/scala/org/apache/pekko/util/RandomNumberGenerator.scala @@ -20,6 +20,9 @@ package org.apache.pekko.util import org.apache.pekko import pekko.annotation.InternalApi +import com.typesafe.config.ConfigFactory + +import java.lang.invoke.{ MethodHandles, MethodType } import java.util.concurrent.ThreadLocalRandom @InternalApi @@ -40,7 +43,41 @@ private[pekko] object ThreadLocalRandomNumberGenerator extends RandomNumberGener override def nextDouble(): Double = ThreadLocalRandom.current().nextDouble() } +// https://openjdk.org/jeps/356 +@InternalApi +private[pekko] class Jep356RandomNumberGenerator(impl: String) extends RandomNumberGenerator { + + private val rngClass = Class.forName("java.util.random.RandomGenerator") + private val lookup = MethodHandles.publicLookup() + private val createHandle = lookup.findStatic(rngClass, "of", MethodType.methodType(rngClass, classOf[String])) + private val intHandle = lookup.findVirtual(rngClass, "nextInt", MethodType.methodType(classOf[Int])) + private val intBoundHandle = + lookup.findVirtual(rngClass, "nextInt", MethodType.methodType(classOf[Int], classOf[Int])) + private val longHandle = lookup.findVirtual(rngClass, "nextLong", MethodType.methodType(classOf[Long])) + private val doubleHandle = lookup.findVirtual(rngClass, "nextDouble", MethodType.methodType(classOf[Double])) + private val rng = createHandle.invoke(impl) + + override def nextInt(): Int = intHandle.invoke(rng) + override def nextInt(bound: Int): Int = intBoundHandle.invoke(rng, bound) + override def nextInt(origin: Int, bound: Int): Int = { + if (origin >= bound) + throw new IllegalArgumentException("origin must be less than bound") + nextInt(bound - origin) + origin + } + override def nextLong(): Long = longHandle.invoke(rng) + override def nextDouble(): Double = doubleHandle.invoke(rng) +} + @InternalApi private[pekko] object RandomNumberGenerator { - def get(): RandomNumberGenerator = ThreadLocalRandomNumberGenerator + + private val generator = { + val cfg = ConfigFactory.load() + cfg.getString("pekko.random.generator-implementation") match { + case "ThreadLocalRandom" => ThreadLocalRandomNumberGenerator + case impl => new Jep356RandomNumberGenerator(impl) + } + } + + def get(): RandomNumberGenerator = generator } From 25a0e33195c58685d14942542108c11cde56d6b8 Mon Sep 17 00:00:00 2001 From: PJ Fanning Date: Thu, 30 Jan 2025 22:28:21 +0100 Subject: [PATCH 06/12] Update RandomNumberGenerator.scala --- .../scala/org/apache/pekko/util/RandomNumberGenerator.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/actor/src/main/scala/org/apache/pekko/util/RandomNumberGenerator.scala b/actor/src/main/scala/org/apache/pekko/util/RandomNumberGenerator.scala index 194e6ae8b14..6104a18b25d 100644 --- a/actor/src/main/scala/org/apache/pekko/util/RandomNumberGenerator.scala +++ b/actor/src/main/scala/org/apache/pekko/util/RandomNumberGenerator.scala @@ -75,7 +75,7 @@ private[pekko] object RandomNumberGenerator { val cfg = ConfigFactory.load() cfg.getString("pekko.random.generator-implementation") match { case "ThreadLocalRandom" => ThreadLocalRandomNumberGenerator - case impl => new Jep356RandomNumberGenerator(impl) + case impl => new Jep356RandomNumberGenerator(impl) } } From da776a2a06d15cc7741f314c3544e66cddc28dbb Mon Sep 17 00:00:00 2001 From: PJ Fanning Date: Thu, 30 Jan 2025 22:34:35 +0100 Subject: [PATCH 07/12] java version check --- .../scala/org/apache/pekko/util/RandomNumberGenerator.scala | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/actor/src/main/scala/org/apache/pekko/util/RandomNumberGenerator.scala b/actor/src/main/scala/org/apache/pekko/util/RandomNumberGenerator.scala index 6104a18b25d..e1b0fd64731 100644 --- a/actor/src/main/scala/org/apache/pekko/util/RandomNumberGenerator.scala +++ b/actor/src/main/scala/org/apache/pekko/util/RandomNumberGenerator.scala @@ -71,12 +71,14 @@ private[pekko] class Jep356RandomNumberGenerator(impl: String) extends RandomNum @InternalApi private[pekko] object RandomNumberGenerator { - private val generator = { + private val generator = if (JavaVersion.majorVersion >= 17) { val cfg = ConfigFactory.load() cfg.getString("pekko.random.generator-implementation") match { case "ThreadLocalRandom" => ThreadLocalRandomNumberGenerator case impl => new Jep356RandomNumberGenerator(impl) } + } else { + ThreadLocalRandomNumberGenerator } def get(): RandomNumberGenerator = generator From a547beb6b8366e41322a652f778ed5f98e66994f Mon Sep 17 00:00:00 2001 From: PJ Fanning Date: Fri, 31 Jan 2025 14:19:10 +0100 Subject: [PATCH 08/12] Update RandomNumberGenerator.scala --- .../pekko/util/RandomNumberGenerator.scala | 38 ++++++++++++++----- 1 file changed, 28 insertions(+), 10 deletions(-) diff --git a/actor/src/main/scala/org/apache/pekko/util/RandomNumberGenerator.scala b/actor/src/main/scala/org/apache/pekko/util/RandomNumberGenerator.scala index e1b0fd64731..c41ca7f51e2 100644 --- a/actor/src/main/scala/org/apache/pekko/util/RandomNumberGenerator.scala +++ b/actor/src/main/scala/org/apache/pekko/util/RandomNumberGenerator.scala @@ -20,11 +20,14 @@ package org.apache.pekko.util import org.apache.pekko import pekko.annotation.InternalApi -import com.typesafe.config.ConfigFactory +import com.typesafe.config.{ Config, ConfigFactory } import java.lang.invoke.{ MethodHandles, MethodType } import java.util.concurrent.ThreadLocalRandom +/** + * INTERNAL API + */ @InternalApi private[pekko] trait RandomNumberGenerator { def nextInt(): Int @@ -34,6 +37,9 @@ private[pekko] trait RandomNumberGenerator { def nextDouble(): Double } +/** + * INTERNAL API + */ @InternalApi private[pekko] object ThreadLocalRandomNumberGenerator extends RandomNumberGenerator { override def nextInt(): Int = ThreadLocalRandom.current().nextInt() @@ -43,10 +49,14 @@ private[pekko] object ThreadLocalRandomNumberGenerator extends RandomNumberGener override def nextDouble(): Double = ThreadLocalRandom.current().nextDouble() } -// https://openjdk.org/jeps/356 +/** + * INTERNAL API + */ @InternalApi private[pekko] class Jep356RandomNumberGenerator(impl: String) extends RandomNumberGenerator { + // https://openjdk.org/jeps/356 + private val rngClass = Class.forName("java.util.random.RandomGenerator") private val lookup = MethodHandles.publicLookup() private val createHandle = lookup.findStatic(rngClass, "of", MethodType.methodType(rngClass, classOf[String])) @@ -68,18 +78,26 @@ private[pekko] class Jep356RandomNumberGenerator(impl: String) extends RandomNum override def nextDouble(): Double = doubleHandle.invoke(rng) } +/** + * INTERNAL API + */ @InternalApi private[pekko] object RandomNumberGenerator { - private val generator = if (JavaVersion.majorVersion >= 17) { - val cfg = ConfigFactory.load() - cfg.getString("pekko.random.generator-implementation") match { - case "ThreadLocalRandom" => ThreadLocalRandomNumberGenerator - case impl => new Jep356RandomNumberGenerator(impl) + /** + * INTERNAL API. Open for testing. + */ + def createGenerator(cfg: Config) = + if (JavaVersion.majorVersion >= 17) { + cfg.getString("pekko.random.generator-implementation") match { + case "ThreadLocalRandom" => ThreadLocalRandomNumberGenerator + case impl => new Jep356RandomNumberGenerator(impl) + } + } else { + ThreadLocalRandomNumberGenerator } - } else { - ThreadLocalRandomNumberGenerator - } + + private val generator = createGenerator(ConfigFactory.load()) def get(): RandomNumberGenerator = generator } From e10720d820dc048cee0ff488578684823fe54b0f Mon Sep 17 00:00:00 2001 From: PJ Fanning Date: Fri, 31 Jan 2025 19:12:03 +0100 Subject: [PATCH 09/12] add test --- .../util/RandomNumberGeneratorSpec.scala | 32 +++++++++++++++++++ actor/src/main/resources/reference.conf | 1 + 2 files changed, 33 insertions(+) create mode 100644 actor-tests/src/test/scala/org/apache/pekko/util/RandomNumberGeneratorSpec.scala diff --git a/actor-tests/src/test/scala/org/apache/pekko/util/RandomNumberGeneratorSpec.scala b/actor-tests/src/test/scala/org/apache/pekko/util/RandomNumberGeneratorSpec.scala new file mode 100644 index 00000000000..7b3f4558d53 --- /dev/null +++ b/actor-tests/src/test/scala/org/apache/pekko/util/RandomNumberGeneratorSpec.scala @@ -0,0 +1,32 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.pekko.util + +import org.scalatest.matchers.should.Matchers +import org.scalatest.wordspec.AnyWordSpec + +class RandomNumberGeneratorSpec extends AnyWordSpec with Matchers { + + "RandomNumberGenerator" should { + + "default to ThreadLocalRandom" in { + RandomNumberGenerator.get() shouldEqual ThreadLocalRandomNumberGenerator + } + + } +} diff --git a/actor/src/main/resources/reference.conf b/actor/src/main/resources/reference.conf index f7850706a83..573d5922e68 100644 --- a/actor/src/main/resources/reference.conf +++ b/actor/src/main/resources/reference.conf @@ -1399,6 +1399,7 @@ pekko { random { # The default random number generator used by the Pekko library. # This setting does not affect SecureRandom use cases. + # This option is ignored if you are not using Java 17+. The default is "ThreadLocalRandom". # Valid options are listed in # https://docs.oracle.com/en/java/javase/17/docs/api/java.base/java/util/random/package-summary.html#algorithms generator-implementation = "ThreadLocalRandom" From f9feeb67fb51a98d281df141f94875b4824f34db Mon Sep 17 00:00:00 2001 From: PJ Fanning Date: Fri, 31 Jan 2025 19:29:30 +0100 Subject: [PATCH 10/12] Update RandomNumberGenerator.scala --- .../scala/org/apache/pekko/util/RandomNumberGenerator.scala | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/actor/src/main/scala/org/apache/pekko/util/RandomNumberGenerator.scala b/actor/src/main/scala/org/apache/pekko/util/RandomNumberGenerator.scala index c41ca7f51e2..61bba0dc08b 100644 --- a/actor/src/main/scala/org/apache/pekko/util/RandomNumberGenerator.scala +++ b/actor/src/main/scala/org/apache/pekko/util/RandomNumberGenerator.scala @@ -51,7 +51,7 @@ private[pekko] object ThreadLocalRandomNumberGenerator extends RandomNumberGener /** * INTERNAL API - */ + */ @InternalApi private[pekko] class Jep356RandomNumberGenerator(impl: String) extends RandomNumberGenerator { @@ -80,13 +80,13 @@ private[pekko] class Jep356RandomNumberGenerator(impl: String) extends RandomNum /** * INTERNAL API - */ + */ @InternalApi private[pekko] object RandomNumberGenerator { /** * INTERNAL API. Open for testing. - */ + */ def createGenerator(cfg: Config) = if (JavaVersion.majorVersion >= 17) { cfg.getString("pekko.random.generator-implementation") match { From 74d435862f6f7caf196b5a428edf6eb328c0fd76 Mon Sep 17 00:00:00 2001 From: PJ Fanning Date: Sat, 8 Feb 2025 13:07:08 +0100 Subject: [PATCH 11/12] add test --- .../RandomNumberGeneratorJava21Spec.scala | 37 +++++++++++++++++++ .../util/RandomNumberGeneratorSpec.scala | 4 +- .../pekko/util/RandomNumberGenerator.scala | 2 +- 3 files changed, 41 insertions(+), 2 deletions(-) create mode 100644 actor-tests/src/test/scala-jdk21-only/org/apache/pekko/util/RandomNumberGeneratorJava21Spec.scala diff --git a/actor-tests/src/test/scala-jdk21-only/org/apache/pekko/util/RandomNumberGeneratorJava21Spec.scala b/actor-tests/src/test/scala-jdk21-only/org/apache/pekko/util/RandomNumberGeneratorJava21Spec.scala new file mode 100644 index 00000000000..97dd521cf7f --- /dev/null +++ b/actor-tests/src/test/scala-jdk21-only/org/apache/pekko/util/RandomNumberGeneratorJava21Spec.scala @@ -0,0 +1,37 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.pekko.util + +import com.typesafe.config.ConfigFactory +import org.scalatest.matchers.should.Matchers +import org.scalatest.wordspec.AnyWordSpec + +class RandomNumberGeneratorJava21Spec extends AnyWordSpec with Matchers { + + "RandomNumberGenerator (Java 21+)" should { + + "support config" in { + val config = ConfigFactory.parseString( + """pekko.random.generator.generator-implementation = "Xoroshiro128PlusPlus"""") + val rng = RandomNumberGenerator.createGenerator(config) + rng shouldBe a[Jep356RandomNumberGenerator] + rng.nextInt(10) should (be >= 0 and be < 10) + } + + } +} diff --git a/actor-tests/src/test/scala/org/apache/pekko/util/RandomNumberGeneratorSpec.scala b/actor-tests/src/test/scala/org/apache/pekko/util/RandomNumberGeneratorSpec.scala index 7b3f4558d53..5202ea10123 100644 --- a/actor-tests/src/test/scala/org/apache/pekko/util/RandomNumberGeneratorSpec.scala +++ b/actor-tests/src/test/scala/org/apache/pekko/util/RandomNumberGeneratorSpec.scala @@ -25,7 +25,9 @@ class RandomNumberGeneratorSpec extends AnyWordSpec with Matchers { "RandomNumberGenerator" should { "default to ThreadLocalRandom" in { - RandomNumberGenerator.get() shouldEqual ThreadLocalRandomNumberGenerator + val rng = RandomNumberGenerator.get() + rng shouldEqual ThreadLocalRandomNumberGenerator + rng.nextInt(10) should (be >= 0 and be < 10) } } diff --git a/actor/src/main/scala/org/apache/pekko/util/RandomNumberGenerator.scala b/actor/src/main/scala/org/apache/pekko/util/RandomNumberGenerator.scala index 61bba0dc08b..0c9893fb10e 100644 --- a/actor/src/main/scala/org/apache/pekko/util/RandomNumberGenerator.scala +++ b/actor/src/main/scala/org/apache/pekko/util/RandomNumberGenerator.scala @@ -53,7 +53,7 @@ private[pekko] object ThreadLocalRandomNumberGenerator extends RandomNumberGener * INTERNAL API */ @InternalApi -private[pekko] class Jep356RandomNumberGenerator(impl: String) extends RandomNumberGenerator { +private[pekko] final class Jep356RandomNumberGenerator(impl: String) extends RandomNumberGenerator { // https://openjdk.org/jeps/356 From dba4e2c114389c5c33f58099a811f2083c0aa2e8 Mon Sep 17 00:00:00 2001 From: PJ Fanning Date: Sat, 8 Feb 2025 13:31:48 +0100 Subject: [PATCH 12/12] Update RandomNumberGeneratorJava21Spec.scala --- .../org/apache/pekko/util/RandomNumberGeneratorJava21Spec.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/actor-tests/src/test/scala-jdk21-only/org/apache/pekko/util/RandomNumberGeneratorJava21Spec.scala b/actor-tests/src/test/scala-jdk21-only/org/apache/pekko/util/RandomNumberGeneratorJava21Spec.scala index 97dd521cf7f..7c952054425 100644 --- a/actor-tests/src/test/scala-jdk21-only/org/apache/pekko/util/RandomNumberGeneratorJava21Spec.scala +++ b/actor-tests/src/test/scala-jdk21-only/org/apache/pekko/util/RandomNumberGeneratorJava21Spec.scala @@ -27,7 +27,7 @@ class RandomNumberGeneratorJava21Spec extends AnyWordSpec with Matchers { "support config" in { val config = ConfigFactory.parseString( - """pekko.random.generator.generator-implementation = "Xoroshiro128PlusPlus"""") + """pekko.random.generator-implementation = "Xoroshiro128PlusPlus"""") val rng = RandomNumberGenerator.createGenerator(config) rng shouldBe a[Jep356RandomNumberGenerator] rng.nextInt(10) should (be >= 0 and be < 10)