Skip to content

Commit

Permalink
chore: add a helper method to create a dedicated virtual thread sched…
Browse files Browse the repository at this point in the history
…uler.
  • Loading branch information
He-Pin committed Jan 21, 2025
1 parent 05ca192 commit 80fb42c
Showing 1 changed file with 45 additions and 2 deletions.
47 changes: 45 additions & 2 deletions cask/src/cask/internal/Util.scala
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,11 @@ import java.io.{InputStream, PrintWriter, StringWriter}
import scala.collection.generic.CanBuildFrom
import scala.collection.mutable
import java.io.OutputStream
import java.lang.invoke.{MethodHandles, MethodType}
import java.util.concurrent.{Executor, ExecutorService, ForkJoinPool, ThreadFactory}
import java.lang.invoke.{MethodHandle, MethodHandles, MethodType}
import java.util.concurrent.atomic.AtomicInteger
import java.util.concurrent.{Executor, ExecutorService, ForkJoinPool, ForkJoinWorkerThread, ThreadFactory}
import scala.annotation.switch
import scala.concurrent.duration.TimeUnit
import scala.concurrent.{ExecutionContext, Future, Promise}
import scala.util.Try
import scala.util.control.NonFatal
Expand Down Expand Up @@ -69,6 +71,47 @@ object Util {
}
}

/**
* A helper class to create the carrier thread for the virtual thread,
* Require Java 21 or above.
* */
private object CarrierThreadFactory extends ForkJoinPool.ForkJoinWorkerThreadFactory {
private val counter = new AtomicInteger(0)
private val clazz = lookup.findClass("jdk.internal.misc.CarrierThread")
private val constructor: MethodHandle = lookup.findConstructor(
clazz,
MethodType.methodType(classOf[Unit], classOf[ForkJoinPool]))

override def newThread(pool: ForkJoinPool): ForkJoinWorkerThread = {
val carrierThread = constructor.invoke(pool).asInstanceOf[ForkJoinWorkerThread]
// Set the name of the carrier thread
carrierThread.setName("cask-carrier-thread-" + counter.incrementAndGet())
carrierThread
}
}

/**
* Create a dedicated forkjoin based scheduler for the virtual thread.
* */
def createVirtualThreadScheduler(parallelism: Int,
corePoolSize: Int,
maximumPoolSize: Int,
keepAliveTime: Int,
timeUnit: TimeUnit): ForkJoinPool = {
new ForkJoinPool(
parallelism,
CarrierThreadFactory,
(_: Thread, _: Throwable) => {}, // ignored for carrier thread
true, //FIFO
corePoolSize,
maximumPoolSize,
parallelism / 2,
(_: ForkJoinPool) => true, //which is needed for virtual thread
keepAliveTime,
timeUnit
)
}

/**
* Create a virtual thread factory with a executor, the executor will be used as the scheduler of
* virtual thread.
Expand Down

0 comments on commit 80fb42c

Please sign in to comment.