From cb01041d0e3549c4095e18a7ab62adc718b16216 Mon Sep 17 00:00:00 2001 From: Harris Hancock Date: Wed, 21 Aug 2024 13:47:51 +0100 Subject: [PATCH] EW-8447 Fix CPU profiling harder My initial attempt at a fix made a subtle behavior change which I did not consider fully: instead of a new `incomingQueueNotifier` XThreadNotifier being constructed for every inspector connection, my fix caused all inspector connections to re-use one long-lived XThreadNotifier. This subsequently ran afoul of the fact that XThreadNotifier::awaitNotification() is not cancel-safe: if it is cancelled while awaiting its stored promise, calling awaitNotification() a second time tries to await a moved-from promise. This commit restores the previous behavior of creating a new `incomingQueueNotifier` XThreadNotifier for every inspector connection. However, instead of constructing it in the WebSocketIoHandler constructor as before, I moved the construction to the `messagePump()` function implementation, which also spawns the dispatch loop. This narrows the scope of access to the notifier to only those functions which actually need it, keeps our usage of the dispatch kj::Executor localized in one place, and avoids synchronously blocking the inspector thread, as we would have had to do if we constructed it in the WebSocketIoHandler constructor. Fixes #2564. 
--- src/workerd/io/worker.c++ | 49 +++++++++++++++++++++++------------ src/workerd/io/worker.h | 6 ++--- src/workerd/server/server.c++ | 30 ++++++++++----------- 3 files changed, 49 insertions(+), 36 deletions(-) diff --git a/src/workerd/io/worker.c++ b/src/workerd/io/worker.c++ index 34e96ce980a..efc4d99c6d8 100644 --- a/src/workerd/io/worker.c++ +++ b/src/workerd/io/worker.c++ @@ -2312,9 +2312,9 @@ struct MessageQueue { class Worker::Isolate::InspectorChannelImpl final: public v8_inspector::V8Inspector::Channel { public: InspectorChannelImpl(kj::Own isolateParam, - ExecutorNotifierPair isolateThreadExecutorNotifierPair, + kj::Own isolateThreadExecutor, kj::WebSocket& webSocket) - : ioHandler(kj::mv(isolateThreadExecutorNotifierPair), webSocket), + : ioHandler(kj::mv(isolateThreadExecutor), webSocket), state(kj::heap(this, kj::mv(isolateParam))) { ioHandler.connect(*this); } @@ -2563,10 +2563,8 @@ private: // the InspectorChannelImpl and the InspectorClient. class WebSocketIoHandler final { public: - WebSocketIoHandler( - ExecutorNotifierPair isolateThreadExecutorNotifierPair, kj::WebSocket& webSocket) - : isolateThreadExecutor(kj::mv(isolateThreadExecutorNotifierPair.executor)), - incomingQueueNotifier(kj::mv(isolateThreadExecutorNotifierPair.notifier)), + WebSocketIoHandler(kj::Own isolateThreadExecutor, kj::WebSocket& webSocket) + : isolateThreadExecutor(kj::mv(isolateThreadExecutor)), webSocket(webSocket) { // Assume we are being instantiated on the InspectorService thread, the thread that will do // I/O for CDP messages. Messages are delivered to the InspectorChannelImpl on the Isolate thread. @@ -2608,9 +2606,26 @@ private: // internal Cloudflare Workers runtime, `isolateThreadExecutor` may actually refer to the // current thread's `kj::Executor`. That's fine; calling `executeAsync()` on the current // thread's executor just posts the task to the event loop, and everything works as expected. 
- auto dispatchLoopPromise = - isolateThreadExecutor->executeAsync([this]() { return dispatchLoop(); }); - return receiveLoop().exclusiveJoin(kj::mv(dispatchLoopPromise)).exclusiveJoin(transmitLoop()); + + // Since the dispatch loop and the receive loop communicate over a XThreadNotifier, and + // XThreadNotifiers must be created on the thread which will call their `awaitNotification()` + // function, we awkwardly perform two `executeAsync()`s here, one to create the + // XThreadNotifier, then another to spawn the dispatch loop. + // + // We create a new XThreadNotifier for each `messagePump()` call, rather than try to re-use + // one long-term, because XThreadNotifiers' `awaitNotification()` function is not cancel-safe. + // That is, once its promise is cancelled, the notifier is broken. + auto incomingQueueNotifier = + co_await isolateThreadExecutor->executeAsync([]() { return XThreadNotifier::create(); }); + + auto dispatchLoopPromise = isolateThreadExecutor->executeAsync( + [this, notifier = kj::atomicAddRef(*incomingQueueNotifier)]() mutable { + return dispatchLoop(kj::mv(notifier)); + }); + + co_return co_await receiveLoop(kj::mv(incomingQueueNotifier)) + .exclusiveJoin(kj::mv(dispatchLoopPromise)) + .exclusiveJoin(transmitLoop()); } void send(kj::String message) { @@ -2652,7 +2667,7 @@ private: } // Must be called on the InspectorService thread. - kj::Promise receiveLoop() { + kj::Promise receiveLoop(kj::Own incomingQueueNotifier) { for (;;) { auto message = co_await webSocket.receive(MAX_MESSAGE_SIZE); KJ_SWITCH_ONEOF(message) { @@ -2675,7 +2690,7 @@ private: } // Must be called on the Isolate thread. - kj::Promise dispatchLoop() { + kj::Promise dispatchLoop(kj::Own incomingQueueNotifier) { for (;;) { co_await incomingQueueNotifier->awaitNotification(); KJ_IF_SOME(c, channel) { @@ -2717,8 +2732,8 @@ private: kj::Own isolateThreadExecutor; kj::MutexGuarded incomingQueue; - // This XThreadNotifier must be created on the Isolate thread. 
- kj::Own incomingQueueNotifier; + // The notifier for `incomingQueue`, `incomingQueueNotifier`, is created once per + // `messagePump()` call, and never re-used, so it doesn't live here. kj::MutexGuarded outgoingQueue; // This XThreadNotifier must be created on the InspectorService thread. @@ -2878,14 +2893,14 @@ kj::Promise Worker::Isolate::attachInspector(kj::Timer& timer, // This `attachInspector()` overload is used by the internal Cloudflare Workers runtime, which has // no concept of a single Isolate thread. Instead, it's okay for all inspector messages to be // dispatched on the calling thread. - auto executorNotifierPair = ExecutorNotifierPair{}; + auto executor = kj::getCurrentThreadExecutor().addRef(); - return attachInspector(kj::mv(executorNotifierPair), timer, timerOffset, *webSocket) + return attachInspector(kj::mv(executor), timer, timerOffset, *webSocket) .attach(kj::mv(webSocket)); } kj::Promise Worker::Isolate::attachInspector( - ExecutorNotifierPair isolateThreadExecutorNotifierPair, + kj::Own isolateThreadExecutor, kj::Timer& timer, kj::Duration timerOffset, kj::WebSocket& webSocket) const { @@ -2907,7 +2922,7 @@ kj::Promise Worker::Isolate::attachInspector( lockedSelf.impl->inspectorClient.setInspectorTimerInfo(timer, timerOffset); auto channel = kj::heap( - kj::atomicAddRef(*this), kj::mv(isolateThreadExecutorNotifierPair), webSocket); + kj::atomicAddRef(*this), kj::mv(isolateThreadExecutor), webSocket); lockedSelf.currentInspectorSession = *channel; lockedSelf.impl->inspectorClient.setChannel(*channel); diff --git a/src/workerd/io/worker.h b/src/workerd/io/worker.h index 9dd545b0d3d..2eddc4ed941 100644 --- a/src/workerd/io/worker.h +++ b/src/workerd/io/worker.h @@ -335,9 +335,9 @@ class Worker::Isolate: public kj::AtomicRefcounted { // Accepts a connection to the V8 inspector and handles requests until the client disconnects. 
// // This overload will dispatch all inspector messages on the `kj::Executor` passed in via - // `isolateThreadExecutorNotifierPair`. For CPU profiling to work as expected, this `kj::Executor` - // must be associated with the same thread which executes the Worker's JavaScript. - kj::Promise attachInspector(ExecutorNotifierPair isolateThreadExecutorNotifierPair, + // `isolateThreadExecutor`. For CPU profiling to work as expected, this `kj::Executor` must be + // associated with the same thread which executes the Worker's JavaScript. + kj::Promise attachInspector(kj::Own isolateThreadExecutor, kj::Timer& timer, kj::Duration timerOffset, kj::WebSocket& webSocket) const; diff --git a/src/workerd/server/server.c++ b/src/workerd/server/server.c++ index 30fe284f501..98f4431d6f0 100644 --- a/src/workerd/server/server.c++ +++ b/src/workerd/server/server.c++ @@ -1230,11 +1230,11 @@ private: // to define the inspector socket. class Server::InspectorService final: public kj::HttpService, public kj::HttpServerErrorHandler { public: - InspectorService(ExecutorNotifierPair isolateThreadExecutorNotifierPair, + InspectorService(kj::Own isolateThreadExecutor, kj::Timer& timer, kj::HttpHeaderTable::Builder& headerTableBuilder, InspectorServiceIsolateRegistrar& registrar) - : isolateThreadExecutorNotifierPair(kj::mv(isolateThreadExecutorNotifierPair)), + : isolateThreadExecutor(kj::mv(isolateThreadExecutor)), timer(timer), headerTable(headerTableBuilder.getFutureTable()), server(timer, headerTable, *this, kj::HttpServerSettings{.errorHandler = *this}), @@ -1291,7 +1291,7 @@ public: kj::Duration timerOffset = 0 * kj::MILLISECONDS; try { co_return co_await ref->attachInspector( - isolateThreadExecutorNotifierPair.clone(), timer, timerOffset, *webSocket); + isolateThreadExecutor->addRef(), timer, timerOffset, *webSocket); } catch (...) 
{ auto exception = kj::getCaughtExceptionAsKj(); if (exception.getType() == kj::Exception::Type::DISCONNECTED) { @@ -1410,7 +1410,7 @@ public: } private: - ExecutorNotifierPair isolateThreadExecutorNotifierPair; + kj::Own isolateThreadExecutor; kj::Timer& timer; kj::HttpHeaderTable& headerTable; kj::HashMap> isolates; @@ -3488,25 +3488,23 @@ uint startInspector( // `startInspector()` is called on the Isolate thread. V8 requires CPU profiling to be started and // stopped on the same thread which executes JavaScript -- that is, the Isolate thread -- which // means we need to dispatch inspector messages on this thread. To help make that happen, we - // capture this thread's kj::Executor and create an XThreadNotifier tied to this thread here, and - // pass it into the InspectorService below. Later, when the InspectorService receives a WebSocket - // connection, it calls `Isolate::attachInspector()`, which starts a dispatch loop on the - // kj::Executor we create here. The InspectorService reads subsequent WebSocket inspector messages - // and feeds them to that dispatch loop via the XThreadNotifier we create here. - auto isolateThreadExecutorNotifierPair = ExecutorNotifierPair{}; + // capture this thread's kj::Executor here, and pass it into the InspectorService below. Later, + // when the InspectorService receives a WebSocket connection, it calls + // `Isolate::attachInspector()`, which uses the kj::Executor we create here to create a + // XThreadNotifier and start a dispatch loop. The InspectorService reads subsequent WebSocket + // inspector messages and feeds them to that dispatch loop via the XThreadNotifier. + auto isolateThreadExecutor = kj::getCurrentThreadExecutor().addRef(); // Start the InspectorService thread. 
- kj::Thread thread( - [inspectorAddress, &inspectorPort, &registrar, - isolateThreadExecutorNotifierPair = kj::mv(isolateThreadExecutorNotifierPair)]() mutable { + kj::Thread thread([inspectorAddress, &inspectorPort, &registrar, + isolateThreadExecutor = kj::mv(isolateThreadExecutor)]() mutable { kj::AsyncIoContext io = kj::setupAsyncIo(); kj::HttpHeaderTable::Builder headerTableBuilder; // Create the special inspector service. - auto inspectorService( - kj::heap(kj::mv(isolateThreadExecutorNotifierPair), - io.provider->getTimer(), headerTableBuilder, registrar)); + auto inspectorService(kj::heap( + kj::mv(isolateThreadExecutor), io.provider->getTimer(), headerTableBuilder, registrar)); auto ownHeaderTable = headerTableBuilder.build(); // Configure and start the inspector socket.