Skip to content

Commit

Permalink
feat: Add support for switching scheduler
Browse files Browse the repository at this point in the history
  • Loading branch information
He-Pin committed Jan 17, 2025
1 parent b20ec82 commit 603397b
Showing 1 changed file with 34 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,12 @@

package org.apache.pekko.dispatch

import org.apache.pekko.annotation.InternalApi
import org.apache.pekko.util.JavaVersion
import org.apache.pekko
import pekko.annotation.InternalApi
import pekko.util.JavaVersion

import java.lang.invoke.{ MethodHandles, MethodType }
import java.util.concurrent.{ ExecutorService, ThreadFactory }
import java.util.concurrent.{ Executor, ExecutorService, ThreadFactory }
import scala.util.control.NonFatal

@InternalApi
Expand All @@ -33,20 +34,48 @@ private[dispatch] object VirtualThreadSupport {
*/
val isSupported: Boolean = JavaVersion.majorVersion >= 21

/**
* Create a virtual thread executor with the given executor as the scheduler.
*/
def newThreadPerTaskExecutor(prefix: String, executor: Executor): ExecutorService = {
val factory = newVirtualThreadFactory(prefix, executor)
newThreadPerTaskExecutor(factory)
}

/**
* Create a virtual thread factory with the default scheduler in VirtualThread.
*/
def newVirtualThreadFactory(prefix: String): ThreadFactory = {
newVirtualThreadFactory(prefix, null)
}

/**
* Create a virtual thread factory with a executor, the executor will be used as the scheduler of
* virtual thread.
*
* The executor should run task on platform threads.
*
* returns null if not supported.
*/
def newVirtualThreadFactory(prefix: String): ThreadFactory = {
def newVirtualThreadFactory(prefix: String, executor: Executor): ThreadFactory = {
require(isSupported, "Virtual thread is not supported.")
try {
val builderClass = ClassLoader.getSystemClassLoader.loadClass("java.lang.Thread$Builder")
val ofVirtualClass = ClassLoader.getSystemClassLoader.loadClass("java.lang.Thread$Builder$OfVirtual")
val ofVirtualMethod = lookup.findStatic(classOf[Thread], "ofVirtual", MethodType.methodType(ofVirtualClass))
var builder = ofVirtualMethod.invoke()
if (executor ne null) { // Set the scheduler of virtual thread
val clazz = builder.getClass
val privateLookup = MethodHandles.privateLookupIn(
clazz,
lookup
)
val schedulerFieldSetter = privateLookup
.findSetter(clazz, "scheduler", classOf[Executor])
schedulerFieldSetter.invoke(builder, executor)
}
val nameMethod = lookup.findVirtual(ofVirtualClass, "name",
MethodType.methodType(ofVirtualClass, classOf[String], classOf[Long]))
// TODO support replace scheduler when we drop Java 8 support
val factoryMethod = lookup.findVirtual(builderClass, "factory", MethodType.methodType(classOf[ThreadFactory]))
builder = nameMethod.invoke(builder, prefix + "-virtual-thread-", 0L)
factoryMethod.invoke(builder).asInstanceOf[ThreadFactory]
Expand Down

0 comments on commit 603397b

Please sign in to comment.