Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
100 changes: 100 additions & 0 deletions src/hotspot/share/runtime/objectMonitor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
*
*/

#include "classfile/symbolTable.hpp"
#include "classfile/vmSymbols.hpp"
#include "gc/shared/oopStorage.hpp"
#include "gc/shared/oopStorageSet.hpp"
Expand All @@ -42,6 +43,7 @@
#include "runtime/globals.hpp"
#include "runtime/handles.inline.hpp"
#include "runtime/interfaceSupport.inline.hpp"
#include "runtime/javaCalls.hpp"
#include "runtime/javaThread.inline.hpp"
#include "runtime/mutexLocker.hpp"
#include "runtime/objectMonitor.inline.hpp"
Expand Down Expand Up @@ -536,6 +538,90 @@ void ObjectMonitor::notify_contended_enter(JavaThread* current, bool post_jvmti_
}
}

// Compensate the ForkJoinPool scheduler when a pinned virtual thread's
// carrier is about to block on a contended monitor. Without compensation
// the pool has no visibility that its worker is blocked, which can lead to
// carrier starvation / deadlock (JDK-8345294).
//
// This calls CarrierThread.beginMonitorBlock() which uses
// ForkJoinPool.tryCompensateForMonitor() to (a) activate an idle worker or
// create a replacement spare carrier, and (b) decrement RC so that signalWork
// can find a carrier for any queued continuation (e.g. the VT holding the
// contested lock) that would otherwise be starved because blocked carriers
// still count as active in the pool's RC field. The RC decrement is restored
// by endCompensatedBlock when this carrier unblocks.
//
// IMPORTANT: At this point notify_contended_enter() has already set
// _current_pending_monitor. The Java call below may itself trigger
// nested monitor contention (e.g., ThreadGroup.addUnstarted acquires a
// monitor when creating a spare worker thread). To avoid:
// (a) assertion failures (pending_monitor != nullptr on nested entry)
// (b) corruption of _current_pending_monitor for the outer monitor
// we save and clear the pending monitor before the Java call and restore
// it afterwards.
static bool compensate_pinned_carrier(JavaThread* current) {
oop carrier = current->threadObj();
if (carrier == nullptr) return false;

// Save and clear pending monitor so nested monitor enters work cleanly.
ObjectMonitor* saved_pending = current->current_pending_monitor();
current->set_current_pending_monitor(nullptr);

HandleMark hm(current);
Handle carrier_h(current, carrier);
Klass* klass = carrier_h->klass();

// CarrierThread.beginMonitorBlock() — compensates the ForkJoinPool via
// tryCompensateForMonitor which always activates/creates a real spare
// carrier (branch 2 of tryCompensate is omitted for this path).
// Not every carrier thread is a CarrierThread (custom schedulers may
// use plain threads), so we look up the method and silently return
// false if it is not found.
TempNewSymbol begin_name = SymbolTable::new_symbol("beginMonitorBlock");
TempNewSymbol begin_sig = SymbolTable::new_symbol("()Z");

JavaValue result(T_BOOLEAN);
JavaCalls::call_virtual(&result, carrier_h, klass,
begin_name, begin_sig, current);

bool compensated = false;
if (current->has_pending_exception()) {
current->clear_pending_exception();
} else {
compensated = result.get_jboolean();
}

// Restore pending monitor for the outer monitor enter.
current->set_current_pending_monitor(saved_pending);
return compensated;
}

static void end_compensate_pinned_carrier(JavaThread* current) {
oop carrier = current->threadObj();
if (carrier == nullptr) return;

// Save and clear pending monitor (should be nullptr here, but be safe).
ObjectMonitor* saved_pending = current->current_pending_monitor();
current->set_current_pending_monitor(nullptr);

HandleMark hm(current);
Handle carrier_h(current, carrier);
Klass* klass = carrier_h->klass();

TempNewSymbol end_name = SymbolTable::new_symbol("endBlocking");
TempNewSymbol end_sig = vmSymbols::void_method_signature();

JavaValue result(T_VOID);
JavaCalls::call_virtual(&result, carrier_h, klass,
end_name, end_sig, current);

if (current->has_pending_exception()) {
current->clear_pending_exception();
}

current->set_current_pending_monitor(saved_pending);
}

void ObjectMonitor::enter_with_contention_mark(JavaThread* current, ObjectMonitorContentionMark &cm, bool post_jvmti_events) {
assert(current == JavaThread::current(), "must be");
assert(!has_owner(current), "must be");
Expand Down Expand Up @@ -583,6 +669,15 @@ void ObjectMonitor::enter_with_contention_mark(JavaThread* current, ObjectMonito
}
}

// Compensate the ForkJoinPool before the carrier blocks so that the
// scheduler can spin up a replacement worker thread. This prevents
// deadlocks where every carrier is pinned and waiting for a monitor
// held by an unmounted virtual thread that can never get a carrier.
bool compensated = false;
if (is_virtual) {
compensated = compensate_pinned_carrier(current);
}

