Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore: add a helper method to create a dedicated virtual thread scheduler. #164

Merged
merged 1 commit into from
Feb 2, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
49 changes: 47 additions & 2 deletions cask/src/cask/internal/Util.scala
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,11 @@ import java.io.{InputStream, PrintWriter, StringWriter}
import scala.collection.generic.CanBuildFrom
import scala.collection.mutable
import java.io.OutputStream
import java.lang.invoke.{MethodHandles, MethodType}
import java.util.concurrent.{Executor, ExecutorService, ForkJoinPool, ThreadFactory}
import java.lang.invoke.{MethodHandle, MethodHandles, MethodType}
import java.util.concurrent.atomic.AtomicInteger
import java.util.concurrent.{Executor, ExecutorService, ForkJoinPool, ForkJoinWorkerThread, ThreadFactory}
import scala.annotation.switch
import scala.concurrent.duration.TimeUnit
import scala.concurrent.{ExecutionContext, Future, Promise}
import scala.util.Try
import scala.util.control.NonFatal
Expand Down Expand Up @@ -69,6 +71,49 @@ object Util {
}
}

/**
* A helper class to create the carrier thread for the virtual thread,
* Require Java 21 or above.
* */
private object CarrierThreadFactory extends ForkJoinPool.ForkJoinWorkerThreadFactory {
private val counter = new AtomicInteger(0)
private val clazz = lookup.findClass("jdk.internal.misc.CarrierThread")
private val constructor: MethodHandle = lookup.findConstructor(
clazz,
MethodType.methodType(classOf[Unit], classOf[ForkJoinPool]))

override def newThread(pool: ForkJoinPool): ForkJoinWorkerThread = {
val carrierThread = constructor.invoke(pool).asInstanceOf[ForkJoinWorkerThread]
// Set the name of the carrier thread
carrierThread.setName("cask-carrier-thread-" + counter.incrementAndGet())
carrierThread
}
}

/**
* Create a dedicated forkjoin based scheduler for the virtual thread.
* NOTE: you can use other threads pool as scheduler too, this method just integrated the `CarrierThreadFactory`
* when creating the ForkJoinPool.
* */
def createForkJoinPoolBasedScheduler(parallelism: Int,
corePoolSize: Int,
maximumPoolSize: Int,
keepAliveTime: Int,
timeUnit: TimeUnit): Executor = {
new ForkJoinPool(
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Does it need to be a ForkJoinPool, or can other pools work? I've had odd interactions between ForkJoinPool and other libraries (e.g. scala.util.concurrent.blocking) so I wonder if we can provide alternative pools

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No, it can be any Executor, just make sure a virtual thread can be always run. We can use Executor#newCachedThreadPool, or ThreadPoolExecutor with a larger max pool size (because of the classloading issue) if we have a small thread pool size, and then the Virtual thread may not be able to run, then deadlock can happen, I have updated the method name.

This help method is just integrated with the CarrierThreadFactory.

see: https://openjdk.org/jeps/491

Future Work
There are a few remaining cases, unrelated to the synchronized keyword, in which a virtual thread cannot unmount when blocking:

When resolving a symbolic reference ([JVMS §5.4.3](https://docs.oracle.com/javase/specs/jvms/se23/html/jvms-5.html#jvms-5.4.3)) to a class or interface and the virtual thread blocks while loading a class. This is a case where the virtual thread pins the carrier due to a native frame on the stack.

When blocking inside a class initializer. This is also a case where the virtual thread pins the carrier due to a native frame on the stack.

When waiting for a class to be initialized by another thread ([JVMS §5.5](https://docs.oracle.com/javase/specs/jvms/se23/html/jvms-5.html#jvms-5.5)). This is a special case where the virtual thread blocks in the JVM, thus pinning the carrier.

These cases should rarely cause issues but we will revisit them if they prove to be problematic.

parallelism,
CarrierThreadFactory,
(_: Thread, _: Throwable) => {}, // ignored for carrier thread
true, //FIFO
corePoolSize,
maximumPoolSize,
parallelism / 2,
(_: ForkJoinPool) => true, //which is needed for virtual thread
keepAliveTime,
timeUnit
)
}

/**
* Create a virtual thread factory with a executor, the executor will be used as the scheduler of
* virtual thread.
Expand Down
1 change: 1 addition & 0 deletions docs/pages/1 - Cask - a Scala HTTP micro-framework.md
Original file line number Diff line number Diff line change
Expand Up @@ -477,6 +477,7 @@ Cask can support using Virtual Threads to handle the request out of the box, you
1. You can change the default scheduler of the carrier threads with `cask.internal.Util.createVirtualThreadExecutor` method, but keep in mind, that's not officially supported by JDK for now.
2. You can supply your own `Executor` by override the `handlerExecutor()` method in your `cask.Main` object, which will be called only once when the server starts.
3. You can use `jdk.internal.misc.Blocker`'s `begin` and `end` methods to help the `ForkJoinPool` when needed.
4. You can use `Util.createVirtualThreadScheduler` to create separate `ForkJoinPool` as scheduler for the virtual threads.

**NOTE**:

Expand Down
Loading