diff --git a/src/hotspot/share/runtime/objectMonitor.cpp b/src/hotspot/share/runtime/objectMonitor.cpp index 3c98d15e4e9a8..c7af150aa74ce 100644 --- a/src/hotspot/share/runtime/objectMonitor.cpp +++ b/src/hotspot/share/runtime/objectMonitor.cpp @@ -22,6 +22,7 @@ * */ +#include "classfile/symbolTable.hpp" #include "classfile/vmSymbols.hpp" #include "gc/shared/oopStorage.hpp" #include "gc/shared/oopStorageSet.hpp" @@ -42,6 +43,7 @@ #include "runtime/globals.hpp" #include "runtime/handles.inline.hpp" #include "runtime/interfaceSupport.inline.hpp" +#include "runtime/javaCalls.hpp" #include "runtime/javaThread.inline.hpp" #include "runtime/mutexLocker.hpp" #include "runtime/objectMonitor.inline.hpp" @@ -536,6 +538,90 @@ void ObjectMonitor::notify_contended_enter(JavaThread* current, bool post_jvmti_ } } +// Compensate the ForkJoinPool scheduler when a pinned virtual thread's +// carrier is about to block on a contended monitor. Without compensation +// the pool has no visibility that its worker is blocked, which can lead to +// carrier starvation / deadlock (JDK-8345294). +// +// This calls CarrierThread.beginMonitorBlock() which uses +// ForkJoinPool.tryCompensateForMonitor() to (a) activate an idle worker or +// create a replacement spare carrier, and (b) decrement RC so that signalWork +// can find a carrier for any queued continuation (e.g. the VT holding the +// contested lock) that would otherwise be starved because blocked carriers +// still count as active in the pool's RC field. The RC decrement is restored +// by endCompensatedBlock when this carrier unblocks. +// +// IMPORTANT: At this point notify_contended_enter() has already set +// _current_pending_monitor. The Java call below may itself trigger +// nested monitor contention (e.g., ThreadGroup.addUnstarted acquires a +// monitor when creating a spare worker thread). 
To avoid: +// (a) assertion failures (pending_monitor != nullptr on nested entry) +// (b) corruption of _current_pending_monitor for the outer monitor +// we save and clear the pending monitor before the Java call and restore +// it afterwards. +static bool compensate_pinned_carrier(JavaThread* current) { + oop carrier = current->threadObj(); + if (carrier == nullptr) return false; + + // Save and clear pending monitor so nested monitor enters work cleanly. + ObjectMonitor* saved_pending = current->current_pending_monitor(); + current->set_current_pending_monitor(nullptr); + + HandleMark hm(current); + Handle carrier_h(current, carrier); + Klass* klass = carrier_h->klass(); + + // CarrierThread.beginMonitorBlock() — compensates the ForkJoinPool via + // tryCompensateForMonitor which always activates/creates a real spare + // carrier (branch 2 of tryCompensate is omitted for this path). + // Not every carrier thread is a CarrierThread (custom schedulers may + // use plain threads), so we look up the method and silently return + // false if it is not found. + TempNewSymbol begin_name = SymbolTable::new_symbol("beginMonitorBlock"); + TempNewSymbol begin_sig = SymbolTable::new_symbol("()Z"); + + JavaValue result(T_BOOLEAN); + JavaCalls::call_virtual(&result, carrier_h, klass, + begin_name, begin_sig, current); + + bool compensated = false; + if (current->has_pending_exception()) { + current->clear_pending_exception(); + } else { + compensated = result.get_jboolean(); + } + + // Restore pending monitor for the outer monitor enter. + current->set_current_pending_monitor(saved_pending); + return compensated; +} + +static void end_compensate_pinned_carrier(JavaThread* current) { + oop carrier = current->threadObj(); + if (carrier == nullptr) return; + + // Save and clear pending monitor (should be nullptr here, but be safe). 
+ ObjectMonitor* saved_pending = current->current_pending_monitor(); + current->set_current_pending_monitor(nullptr); + + HandleMark hm(current); + Handle carrier_h(current, carrier); + Klass* klass = carrier_h->klass(); + + TempNewSymbol end_name = SymbolTable::new_symbol("endBlocking"); + TempNewSymbol end_sig = vmSymbols::void_method_signature(); + + JavaValue result(T_VOID); + JavaCalls::call_virtual(&result, carrier_h, klass, + end_name, end_sig, current); + + if (current->has_pending_exception()) { + current->clear_pending_exception(); + } + + current->set_current_pending_monitor(saved_pending); +} + void ObjectMonitor::enter_with_contention_mark(JavaThread* current, ObjectMonitorContentionMark &cm, bool post_jvmti_events) { assert(current == JavaThread::current(), "must be"); assert(!has_owner(current), "must be"); @@ -583,6 +669,15 @@ void ObjectMonitor::enter_with_contention_mark(JavaThread* current, ObjectMonito } } + // Compensate the ForkJoinPool before the carrier blocks so that the + // scheduler can spin up a replacement worker thread. This prevents + // deadlocks where every carrier is pinned and waiting for a monitor + // held by an unmounted virtual thread that can never get a carrier. + bool compensated = false; + if (is_virtual) { + compensated = compensate_pinned_carrier(current); + } + { // Change java thread status to indicate blocked on monitor enter. JavaThreadBlockedOnMonitorEnterState jtbmes(current, this); @@ -620,6 +715,11 @@ void ObjectMonitor::enter_with_contention_mark(JavaThread* current, ObjectMonito // the monitor free and clear. } + // End compensation now that the carrier is no longer blocked. + if (compensated) { + end_compensate_pinned_carrier(current); + } + assert(contentions() >= 0, "must not be negative: contentions=%d", contentions()); // Must either set _recursions = 0 or ASSERT _recursions == 0. 
diff --git a/src/java.base/share/classes/java/util/concurrent/ForkJoinPool.java b/src/java.base/share/classes/java/util/concurrent/ForkJoinPool.java index f289186e0aded..f36d07385e36b 100644 --- a/src/java.base/share/classes/java/util/concurrent/ForkJoinPool.java +++ b/src/java.base/share/classes/java/util/concurrent/ForkJoinPool.java @@ -4329,14 +4329,114 @@ final long beginCompensatedBlock() { return (c == 0) ? 0L : RC_UNIT; } + /** + * Variant of tryCompensate for use when a carrier thread is about to block + * on a native OS-level monitor (objectMonitor::enter_internal) while pinned + * by a virtual thread. + * + *

+     * <p> Unlike tryCompensate, this method omits the passive RC-only path
+     * (branch 2 of tryCompensate), so the blocking carrier always results in
+     * either an idle worker being activated (branch 1) or a new spare being
+     * created (branch 3).
+     *
+     * <p> Both branches decrement RC so that {@code signalWork} (triggered by
+     * subsequent VT task submissions) sees the pool as under-active and can
+     * dispatch the queued VT continuations. Without the RC decrement in
+     * branch 1, monitor-blocked carriers are counted as "active" by
+     * {@code signalWork}'s {@code (short)(c >>> RC_SHIFT) >= pc} check,
+     * preventing it from waking idle workers when tasks are queued.
+     *
+     * <p> The RC decrement in both branches requires {@code endCompensatedBlock}
+     * to restore two RC_UNITs: one for the decrement here and one for
+     * the compensator's eventual deactivation (RC--). Both branches return
+     * {@code 2} so that {@code beginMonitorCompensatedBlock} passes
+     * {@code 2 * RC_UNIT} to {@code endCompensatedBlock}:
+     * <pre>
+     *   branch CAS: RC--                (-1)
+     *   compensator deactivates: RC--   (-1)
+     *   endCompensatedBlock:            (+2)
+     *   net:                             0
+     * </pre>
+ */ + private int tryCompensateForMonitor(long c) { + Predicate sat; + long b = config; + int pc = parallelism, + maxTotal = (short)(b >>> TC_SHIFT) + pc, + total = (short)(c >>> TC_SHIFT), + sp = (int)c, + stat = -1; + if (sp != 0) { // activate idle worker (branch 1) + // RC-- so that signalWork sees the pool as under-active while + // the carrier is blocked on the monitor. Without this, RC + // counts monitor-blocked carriers as "active", preventing + // signalWork from waking idle workers. Return stat=2 so + // that beginMonitorCompensatedBlock passes 2*RC_UNIT to + // endCompensatedBlock, balancing both the RC-- here and the + // compensator's eventual deactivation (RC--). + WorkQueue[] qs; WorkQueue v; int i; + if ((qs = queues) != null && qs.length > (i = sp & SMASK) && + (v = qs[i]) != null && + compareAndSetCtl(c, ((c - RC_UNIT) & UMASK) | + (v.stackPred & LMASK))) { + v.phase = sp; + if (v.parking != 0) + U.unpark(v.owner); + stat = 2; // need 2 * RC_UNIT at unblock + } + } + // Branch 2 (passive RC-only decrement) is intentionally absent. + else if (total < maxTotal && total < MAX_CAP) { // create spare (branch 3) + // TC++ and RC-- so that signalWork sees the pool as under-active. + // Return stat=2 so that beginMonitorCompensatedBlock passes + // 2*RC_UNIT to endCompensatedBlock, balancing both the RC-- + // here and the spare's eventual deactivation (RC--). 
+ long nc = ((c + TC_UNIT) & TC_MASK) | + ((c - RC_UNIT) & RC_MASK) | + (c & LMASK); + if ((runState & STOP) != 0L) + stat = 0; + else if (compareAndSetCtl(c, nc)) { + createWorker(); + stat = 2; // need 2 * RC_UNIT at unblock + } + } + else if (!compareAndSetCtl(c, c)) // validate + ; + else if ((sat = saturate) != null && sat.test(this)) + stat = 0; + else + throw new RejectedExecutionException( + "Thread limit exceeded replacing blocked worker"); + return stat; + } + + /** + * Like beginCompensatedBlock but for carrier threads that are about to + * block on a native OS-level monitor (objectMonitor::enter_internal) while + * pinned by a virtual thread. Uses tryCompensateForMonitor which always + * activates or creates a real replacement carrier (never passive RC-only). + *

+     * <p> tryCompensateForMonitor returns 0 (compensation refused: pool
+     * stopping or saturated) or 2 (compensated), so this method returns
+     * either {@code 0L} or {@code 2 * RC_UNIT} to pass to
+     * {@code endCompensatedBlock}.
+     *
+ * @return value to pass to endCompensatedBlock + */ + final long beginMonitorCompensatedBlock() { + int c; + do {} while ((c = tryCompensateForMonitor(ctl)) < 0); + return (c == 0) ? 0L : (long)c * RC_UNIT; + } + /** * Re-adjusts parallelism after a blocking operation completes. - * @param post value from beginCompensatedBlock + * @param post value from beginCompensatedBlock or beginMonitorCompensatedBlock */ void endCompensatedBlock(long post) { - if (post > 0L) { + if (post > 0L) getAndAddCtl(post); - } } /** ManagedBlock for external threads */ @@ -4393,9 +4493,14 @@ protected RunnableFuture newTaskFor(Callable callable) { public long beginCompensatedBlock(ForkJoinPool pool) { return pool.beginCompensatedBlock(); } + @Override public void endCompensatedBlock(ForkJoinPool pool, long post) { pool.endCompensatedBlock(post); } + @Override + public long beginMonitorCompensatedBlock(ForkJoinPool pool) { + return pool.beginMonitorCompensatedBlock(); + } }); defaultForkJoinWorkerThreadFactory = new DefaultForkJoinWorkerThreadFactory(); diff --git a/src/java.base/share/classes/jdk/internal/access/JavaUtilConcurrentFJPAccess.java b/src/java.base/share/classes/jdk/internal/access/JavaUtilConcurrentFJPAccess.java index 28c1b7fddee99..5f824dfabbd49 100644 --- a/src/java.base/share/classes/jdk/internal/access/JavaUtilConcurrentFJPAccess.java +++ b/src/java.base/share/classes/jdk/internal/access/JavaUtilConcurrentFJPAccess.java @@ -29,4 +29,20 @@ public interface JavaUtilConcurrentFJPAccess { long beginCompensatedBlock(ForkJoinPool pool); void endCompensatedBlock(ForkJoinPool pool, long post); + /** + * Like beginCompensatedBlock but for carrier threads that are about to + * block on a native OS-level monitor (objectMonitor::enter_internal) while + * pinned by a virtual thread. This variant: + *
+     * <ul>
+     *   <li> Never takes the passive RC-only path (branch 2 of tryCompensate);
+     *        it always activates an idle worker or creates a new spare. </li>
+     *   <li> Decrements RC immediately so that signalWork can create a carrier
+     *        for any continuation (e.g. the contested lock's holder) that is
+     *        already in the queue, which would otherwise be starved because
+     *        blocked carriers still count as active in RC. </li>
+     * </ul>
+     *
+ * The RC decrement is restored by endCompensatedBlock when the carrier + * unblocks. + */ + long beginMonitorCompensatedBlock(ForkJoinPool pool); } diff --git a/src/java.base/share/classes/jdk/internal/misc/CarrierThread.java b/src/java.base/share/classes/jdk/internal/misc/CarrierThread.java index b314b1823a56d..811477c247513 100644 --- a/src/java.base/share/classes/jdk/internal/misc/CarrierThread.java +++ b/src/java.base/share/classes/jdk/internal/misc/CarrierThread.java @@ -89,6 +89,47 @@ public boolean beginBlocking() { } } + /** + * Mark the start of a blocking operation where the carrier thread is about + * to block on a native OS-level monitor (objectMonitor::enter_internal) + * while pinned by a virtual thread. Unlike beginBlocking, this always + * activates an idle worker or creates a new spare carrier (never takes the + * passive RC-only path that creates no worker). endBlocking restores + * the compensateValue (1 or 2 RC_UNITs) when this carrier unblocks. + */ + public boolean beginMonitorBlock() { + assert Thread.currentThread().isVirtual() && JLA.currentCarrierThread() == this; + + // Guard against re-entrant calls. tryCompensateForMonitor may call + // createWorker() which can hit a nested monitor (e.g. ThreadGroup), + // re-entering objectMonitor::enter_with_contention_mark and thus this + // method. Without this check each nested call overwrites + // compensateValue, leaking an RC_UNIT per re-entrant compensation. + if (compensating != NOT_COMPENSATING) { + return false; + } + + // don't preempt when attempting to compensate + Continuation.pin(); + try { + compensating = COMPENSATE_IN_PROGRESS; + + // Uses FJP.tryCompensateForMonitor (branch 2 removed) – always + // activates an idle worker (1*RC_UNIT) or creates a new spare + // with RC-- (2*RC_UNIT to balance both the RC-- and the spare's + // eventual deactivation). 
+ compensateValue = ForkJoinPools.beginMonitorCompensatedBlock(getPool()); + compensating = COMPENSATING; + return true; + } catch (Throwable e) { + // exception starting spare thread + compensating = NOT_COMPENSATING; + throw e; + } finally { + Continuation.unpin(); + } + } + /** * Mark the end of a blocking operation. */ @@ -133,6 +174,9 @@ static long beginCompensatedBlock(ForkJoinPool pool) { static void endCompensatedBlock(ForkJoinPool pool, long post) { FJP_ACCESS.endCompensatedBlock(pool, post); } + static long beginMonitorCompensatedBlock(ForkJoinPool pool) { + return FJP_ACCESS.beginMonitorCompensatedBlock(pool); + } } static {