{
// Change java thread status to indicate blocked on monitor enter.
JavaThreadBlockedOnMonitorEnterState jtbmes(current, this);
Expand Down Expand Up @@ -620,6 +715,11 @@ void ObjectMonitor::enter_with_contention_mark(JavaThread* current, ObjectMonito
// the monitor free and clear.
}

// End compensation now that the carrier is no longer blocked.
if (compensated) {
end_compensate_pinned_carrier(current);
}

assert(contentions() >= 0, "must not be negative: contentions=%d", contentions());

// Must either set _recursions = 0 or ASSERT _recursions == 0.
Expand Down
111 changes: 108 additions & 3 deletions src/java.base/share/classes/java/util/concurrent/ForkJoinPool.java
Original file line number Diff line number Diff line change
Expand Up @@ -4329,14 +4329,114 @@ final long beginCompensatedBlock() {
return (c == 0) ? 0L : RC_UNIT;
}

/**
* Variant of tryCompensate for use when a carrier thread is about to block
* on a native OS-level monitor (objectMonitor::enter_internal) while pinned
* by a virtual thread.
*
* <p>Unlike tryCompensate, this method omits the passive RC-only path
* (branch 2 of tryCompensate), so the blocking carrier always results in
* either an idle worker being activated (branch 1) or a new spare being
* created (branch 3).
*
* <p>Both branches decrement RC so that {@code signalWork} (triggered by
* subsequent VT task submissions) sees the pool as under-active and can
* dispatch the queued VT continuations. Without the RC decrement in
* branch 1, monitor-blocked carriers are counted as "active" by
* {@code signalWork}'s {@code (short)(c >>> RC_SHIFT) >= pc} check,
* preventing it from waking idle workers when tasks are queued.
*
* <p>The RC decrement in both branches requires {@code endCompensatedBlock}
* to restore <em>two</em> RC_UNITs: one for the decrement here and one for
* the compensator's eventual deactivation (RC--). Both branches return
* {@code 2} so that {@code beginMonitorCompensatedBlock} passes
* {@code 2 * RC_UNIT} to {@code endCompensatedBlock}:
* <pre>
* branch CAS: RC-- (-1)
* compensator deactivates: RC-- (-1)
* endCompensatedBlock: (+2)
* net: 0
* </pre>
*/
private int tryCompensateForMonitor(long c) {
Predicate<? super ForkJoinPool> sat;
long b = config;
int pc = parallelism,
maxTotal = (short)(b >>> TC_SHIFT) + pc,
total = (short)(c >>> TC_SHIFT),
sp = (int)c,
stat = -1;
if (sp != 0) { // activate idle worker (branch 1)
// RC-- so that signalWork sees the pool as under-active while
// the carrier is blocked on the monitor. Without this, RC
// counts monitor-blocked carriers as "active", preventing
// signalWork from waking idle workers. Return stat=2 so
// that beginMonitorCompensatedBlock passes 2*RC_UNIT to
// endCompensatedBlock, balancing both the RC-- here and the
// compensator's eventual deactivation (RC--).
WorkQueue[] qs; WorkQueue v; int i;
if ((qs = queues) != null && qs.length > (i = sp & SMASK) &&
(v = qs[i]) != null &&
compareAndSetCtl(c, ((c - RC_UNIT) & UMASK) |
(v.stackPred & LMASK))) {
v.phase = sp;
if (v.parking != 0)
U.unpark(v.owner);
stat = 2; // need 2 * RC_UNIT at unblock
}
}
// Branch 2 (passive RC-only decrement) is intentionally absent.
else if (total < maxTotal && total < MAX_CAP) { // create spare (branch 3)
// TC++ and RC-- so that signalWork sees the pool as under-active.
// Return stat=2 so that beginMonitorCompensatedBlock passes
// 2*RC_UNIT to endCompensatedBlock, balancing both the RC--
// here and the spare's eventual deactivation (RC--).
long nc = ((c + TC_UNIT) & TC_MASK) |
((c - RC_UNIT) & RC_MASK) |
(c & LMASK);
if ((runState & STOP) != 0L)
stat = 0;
else if (compareAndSetCtl(c, nc)) {
createWorker();
stat = 2; // need 2 * RC_UNIT at unblock
}
}
else if (!compareAndSetCtl(c, c)) // validate
;
else if ((sat = saturate) != null && sat.test(this))
stat = 0;
else
throw new RejectedExecutionException(
"Thread limit exceeded replacing blocked worker");
return stat;
}

/**
* Like beginCompensatedBlock but for carrier threads that are about to
* block on a native OS-level monitor (objectMonitor::enter_internal) while
* pinned by a virtual thread. Uses tryCompensateForMonitor which always
* activates or creates a real replacement carrier (never passive RC-only).
* <p>tryCompensateForMonitor returns:
* <ul>
* <li>{@code 2} — branch 1 (activated idle worker with RC--), need 2 × RC_UNIT</li>
* <li>{@code 2} — branch 3 (created spare with RC--), need 2 × RC_UNIT</li>
* <li>{@code 0} — saturated/stopping, no restoration needed</li>
* </ul>
* @return value to pass to endCompensatedBlock
*/
final long beginMonitorCompensatedBlock() {
int c;
do {} while ((c = tryCompensateForMonitor(ctl)) < 0);
return (c == 0) ? 0L : (long)c * RC_UNIT;
}

/**
* Re-adjusts parallelism after a blocking operation completes.
* @param post value from beginCompensatedBlock
* @param post value from beginCompensatedBlock or beginMonitorCompensatedBlock
*/
void endCompensatedBlock(long post) {
if (post > 0L) {
if (post > 0L)
getAndAddCtl(post);
}
}

/** ManagedBlock for external threads */
Expand Down Expand Up @@ -4393,9 +4493,14 @@ protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {
public long beginCompensatedBlock(ForkJoinPool pool) {
return pool.beginCompensatedBlock();
}
@Override
public void endCompensatedBlock(ForkJoinPool pool, long post) {
pool.endCompensatedBlock(post);
}
@Override
public long beginMonitorCompensatedBlock(ForkJoinPool pool) {
return pool.beginMonitorCompensatedBlock();
}
});
defaultForkJoinWorkerThreadFactory =
new DefaultForkJoinWorkerThreadFactory();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,4 +29,20 @@
public interface JavaUtilConcurrentFJPAccess {
long beginCompensatedBlock(ForkJoinPool pool);
void endCompensatedBlock(ForkJoinPool pool, long post);
/**
* Like beginCompensatedBlock but for carrier threads that are about to
* block on a native OS-level monitor (objectMonitor::enter_internal) while
* pinned by a virtual thread. This variant:
* <ul>
* <li>Never takes the passive RC-only path (branch 2 of tryCompensate);
* it always activates an idle worker or creates a new spare.</li>
* <li>Decrements RC immediately so that signalWork can create a carrier
* for any continuation (e.g. the contested lock's holder) that is
* already in the queue, which would otherwise be starved because
* blocked carriers still count as active in RC.</li>
* </ul>
* The RC decrement is restored by endCompensatedBlock when the carrier
* unblocks.
*/
long beginMonitorCompensatedBlock(ForkJoinPool pool);
}
44 changes: 44 additions & 0 deletions src/java.base/share/classes/jdk/internal/misc/CarrierThread.java
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,47 @@ public boolean beginBlocking() {
}
}

