diff --git a/src/workerd/io/worker.c++ b/src/workerd/io/worker.c++ index 34e96ce980ab..efc4d99c6d81 100644 --- a/src/workerd/io/worker.c++ +++ b/src/workerd/io/worker.c++ @@ -2312,9 +2312,9 @@ struct MessageQueue { class Worker::Isolate::InspectorChannelImpl final: public v8_inspector::V8Inspector::Channel { public: InspectorChannelImpl(kj::Own isolateParam, - ExecutorNotifierPair isolateThreadExecutorNotifierPair, + kj::Own isolateThreadExecutor, kj::WebSocket& webSocket) - : ioHandler(kj::mv(isolateThreadExecutorNotifierPair), webSocket), + : ioHandler(kj::mv(isolateThreadExecutor), webSocket), state(kj::heap(this, kj::mv(isolateParam))) { ioHandler.connect(*this); } @@ -2563,10 +2563,8 @@ private: // the InspectorChannelImpl and the InspectorClient. class WebSocketIoHandler final { public: - WebSocketIoHandler( - ExecutorNotifierPair isolateThreadExecutorNotifierPair, kj::WebSocket& webSocket) - : isolateThreadExecutor(kj::mv(isolateThreadExecutorNotifierPair.executor)), - incomingQueueNotifier(kj::mv(isolateThreadExecutorNotifierPair.notifier)), + WebSocketIoHandler(kj::Own isolateThreadExecutor, kj::WebSocket& webSocket) + : isolateThreadExecutor(kj::mv(isolateThreadExecutor)), webSocket(webSocket) { // Assume we are being instantiated on the InspectorService thread, the thread that will do // I/O for CDP messages. Messages are delivered to the InspectorChannelImpl on the Isolate thread. @@ -2608,9 +2606,26 @@ private: // internal Cloudflare Workers runtime, `isolateThreadExecutor` may actually refer to the // current thread's `kj::Executor`. That's fine; calling `executeAsync()` on the current // thread's executor just posts the task to the event loop, and everything works as expected. - auto dispatchLoopPromise = - isolateThreadExecutor->executeAsync([this]() { return dispatchLoop(); }); - return receiveLoop().exclusiveJoin(kj::mv(dispatchLoopPromise)).exclusiveJoin(transmitLoop()); + + // Since the dispatch loop and the receive loop communicate over a XThreadNotifier, and + // XThreadNotifiers must be created on the thread which will call their `awaitNotification()` + // function, we awkwardly perform two `executeAsync()`s here, one to create the + // XThreadNotifier, then another to spawn the dispatch loop. + // + // We create a new XThreadNotifier for each `messagePump()` call, rather than try to re-use + // one long-term, because XThreadNotifiers' `awaitNotification()` function is not cancel-safe. + // That is, once its promise is cancelled, the notifier is broken. + auto incomingQueueNotifier = + co_await isolateThreadExecutor->executeAsync([]() { return XThreadNotifier::create(); }); + + auto dispatchLoopPromise = isolateThreadExecutor->executeAsync( + [this, notifier = kj::atomicAddRef(*incomingQueueNotifier)]() mutable { + return dispatchLoop(kj::mv(notifier)); + }); + + co_return co_await receiveLoop(kj::mv(incomingQueueNotifier)) + .exclusiveJoin(kj::mv(dispatchLoopPromise)) + .exclusiveJoin(transmitLoop()); } void send(kj::String message) { @@ -2652,7 +2667,7 @@ private: } // Must be called on the InspectorService thread. - kj::Promise receiveLoop() { + kj::Promise receiveLoop(kj::Own incomingQueueNotifier) { for (;;) { auto message = co_await webSocket.receive(MAX_MESSAGE_SIZE); KJ_SWITCH_ONEOF(message) { @@ -2675,7 +2690,7 @@ private: } // Must be called on the Isolate thread. - kj::Promise dispatchLoop() { + kj::Promise dispatchLoop(kj::Own incomingQueueNotifier) { for (;;) { co_await incomingQueueNotifier->awaitNotification(); KJ_IF_SOME(c, channel) { @@ -2717,8 +2732,8 @@ private: kj::Own isolateThreadExecutor; kj::MutexGuarded incomingQueue; - // This XThreadNotifier must be created on the Isolate thread. - kj::Own incomingQueueNotifier; + // The notifier for `incomingQueue`, `incomingQueueNotifier`, is created once per + // `messagePump()` call, and never re-used, so it doesn't live here. kj::MutexGuarded outgoingQueue; // This XThreadNotifier must be created on the InspectorService thread. @@ -2878,14 +2893,14 @@ kj::Promise Worker::Isolate::attachInspector(kj::Timer& timer, // This `attachInspector()` overload is used by the internal Cloudflare Workers runtime, which has // no concept of a single Isolate thread. Instead, it's okay for all inspector messages to be // dispatched on the calling thread. - auto executorNotifierPair = ExecutorNotifierPair{}; + auto executor = kj::getCurrentThreadExecutor().addRef(); - return attachInspector(kj::mv(executorNotifierPair), timer, timerOffset, *webSocket) + return attachInspector(kj::mv(executor), timer, timerOffset, *webSocket) .attach(kj::mv(webSocket)); } kj::Promise Worker::Isolate::attachInspector( - ExecutorNotifierPair isolateThreadExecutorNotifierPair, + kj::Own isolateThreadExecutor, kj::Timer& timer, kj::Duration timerOffset, kj::WebSocket& webSocket) const { @@ -2907,7 +2922,7 @@ kj::Promise Worker::Isolate::attachInspector( lockedSelf.impl->inspectorClient.setInspectorTimerInfo(timer, timerOffset); auto channel = kj::heap( - kj::atomicAddRef(*this), kj::mv(isolateThreadExecutorNotifierPair), webSocket); + kj::atomicAddRef(*this), kj::mv(isolateThreadExecutor), webSocket); lockedSelf.currentInspectorSession = *channel; lockedSelf.impl->inspectorClient.setChannel(*channel); diff --git a/src/workerd/io/worker.h b/src/workerd/io/worker.h index 9dd545b0d3d5..2eddc4ed9415 100644 --- a/src/workerd/io/worker.h +++ b/src/workerd/io/worker.h @@ -335,9 +335,9 @@ class Worker::Isolate: public kj::AtomicRefcounted { // Accepts a connection to the V8 inspector and handles requests until the client disconnects. // // This overload will dispatch all inspector messages on the `kj::Executor` passed in via - // `isolateThreadExecutorNotifierPair`. For CPU profiling to work as expected, this `kj::Executor` - // must be associated with the same thread which executes the Worker's JavaScript. - kj::Promise attachInspector(ExecutorNotifierPair isolateThreadExecutorNotifierPair, + // `isolateThreadExecutor`. For CPU profiling to work as expected, this `kj::Executor` must be + // associated with the same thread which executes the Worker's JavaScript. + kj::Promise attachInspector(kj::Own isolateThreadExecutor, kj::Timer& timer, kj::Duration timerOffset, kj::WebSocket& webSocket) const; diff --git a/src/workerd/server/server.c++ b/src/workerd/server/server.c++ index 30fe284f501a..98f4431d6f01 100644 --- a/src/workerd/server/server.c++ +++ b/src/workerd/server/server.c++ @@ -1230,11 +1230,11 @@ private: // to define the inspector socket. class Server::InspectorService final: public kj::HttpService, public kj::HttpServerErrorHandler { public: - InspectorService(ExecutorNotifierPair isolateThreadExecutorNotifierPair, + InspectorService(kj::Own isolateThreadExecutor, kj::Timer& timer, kj::HttpHeaderTable::Builder& headerTableBuilder, InspectorServiceIsolateRegistrar& registrar) - : isolateThreadExecutorNotifierPair(kj::mv(isolateThreadExecutorNotifierPair)), + : isolateThreadExecutor(kj::mv(isolateThreadExecutor)), timer(timer), headerTable(headerTableBuilder.getFutureTable()), server(timer, headerTable, *this, kj::HttpServerSettings{.errorHandler = *this}), @@ -1291,7 +1291,7 @@ public: kj::Duration timerOffset = 0 * kj::MILLISECONDS; try { co_return co_await ref->attachInspector( - isolateThreadExecutorNotifierPair.clone(), timer, timerOffset, *webSocket); + isolateThreadExecutor->addRef(), timer, timerOffset, *webSocket); } catch (...) { auto exception = kj::getCaughtExceptionAsKj(); if (exception.getType() == kj::Exception::Type::DISCONNECTED) { @@ -1410,7 +1410,7 @@ public: } private: - ExecutorNotifierPair isolateThreadExecutorNotifierPair; + kj::Own isolateThreadExecutor; kj::Timer& timer; kj::HttpHeaderTable& headerTable; kj::HashMap> isolates; @@ -3488,25 +3488,23 @@ uint startInspector( // `startInspector()` is called on the Isolate thread. V8 requires CPU profiling to be started and // stopped on the same thread which executes JavaScript -- that is, the Isolate thread -- which // means we need to dispatch inspector messages on this thread. To help make that happen, we - // capture this thread's kj::Executor and create an XThreadNotifier tied to this thread here, and - // pass it into the InspectorService below. Later, when the InspectorService receives a WebSocket - // connection, it calls `Isolate::attachInspector()`, which starts a dispatch loop on the - // kj::Executor we create here. The InspectorService reads subsequent WebSocket inspector messages - // and feeds them to that dispatch loop via the XThreadNotifier we create here. - auto isolateThreadExecutorNotifierPair = ExecutorNotifierPair{}; + // capture this thread's kj::Executor here, and pass it into the InspectorService below. Later, + // when the InspectorService receives a WebSocket connection, it calls + // `Isolate::attachInspector()`, which uses the kj::Executor we create here to create a + // XThreadNotifier and start a dispatch loop. The InspectorService reads subsequent WebSocket + // inspector messages and feeds them to that dispatch loop via the XThreadNotifier. + auto isolateThreadExecutor = kj::getCurrentThreadExecutor().addRef(); // Start the InspectorService thread. - kj::Thread thread( - [inspectorAddress, &inspectorPort, ®istrar, - isolateThreadExecutorNotifierPair = kj::mv(isolateThreadExecutorNotifierPair)]() mutable { + kj::Thread thread([inspectorAddress, &inspectorPort, ®istrar, + isolateThreadExecutor = kj::mv(isolateThreadExecutor)]() mutable { kj::AsyncIoContext io = kj::setupAsyncIo(); kj::HttpHeaderTable::Builder headerTableBuilder; // Create the special inspector service. - auto inspectorService( - kj::heap(kj::mv(isolateThreadExecutorNotifierPair), - io.provider->getTimer(), headerTableBuilder, registrar)); + auto inspectorService(kj::heap( + kj::mv(isolateThreadExecutor), io.provider->getTimer(), headerTableBuilder, registrar)); auto ownHeaderTable = headerTableBuilder.build(); // Configure and start the inspector socket.