From 80fb42caf92ab0c848468f3932a718839145a30d Mon Sep 17 00:00:00 2001 From: He-Pin Date: Wed, 22 Jan 2025 04:20:36 +0800 Subject: [PATCH] chore: add a helper method to create a dedicated virtual thread scheduler. --- cask/src/cask/internal/Util.scala | 47 +++++++++++++++++++++++++++++-- 1 file changed, 45 insertions(+), 2 deletions(-) diff --git a/cask/src/cask/internal/Util.scala b/cask/src/cask/internal/Util.scala index b3bb82d587..3e7019b874 100644 --- a/cask/src/cask/internal/Util.scala +++ b/cask/src/cask/internal/Util.scala @@ -4,9 +4,11 @@ import java.io.{InputStream, PrintWriter, StringWriter} import scala.collection.generic.CanBuildFrom import scala.collection.mutable import java.io.OutputStream -import java.lang.invoke.{MethodHandles, MethodType} -import java.util.concurrent.{Executor, ExecutorService, ForkJoinPool, ThreadFactory} +import java.lang.invoke.{MethodHandle, MethodHandles, MethodType} +import java.util.concurrent.atomic.AtomicInteger +import java.util.concurrent.{Executor, ExecutorService, ForkJoinPool, ForkJoinWorkerThread, ThreadFactory} import scala.annotation.switch +import scala.concurrent.duration.TimeUnit import scala.concurrent.{ExecutionContext, Future, Promise} import scala.util.Try import scala.util.control.NonFatal @@ -69,6 +71,47 @@ object Util { } } + /** + * A helper class to create the carrier thread for the virtual thread, + * Require Java 21 or above. + * */ + private object CarrierThreadFactory extends ForkJoinPool.ForkJoinWorkerThreadFactory { + private val counter = new AtomicInteger(0) + private val clazz = lookup.findClass("jdk.internal.misc.CarrierThread") + private val constructor: MethodHandle = lookup.findConstructor( + clazz, + MethodType.methodType(classOf[Unit], classOf[ForkJoinPool])) + + override def newThread(pool: ForkJoinPool): ForkJoinWorkerThread = { + val carrierThread = constructor.invoke(pool).asInstanceOf[ForkJoinWorkerThread] + // Set the name of the carrier thread + carrierThread.setName("cask-carrier-thread-" + counter.incrementAndGet()) + carrierThread + } + } + + /** + * Create a dedicated forkjoin based scheduler for the virtual thread. + * */ + def createVirtualThreadScheduler(parallelism: Int, + corePoolSize: Int, + maximumPoolSize: Int, + keepAliveTime: Int, + timeUnit: TimeUnit): ForkJoinPool = { + new ForkJoinPool( + parallelism, + CarrierThreadFactory, + (_: Thread, _: Throwable) => {}, // ignored for carrier thread + true, //FIFO + corePoolSize, + maximumPoolSize, + parallelism / 2, + (_: ForkJoinPool) => true, //which is needed for virtual thread + keepAliveTime, + timeUnit + ) + } + /** * Create a virtual thread factory with a executor, the executor will be used as the scheduler of * virtual thread.