/**
* Mark the start of a blocking operation where the carrier thread is about
* to block on a native OS-level monitor (objectMonitor::enter_internal)
* while pinned by a virtual thread. Unlike beginBlocking, this always
* activates an idle worker or creates a new spare carrier (never takes the
* passive RC-only path that creates no worker). endBlocking restores
* the compensateValue (1 or 2 RC_UNITs) when this carrier unblocks.
*/
public boolean beginMonitorBlock() {
assert Thread.currentThread().isVirtual() && JLA.currentCarrierThread() == this;

// Guard against re-entrant calls. tryCompensateForMonitor may call
// createWorker() which can hit a nested monitor (e.g. ThreadGroup),
// re-entering objectMonitor::enter_with_contention_mark and thus this
// method. Without this check each nested call overwrites
// compensateValue, leaking an RC_UNIT per re-entrant compensation.
if (compensating != NOT_COMPENSATING) {
return false;
}

// don't preempt when attempting to compensate
Continuation.pin();
try {
compensating = COMPENSATE_IN_PROGRESS;

// Uses FJP.tryCompensateForMonitor (branch 2 removed) – always
// activates an idle worker (1*RC_UNIT) or creates a new spare
// with RC-- (2*RC_UNIT to balance both the RC-- and the spare's
// eventual deactivation).
compensateValue = ForkJoinPools.beginMonitorCompensatedBlock(getPool());
compensating = COMPENSATING;
return true;
} catch (Throwable e) {
// exception starting spare thread
compensating = NOT_COMPENSATING;
throw e;
} finally {
Continuation.unpin();
}
}

/**
* Mark the end of a blocking operation.
*/
Expand Down Expand Up @@ -133,6 +174,9 @@ static long beginCompensatedBlock(ForkJoinPool pool) {
static void endCompensatedBlock(ForkJoinPool pool, long post) {
FJP_ACCESS.endCompensatedBlock(pool, post);
}
static long beginMonitorCompensatedBlock(ForkJoinPool pool) {
return FJP_ACCESS.beginMonitorCompensatedBlock(pool);
}
}

static {
Expand Down
Loading