EW-8447 Fix CPU profiling harder
My initial attempt at a fix made a subtle behavior change which I did not consider fully: instead of a new `incomingQueueNotifier` XThreadNotifier being constructed for every inspector connection, my fix caused all inspector connections to re-use one long-lived XThreadNotifier. This subsequently ran afoul of the fact that XThreadNotifier::awaitNotification() is not cancel-safe: if it is cancelled while awaiting its stored promise, calling awaitNotification() a second time tries to await a moved-from promise.

This commit restores the previous behavior of creating a new `incomingQueueNotifier` XThreadNotifier for every inspector connection. However, instead of constructing it in the WebSocketIoHandler constructor as before, I moved the construction into the `messagePump()` function, which also spawns the dispatch loop. This narrows access to the notifier to only those functions which actually need it, keeps our usage of the dispatch kj::Executor localized in one place, and avoids synchronously blocking the inspector thread, as we would have had to do had we constructed it in the WebSocketIoHandler constructor.

Fixes #2564.
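
The cancel-safety hazard described above can be illustrated with a small standard-C++ toy. This is only a sketch under stated assumptions: `ToyPromise`, `ToyNotifier`, `cancelledSession()`, `reuseAcrossSessions()` and `freshPerSession()` are hypothetical names invented for illustration, and the toy does not reproduce the real XThreadNotifier internals. It only models the behavior the message describes: the stored promise is moved out while awaiting, and a cancelled wait never puts a fresh one back.

```cpp
#include <cassert>
#include <memory>
#include <utility>

// Hypothetical stand-in for a stored promise; this is NOT kj::Promise.
struct ToyPromise {};

// Hypothetical stand-in for a notifier whose await is not cancel-safe.
class ToyNotifier {
public:
  ToyNotifier(): stored(std::make_unique<ToyPromise>()) {}

  // Hands the stored promise to the caller, leaving `stored` moved-from (null).
  std::unique_ptr<ToyPromise> awaitNotification() { return std::move(stored); }

  // Runs only when a wait finishes normally: re-arms the notifier.
  void complete() { stored = std::make_unique<ToyPromise>(); }

  // True if a later awaitNotification() would still find a valid promise.
  bool usable() const { return stored != nullptr; }

private:
  std::unique_ptr<ToyPromise> stored;
};

// Models a session cancelled mid-wait: the pending promise is dropped and
// complete() is never called, so the notifier is never re-armed.
inline void cancelledSession(ToyNotifier& notifier) {
  auto pending = notifier.awaitNotification();
  (void)pending;  // destroyed at end of scope, which stands in for cancellation
}

// One long-lived notifier shared by every session: broken after a cancellation.
inline bool reuseAcrossSessions() {
  ToyNotifier shared;
  cancelledSession(shared);
  return shared.usable();
}

// A fresh notifier per session: an earlier cancellation cannot affect it.
inline bool freshPerSession() {
  {
    ToyNotifier first;
    cancelledSession(first);
  }
  ToyNotifier second;
  return second.usable();
}
```

In this toy, `reuseAcrossSessions()` reports an unusable notifier while `freshPerSession()` reports a usable one, which mirrors the reason this commit goes back to one notifier per inspector connection.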
harrishancock committed Aug 21, 2024
1 parent 45f58cf commit cb01041
Showing 3 changed files with 49 additions and 36 deletions.
49 changes: 32 additions & 17 deletions src/workerd/io/worker.c++
@@ -2312,9 +2312,9 @@ struct MessageQueue {
class Worker::Isolate::InspectorChannelImpl final: public v8_inspector::V8Inspector::Channel {
public:
InspectorChannelImpl(kj::Own<const Worker::Isolate> isolateParam,
- ExecutorNotifierPair isolateThreadExecutorNotifierPair,
+ kj::Own<const kj::Executor> isolateThreadExecutor,
kj::WebSocket& webSocket)
- : ioHandler(kj::mv(isolateThreadExecutorNotifierPair), webSocket),
+ : ioHandler(kj::mv(isolateThreadExecutor), webSocket),
state(kj::heap<State>(this, kj::mv(isolateParam))) {
ioHandler.connect(*this);
}
@@ -2563,10 +2563,8 @@ private:
// the InspectorChannelImpl and the InspectorClient.
class WebSocketIoHandler final {
public:
- WebSocketIoHandler(
- ExecutorNotifierPair isolateThreadExecutorNotifierPair, kj::WebSocket& webSocket)
- : isolateThreadExecutor(kj::mv(isolateThreadExecutorNotifierPair.executor)),
- incomingQueueNotifier(kj::mv(isolateThreadExecutorNotifierPair.notifier)),
+ WebSocketIoHandler(kj::Own<const kj::Executor> isolateThreadExecutor, kj::WebSocket& webSocket)
+ : isolateThreadExecutor(kj::mv(isolateThreadExecutor)),
webSocket(webSocket) {
// Assume we are being instantiated on the InspectorService thread, the thread that will do
// I/O for CDP messages. Messages are delivered to the InspectorChannelImpl on the Isolate thread.
@@ -2608,9 +2606,26 @@ private:
// internal Cloudflare Workers runtime, `isolateThreadExecutor` may actually refer to the
// current thread's `kj::Executor`. That's fine; calling `executeAsync()` on the current
// thread's executor just posts the task to the event loop, and everything works as expected.
- auto dispatchLoopPromise =
- isolateThreadExecutor->executeAsync([this]() { return dispatchLoop(); });
- return receiveLoop().exclusiveJoin(kj::mv(dispatchLoopPromise)).exclusiveJoin(transmitLoop());
+
+ // Since the dispatch loop and the receive loop communicate over a XThreadNotifier, and
+ // XThreadNotifiers must be created on the thread which will call their `awaitNotification()`
+ // function, we awkwardly perform two `executeAsync()`s here, one to create the
+ // XThreadNotifier, then another to spawn the dispatch loop.
+ //
+ // We create a new XThreadNotifier for each `messagePump()` call, rather than try to re-use
+ // one long-term, because XThreadNotifiers' `awaitNotification()` function is not cancel-safe.
+ // That is, once its promise is cancelled, the notifier is broken.
+ auto incomingQueueNotifier =
+ co_await isolateThreadExecutor->executeAsync([]() { return XThreadNotifier::create(); });
+
+ auto dispatchLoopPromise = isolateThreadExecutor->executeAsync(
+ [this, notifier = kj::atomicAddRef(*incomingQueueNotifier)]() mutable {
+ return dispatchLoop(kj::mv(notifier));
+ });
+
+ co_return co_await receiveLoop(kj::mv(incomingQueueNotifier))
+ .exclusiveJoin(kj::mv(dispatchLoopPromise))
+ .exclusiveJoin(transmitLoop());
}

void send(kj::String message) {
@@ -2652,7 +2667,7 @@ private:
}

// Must be called on the InspectorService thread.
- kj::Promise<void> receiveLoop() {
+ kj::Promise<void> receiveLoop(kj::Own<XThreadNotifier> incomingQueueNotifier) {
for (;;) {
auto message = co_await webSocket.receive(MAX_MESSAGE_SIZE);
KJ_SWITCH_ONEOF(message) {
@@ -2675,7 +2690,7 @@ private:
}

// Must be called on the Isolate thread.
- kj::Promise<void> dispatchLoop() {
+ kj::Promise<void> dispatchLoop(kj::Own<XThreadNotifier> incomingQueueNotifier) {
for (;;) {
co_await incomingQueueNotifier->awaitNotification();
KJ_IF_SOME(c, channel) {
@@ -2717,8 +2732,8 @@ private:
kj::Own<const kj::Executor> isolateThreadExecutor;

kj::MutexGuarded<MessageQueue> incomingQueue;
- // This XThreadNotifier must be created on the Isolate thread.
- kj::Own<XThreadNotifier> incomingQueueNotifier;
+ // The notifier for `incomingQueue`, `incomingQueueNotifier`, is created once per
+ // `messagePump()` call, and never re-used, so it doesn't live here.

kj::MutexGuarded<MessageQueue> outgoingQueue;
// This XThreadNotifier must be created on the InspectorService thread.
@@ -2878,14 +2893,14 @@ kj::Promise<void> Worker::Isolate::attachInspector(kj::Timer& timer,
// This `attachInspector()` overload is used by the internal Cloudflare Workers runtime, which has
// no concept of a single Isolate thread. Instead, it's okay for all inspector messages to be
// dispatched on the calling thread.
- auto executorNotifierPair = ExecutorNotifierPair{};
+ auto executor = kj::getCurrentThreadExecutor().addRef();

- return attachInspector(kj::mv(executorNotifierPair), timer, timerOffset, *webSocket)
+ return attachInspector(kj::mv(executor), timer, timerOffset, *webSocket)
.attach(kj::mv(webSocket));
}

kj::Promise<void> Worker::Isolate::attachInspector(
- ExecutorNotifierPair isolateThreadExecutorNotifierPair,
+ kj::Own<const kj::Executor> isolateThreadExecutor,
kj::Timer& timer,
kj::Duration timerOffset,
kj::WebSocket& webSocket) const {
@@ -2907,7 +2922,7 @@ kj::Promise<void> Worker::Isolate::attachInspector(
lockedSelf.impl->inspectorClient.setInspectorTimerInfo(timer, timerOffset);

auto channel = kj::heap<Worker::Isolate::InspectorChannelImpl>(
- kj::atomicAddRef(*this), kj::mv(isolateThreadExecutorNotifierPair), webSocket);
+ kj::atomicAddRef(*this), kj::mv(isolateThreadExecutor), webSocket);
lockedSelf.currentInspectorSession = *channel;
lockedSelf.impl->inspectorClient.setChannel(*channel);

6 changes: 3 additions & 3 deletions src/workerd/io/worker.h
@@ -335,9 +335,9 @@ class Worker::Isolate: public kj::AtomicRefcounted {
// Accepts a connection to the V8 inspector and handles requests until the client disconnects.
//
// This overload will dispatch all inspector messages on the `kj::Executor` passed in via
- // `isolateThreadExecutorNotifierPair`. For CPU profiling to work as expected, this `kj::Executor`
- // must be associated with the same thread which executes the Worker's JavaScript.
- kj::Promise<void> attachInspector(ExecutorNotifierPair isolateThreadExecutorNotifierPair,
+ // `isolateThreadExecutor`. For CPU profiling to work as expected, this `kj::Executor` must be
+ // associated with the same thread which executes the Worker's JavaScript.
+ kj::Promise<void> attachInspector(kj::Own<const kj::Executor> isolateThreadExecutor,
kj::Timer& timer,
kj::Duration timerOffset,
kj::WebSocket& webSocket) const;
30 changes: 14 additions & 16 deletions src/workerd/server/server.c++
@@ -1230,11 +1230,11 @@ private:
// to define the inspector socket.
class Server::InspectorService final: public kj::HttpService, public kj::HttpServerErrorHandler {
public:
- InspectorService(ExecutorNotifierPair isolateThreadExecutorNotifierPair,
+ InspectorService(kj::Own<const kj::Executor> isolateThreadExecutor,
kj::Timer& timer,
kj::HttpHeaderTable::Builder& headerTableBuilder,
InspectorServiceIsolateRegistrar& registrar)
- : isolateThreadExecutorNotifierPair(kj::mv(isolateThreadExecutorNotifierPair)),
+ : isolateThreadExecutor(kj::mv(isolateThreadExecutor)),
timer(timer),
headerTable(headerTableBuilder.getFutureTable()),
server(timer, headerTable, *this, kj::HttpServerSettings{.errorHandler = *this}),
@@ -1291,7 +1291,7 @@ public:
kj::Duration timerOffset = 0 * kj::MILLISECONDS;
try {
co_return co_await ref->attachInspector(
- isolateThreadExecutorNotifierPair.clone(), timer, timerOffset, *webSocket);
+ isolateThreadExecutor->addRef(), timer, timerOffset, *webSocket);
} catch (...) {
auto exception = kj::getCaughtExceptionAsKj();
if (exception.getType() == kj::Exception::Type::DISCONNECTED) {
@@ -1410,7 +1410,7 @@ public:
}

private:
- ExecutorNotifierPair isolateThreadExecutorNotifierPair;
+ kj::Own<const kj::Executor> isolateThreadExecutor;
kj::Timer& timer;
kj::HttpHeaderTable& headerTable;
kj::HashMap<kj::String, kj::Own<const Worker::Isolate::WeakIsolateRef>> isolates;
@@ -3488,25 +3488,23 @@ uint startInspector(
// `startInspector()` is called on the Isolate thread. V8 requires CPU profiling to be started and
// stopped on the same thread which executes JavaScript -- that is, the Isolate thread -- which
// means we need to dispatch inspector messages on this thread. To help make that happen, we
- // capture this thread's kj::Executor and create an XThreadNotifier tied to this thread here, and
- // pass it into the InspectorService below. Later, when the InspectorService receives a WebSocket
- // connection, it calls `Isolate::attachInspector()`, which starts a dispatch loop on the
- // kj::Executor we create here. The InspectorService reads subsequent WebSocket inspector messages
- // and feeds them to that dispatch loop via the XThreadNotifier we create here.
- auto isolateThreadExecutorNotifierPair = ExecutorNotifierPair{};
+ // capture this thread's kj::Executor here, and pass it into the InspectorService below. Later,
+ // when the InspectorService receives a WebSocket connection, it calls
+ // `Isolate::attachInspector()`, which uses the kj::Executor we create here to create a
+ // XThreadNotifier and start a dispatch loop. The InspectorService reads subsequent WebSocket
+ // inspector messages and feeds them to that dispatch loop via the XThreadNotifier.
+ auto isolateThreadExecutor = kj::getCurrentThreadExecutor().addRef();

// Start the InspectorService thread.
- kj::Thread thread(
- [inspectorAddress, &inspectorPort, &registrar,
- isolateThreadExecutorNotifierPair = kj::mv(isolateThreadExecutorNotifierPair)]() mutable {
+ kj::Thread thread([inspectorAddress, &inspectorPort, &registrar,
+ isolateThreadExecutor = kj::mv(isolateThreadExecutor)]() mutable {
kj::AsyncIoContext io = kj::setupAsyncIo();

kj::HttpHeaderTable::Builder headerTableBuilder;

// Create the special inspector service.
- auto inspectorService(
- kj::heap<Server::InspectorService>(kj::mv(isolateThreadExecutorNotifierPair),
- io.provider->getTimer(), headerTableBuilder, registrar));
+ auto inspectorService(kj::heap<Server::InspectorService>(
+ kj::mv(isolateThreadExecutor), io.provider->getTimer(), headerTableBuilder, registrar));
auto ownHeaderTable = headerTableBuilder.build();

// Configure and start the inspector socket.