Skip to content

Commit

Permalink
Merge pull request #2571 from cloudflare/harris/EW-8447-fix-cpu-profi…
Browse files Browse the repository at this point in the history
…ling-harder

EW-8447 Fix CPU profiling again
  • Loading branch information
harrishancock authored Aug 21, 2024
2 parents 8cf4628 + a992145 commit 7e856d7
Show file tree
Hide file tree
Showing 4 changed files with 170 additions and 79 deletions.
73 changes: 61 additions & 12 deletions src/workerd/io/worker.c++
Original file line number Diff line number Diff line change
Expand Up @@ -2311,8 +2311,10 @@ struct MessageQueue {

class Worker::Isolate::InspectorChannelImpl final: public v8_inspector::V8Inspector::Channel {
public:
InspectorChannelImpl(kj::Own<const Worker::Isolate> isolateParam, kj::WebSocket& webSocket)
: ioHandler(webSocket),
// Constructs the channel. `isolateThreadExecutor` and `webSocket` are passed through to
// `ioHandler`; `isolateParam` is moved into the heap-allocated `State`, which also receives a
// pointer to this channel.
InspectorChannelImpl(kj::Own<const Worker::Isolate> isolateParam,
kj::Own<const kj::Executor> isolateThreadExecutor,
kj::WebSocket& webSocket)
: ioHandler(kj::mv(isolateThreadExecutor), webSocket),
state(kj::heap<State>(this, kj::mv(isolateParam))) {
// Connect the I/O handler to this channel.
ioHandler.connect(*this);
}
Expand Down Expand Up @@ -2561,10 +2563,11 @@ private:
// the InspectorChannelImpl and the InspectorClient.
class WebSocketIoHandler final {
public:
WebSocketIoHandler(kj::WebSocket& webSocket): webSocket(webSocket) {
WebSocketIoHandler(kj::Own<const kj::Executor> isolateThreadExecutor, kj::WebSocket& webSocket)
: isolateThreadExecutor(kj::mv(isolateThreadExecutor)),
webSocket(webSocket) {
// Assume we are being instantiated on the InspectorService thread, the thread that will do
// I/O for CDP messages. Messages are delivered to the InspectorChannelImpl on the Isolate thread.
incomingQueueNotifier = XThreadNotifier::create();
outgoingQueueNotifier = XThreadNotifier::create();
}

Expand Down Expand Up @@ -2595,7 +2598,34 @@ private:
// Message pumping promise that should be evaluated on the InspectorService
// thread.
kj::Promise<void> messagePump() {
return receiveLoop().exclusiveJoin(dispatchLoop()).exclusiveJoin(transmitLoop());
// Although inspector I/O must happen on the InspectorService thread (to make sure breakpoints
// don't block inspector I/O), inspector messages must be actually dispatched on the Isolate
// thread. So, we run the dispatch loop on the Isolate thread.
//
// Note that the above comment is only really accurate in vanilla workerd. In the case of the
// internal Cloudflare Workers runtime, `isolateThreadExecutor` may actually refer to the
// current thread's `kj::Executor`. That's fine; calling `executeAsync()` on the current
// thread's executor just posts the task to the event loop, and everything works as expected.

// Since the dispatch loop and the receive loop communicate over a XThreadNotifier, and
// XThreadNotifiers must be created on the thread which will call their `awaitNotification()`
// function, we awkwardly perform two `executeAsync()`s here, one to create the
// XThreadNotifier, then another to spawn the dispatch loop.
//
// We create a new XThreadNotifier for each `messagePump()` call, rather than try to re-use
// one long-term, because XThreadNotifiers' `awaitNotification()` function is not cancel-safe.
// That is, once its promise is cancelled, the notifier is broken.
auto incomingQueueNotifier =
co_await isolateThreadExecutor->executeAsync([]() { return XThreadNotifier::create(); });

auto dispatchLoopPromise = isolateThreadExecutor->executeAsync(
[this, notifier = kj::atomicAddRef(*incomingQueueNotifier)]() mutable {
return dispatchLoop(kj::mv(notifier));
});

co_return co_await receiveLoop(kj::mv(incomingQueueNotifier))
.exclusiveJoin(kj::mv(dispatchLoopPromise))
.exclusiveJoin(transmitLoop());
}

void send(kj::String message) {
Expand Down Expand Up @@ -2636,7 +2666,8 @@ private:
outgoingQueueNotifier->notify();
}

kj::Promise<void> receiveLoop() {
// Must be called on the InspectorService thread.
kj::Promise<void> receiveLoop(kj::Own<XThreadNotifier> incomingQueueNotifier) {
for (;;) {
auto message = co_await webSocket.receive(MAX_MESSAGE_SIZE);
KJ_SWITCH_ONEOF(message) {
Expand All @@ -2658,7 +2689,8 @@ private:
}
}

kj::Promise<void> dispatchLoop() {
// Must be called on the Isolate thread.
kj::Promise<void> dispatchLoop(kj::Own<XThreadNotifier> incomingQueueNotifier) {
for (;;) {
co_await incomingQueueNotifier->awaitNotification();
KJ_IF_SOME(c, channel) {
Expand All @@ -2667,6 +2699,7 @@ private:
}
}

// Must be called on the InspectorService thread.
kj::Promise<void> transmitLoop() {
for (;;) {
co_await outgoingQueueNotifier->awaitNotification();
Expand All @@ -2693,10 +2726,17 @@ private:
}
}

// We need access to the Isolate thread's kj::Executor to run the inspector dispatch loop. This
// doesn't actually have to be an Own, because the Isolate thread will destroy the Isolate
// before it exits, but it doesn't hurt.
kj::Own<const kj::Executor> isolateThreadExecutor;

kj::MutexGuarded<MessageQueue> incomingQueue;
kj::Own<XThreadNotifier> incomingQueueNotifier;
// The notifier for `incomingQueue`, `incomingQueueNotifier`, is created once per
// `messagePump()` call, and never re-used, so it doesn't live here.

kj::MutexGuarded<MessageQueue> outgoingQueue;
// This XThreadNotifier must be created on the InspectorService thread.
kj::Own<XThreadNotifier> outgoingQueueNotifier;

kj::WebSocket& webSocket; // only accessed on the InspectorService thread.
Expand Down Expand Up @@ -2850,11 +2890,20 @@ kj::Promise<void> Worker::Isolate::attachInspector(kj::Timer& timer,
headers.set(controlHeaderId, "{\"ewLog\":{\"status\":\"ok\"}}");
auto webSocket = response.acceptWebSocket(headers);

return attachInspector(timer, timerOffset, *webSocket).attach(kj::mv(webSocket));
// This `attachInspector()` overload is used by the internal Cloudflare Workers runtime, which has
// no concept of a single Isolate thread. Instead, it's okay for all inspector messages to be
// dispatched on the calling thread.
auto executor = kj::getCurrentThreadExecutor().addRef();

return attachInspector(kj::mv(executor), timer, timerOffset, *webSocket)
.attach(kj::mv(webSocket));
}

kj::Promise<void> Worker::Isolate::attachInspector(
kj::Timer& timer, kj::Duration timerOffset, kj::WebSocket& webSocket) const {
kj::Own<const kj::Executor> isolateThreadExecutor,
kj::Timer& timer,
kj::Duration timerOffset,
kj::WebSocket& webSocket) const {
KJ_REQUIRE(impl->inspector != kj::none);

return jsg::runInV8Stack([&](jsg::V8StackScope& stackScope) {
Expand All @@ -2872,8 +2921,8 @@ kj::Promise<void> Worker::Isolate::attachInspector(

lockedSelf.impl->inspectorClient.setInspectorTimerInfo(timer, timerOffset);

auto channel =
kj::heap<Worker::Isolate::InspectorChannelImpl>(kj::atomicAddRef(*this), webSocket);
auto channel = kj::heap<Worker::Isolate::InspectorChannelImpl>(
kj::atomicAddRef(*this), kj::mv(isolateThreadExecutor), webSocket);
lockedSelf.currentInspectorSession = *channel;
lockedSelf.impl->inspectorClient.setChannel(*channel);

Expand Down
22 changes: 20 additions & 2 deletions src/workerd/io/worker.h
Original file line number Diff line number Diff line change
Expand Up @@ -315,14 +315,32 @@ class Worker::Isolate: public kj::AtomicRefcounted {
uint getLockSuccessCount() const;

// Accepts a connection to the V8 inspector and handles requests until the client disconnects.
// Also adds a special JSON value to the header identified by `controlHeaderId`, for compatibility
// with internal Cloudflare systems.
//
// This overload will dispatch all inspector messages on the _calling thread's_ `kj::Executor`.
// When linked against vanilla V8, this means that CPU profiling will only profile JavaScript
// running on the _calling thread_, which will most likely only be inspector console commands, and
// is not typically desired.
//
// For the above reason, this overload is currently only suitable for use by the internal Workers
// Runtime codebase, which patches V8 to profile whichever thread currently holds the `v8::Locker`
// for this Isolate.
kj::Promise<void> attachInspector(kj::Timer& timer,
kj::Duration timerOffset,
kj::HttpService::Response& response,
const kj::HttpHeaderTable& headerTable,
kj::HttpHeaderId controlHeaderId) const;

kj::Promise<void> attachInspector(
kj::Timer& timer, kj::Duration timerOffset, kj::WebSocket& webSocket) const;
// Accepts a connection to the V8 inspector and handles requests until the client disconnects.
//
// This overload will dispatch all inspector messages on the `kj::Executor` passed in via
// `isolateThreadExecutor`. For CPU profiling to work as expected, this `kj::Executor` must be
// associated with the same thread which executes the Worker's JavaScript.
kj::Promise<void> attachInspector(kj::Own<const kj::Executor> isolateThreadExecutor,
kj::Timer& timer,
kj::Duration timerOffset,
kj::WebSocket& webSocket) const;

// Log a warning to the inspector if attached, and log an INFO severity message.
void logWarning(kj::StringPtr description, Worker::Lock& lock);
Expand Down
28 changes: 22 additions & 6 deletions src/workerd/server/server.c++
Original file line number Diff line number Diff line change
Expand Up @@ -1230,10 +1230,12 @@ private:
// to define the inspector socket.
class Server::InspectorService final: public kj::HttpService, public kj::HttpServerErrorHandler {
public:
InspectorService(kj::Timer& timer,
InspectorService(kj::Own<const kj::Executor> isolateThreadExecutor,
kj::Timer& timer,
kj::HttpHeaderTable::Builder& headerTableBuilder,
InspectorServiceIsolateRegistrar& registrar)
: timer(timer),
: isolateThreadExecutor(kj::mv(isolateThreadExecutor)),
timer(timer),
headerTable(headerTableBuilder.getFutureTable()),
server(timer, headerTable, *this, kj::HttpServerSettings{.errorHandler = *this}),
registrar(registrar) {
Expand Down Expand Up @@ -1288,7 +1290,8 @@ public:
auto webSocket = response.acceptWebSocket(responseHeaders);
kj::Duration timerOffset = 0 * kj::MILLISECONDS;
try {
co_return co_await ref->attachInspector(timer, timerOffset, *webSocket);
co_return co_await ref->attachInspector(
isolateThreadExecutor->addRef(), timer, timerOffset, *webSocket);
} catch (...) {
auto exception = kj::getCaughtExceptionAsKj();
if (exception.getType() == kj::Exception::Type::DISCONNECTED) {
Expand Down Expand Up @@ -1407,6 +1410,7 @@ public:
}

private:
kj::Own<const kj::Executor> isolateThreadExecutor;
kj::Timer& timer;
kj::HttpHeaderTable& headerTable;
kj::HashMap<kj::String, kj::Own<const Worker::Isolate::WeakIsolateRef>> isolates;
Expand Down Expand Up @@ -3481,14 +3485,26 @@ uint startInspector(
static constexpr uint DEFAULT_PORT = 9229;
kj::MutexGuarded<uint> inspectorPort(UNASSIGNED_PORT);

kj::Thread thread([inspectorAddress, &inspectorPort, &registrar]() {
// `startInspector()` is called on the Isolate thread. V8 requires CPU profiling to be started and
// stopped on the same thread which executes JavaScript -- that is, the Isolate thread -- which
// means we need to dispatch inspector messages on this thread. To help make that happen, we
// capture this thread's kj::Executor here, and pass it into the InspectorService below. Later,
// when the InspectorService receives a WebSocket connection, it calls
// `Isolate::attachInspector()`, which uses the kj::Executor we create here to create a
// XThreadNotifier and start a dispatch loop. The InspectorService reads subsequent WebSocket
// inspector messages and feeds them to that dispatch loop via the XThreadNotifier.
auto isolateThreadExecutor = kj::getCurrentThreadExecutor().addRef();

// Start the InspectorService thread.
kj::Thread thread([inspectorAddress, &inspectorPort, &registrar,
isolateThreadExecutor = kj::mv(isolateThreadExecutor)]() mutable {
kj::AsyncIoContext io = kj::setupAsyncIo();

kj::HttpHeaderTable::Builder headerTableBuilder;

// Create the special inspector service.
auto inspectorService(
kj::heap<Server::InspectorService>(io.provider->getTimer(), headerTableBuilder, registrar));
auto inspectorService(kj::heap<Server::InspectorService>(
kj::mv(isolateThreadExecutor), io.provider->getTimer(), headerTableBuilder, registrar));
auto ownHeaderTable = headerTableBuilder.build();

// Configure and start the inspector socket.
Expand Down
126 changes: 67 additions & 59 deletions src/workerd/server/tests/inspector/driver.mjs
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,11 @@ beforeEach(async () => {
});

await workerd.start();

// We wait for the worker's HTTP port to come online before starting the test case. If we don't,
// and the inspector port comes online first, there's a chance the inspector connection will fail
// with 404 because the isolate doesn't exist yet.
await workerd.getListenPort('http');
});

// Stop workerd.
Expand All @@ -49,68 +54,71 @@ async function connectInspector(port) {
});
}

// TODO(soon): This test reproduces a null pointer dereference in workerd (possibly the same issue
// as https://github.com/cloudflare/workerd/issues/2564), but the test doesn't fail. :(
test('Can repeatedly connect and disconnect to the inspector port', async () => {
for (let i = 0; i < 5; ++i) {
let inspectorClient = await connectInspector(
await workerd.getListenInspectorPort()
async function profileAndExpectDeriveBitsFrames(inspectorClient) {
// Enable and start profiling.
await inspectorClient.Profiler.enable();
await inspectorClient.Profiler.start();

// Drive the worker with a test request. A single one is sufficient.
let httpPort = await workerd.getListenPort('http');
const response = await fetch(`http://localhost:${httpPort}`);
await response.arrayBuffer();

// Stop and disable profiling.
const profile = await inspectorClient.Profiler.stop();
await inspectorClient.Profiler.disable();

// Figure out which function name was most frequently sampled.
let hitCountMap = new Map();

for (let node of profile.profile.nodes) {
if (hitCountMap.get(node.callFrame.functionName) === undefined) {
hitCountMap.set(node.callFrame.functionName, 0);
}
hitCountMap.set(
node.callFrame.functionName,
hitCountMap.get(node.callFrame.functionName) + node.hitCount
);
}

// Drive the worker with a test request.
let httpPort = await workerd.getListenPort('http');
const response = await fetch(`http://localhost:${httpPort}`);
let body = await response.arrayBuffer();
console.log(body);
let max = {
name: null,
count: 0,
};

await inspectorClient.close();
for (let [name, count] of hitCountMap) {
if (count > max.count) {
max.name = name;
max.count = count;
}
}

// The most CPU-intensive function our test script runs is `deriveBits()`, so we expect that to be
// the most frequently sampled function.
assert.equal(max.name, 'deriveBits');
assert.notEqual(max.count, 0);
}

// Regression test for https://github.com/cloudflare/workerd/issues/1754.
//
// At one time, workerd produced only "(program)" frames.
test('Profiler mostly sees deriveBits() frames', async () => {
  // Look up the inspector port, then open a client connection to it.
  const inspectorPort = await workerd.getListenInspectorPort();
  const client = await connectInspector(inspectorPort);

  // Run one profiling pass and check which function dominated the samples.
  await profileAndExpectDeriveBitsFrames(client);

  await client.close();
});

// TODO(soon): Re-enable once https://github.com/cloudflare/workerd/issues/2564 is solved.
// test("Profiler mostly sees deriveBits() frames", async () => {
// let inspectorClient = await connectInspector(await workerd.getListenInspectorPort());

// // Enable and start profiling.
// await inspectorClient.Profiler.enable();
// await inspectorClient.Profiler.start();

// // Drive the worker with a test request. A single one is sufficient.
// let httpPort = await workerd.getListenPort("http");
// const response = await fetch(`http://localhost:${httpPort}`);
// await response.arrayBuffer();

// // Stop and disable profiling.
// const profile = await inspectorClient.Profiler.stop();
// await inspectorClient.Profiler.disable();

// // Figure out which function name was most frequently sampled.
// let hitCountMap = new Map();

// for (let node of profile.profile.nodes) {
// if (hitCountMap.get(node.callFrame.functionName) === undefined) {
// hitCountMap.set(node.callFrame.functionName, 0);
// }
// hitCountMap.set(node.callFrame.functionName,
// hitCountMap.get(node.callFrame.functionName) + node.hitCount);
// }

// let max = {
// name: null,
// count: 0,
// };

// for (let [name, count] of hitCountMap) {
// if (count > max.count) {
// max.name = name;
// max.count = count;
// }
// }

// // The most CPU-intensive function our test script runs is `deriveBits()`, so we expect that to be
// // the most frequently sampled function.
// assert.equal(max.name, "deriveBits");
// assert.notEqual(max.count, 0);

// await inspectorClient.close();
// });
// Regression test for https://github.com/cloudflare/workerd/issues/2564.
//
// At one time, workerd segfaulted on the second inspector connection.
test('Can repeatedly reconnect the inspector and profiling still works', async () => {
  // Each round opens a fresh inspector connection, profiles once, and disconnects.
  const rounds = 4;
  for (let round = 0; round < rounds; round += 1) {
    const inspectorPort = await workerd.getListenInspectorPort();
    const client = await connectInspector(inspectorPort);
    await profileAndExpectDeriveBitsFrames(client);
    await client.close();
  }
});

0 comments on commit 7e856d7

Please sign in to comment.