Skip to content

Commit

Permalink
Merge pull request #2565 from cloudflare/harris/revert-cpu-profiling-fix
Browse files Browse the repository at this point in the history
  • Loading branch information
jasnell authored Aug 20, 2024
2 parents b22fdb2 + e8e9829 commit a142944
Show file tree
Hide file tree
Showing 5 changed files with 85 additions and 154 deletions.
52 changes: 8 additions & 44 deletions src/workerd/io/worker.c++
Original file line number Diff line number Diff line change
Expand Up @@ -2339,11 +2339,8 @@ struct MessageQueue {

class Worker::Isolate::InspectorChannelImpl final: public v8_inspector::V8Inspector::Channel {
public:
InspectorChannelImpl(kj::Own<const Worker::Isolate> isolateParam,
ExecutorNotifierPair isolateThreadExecutorNotifierPair,
kj::WebSocket& webSocket)
: ioHandler(kj::mv(isolateThreadExecutorNotifierPair), webSocket),
state(kj::heap<State>(this, kj::mv(isolateParam))) {
InspectorChannelImpl(kj::Own<const Worker::Isolate> isolateParam, kj::WebSocket& webSocket)
: ioHandler(webSocket), state(kj::heap<State>(this, kj::mv(isolateParam))) {
ioHandler.connect(*this);
}

Expand Down Expand Up @@ -2596,12 +2593,11 @@ private:
// the InspectorChannelImpl and the InspectorClient.
class WebSocketIoHandler final {
public:
WebSocketIoHandler(ExecutorNotifierPair isolateThreadExecutorNotifierPair, kj::WebSocket& webSocket)
: isolateThreadExecutor(kj::mv(isolateThreadExecutorNotifierPair.executor)),
incomingQueueNotifier(kj::mv(isolateThreadExecutorNotifierPair.notifier)),
webSocket(webSocket) {
WebSocketIoHandler(kj::WebSocket& webSocket)
: webSocket(webSocket) {
// Assume we are being instantiated on the InspectorService thread, the thread that will do
// I/O for CDP messages. Messages are delivered to the InspectorChannelImpl on the Isolate thread.
incomingQueueNotifier = XThreadNotifier::create();
outgoingQueueNotifier = XThreadNotifier::create();
}

Expand Down Expand Up @@ -2634,20 +2630,7 @@ private:
// Message pumping promise that should be evaluated on the InspectorService
// thread.
kj::Promise<void> messagePump() {
// Although inspector I/O must happen on the InspectorService thread (to make sure breakpoints
// don't block inspector I/O), inspector messages must be actually dispatched on the Isolate
// thread. So, we run the dispatch loop on the Isolate thread.
//
// Note that the above comment is only really accurate in vanilla workerd. In the case of the
// internal Cloudflare Workers runtime, `isolateThreadExecutor` may actually refer to the
// current thread's `kj::Executor`. That's fine; calling `executeAsync()` on the current
// thread's executor just posts the task to the event loop, and everything works as expected.
auto dispatchLoopPromise = isolateThreadExecutor->executeAsync([this]() {
return dispatchLoop();
});
return receiveLoop()
.exclusiveJoin(kj::mv(dispatchLoopPromise))
.exclusiveJoin(transmitLoop());
return receiveLoop().exclusiveJoin(dispatchLoop()).exclusiveJoin(transmitLoop());
}

void send(kj::String message) {
Expand Down Expand Up @@ -2688,7 +2671,6 @@ private:
outgoingQueueNotifier->notify();
}

// Must be called on the InspectorService thread.
kj::Promise<void> receiveLoop() {
for (;;) {
auto message = co_await webSocket.receive(MAX_MESSAGE_SIZE);
Expand All @@ -2711,7 +2693,6 @@ private:
}
}

// Must be called on the Isolate thread.
kj::Promise<void> dispatchLoop() {
for (;;) {
co_await incomingQueueNotifier->awaitNotification();
Expand All @@ -2721,7 +2702,6 @@ private:
}
}

// Must be called on the InspectorService thread.
kj::Promise<void> transmitLoop() {
for (;;) {
co_await outgoingQueueNotifier->awaitNotification();
Expand All @@ -2748,17 +2728,10 @@ private:
}
}

// We need access to the Isolate thread's kj::Executor to run the inspector dispatch loop. This
// doesn't actually have to be an Own, because the Isolate thread will destroy the Isolate
// before it exits, but it doesn't hurt.
kj::Own<const kj::Executor> isolateThreadExecutor;

kj::MutexGuarded<MessageQueue> incomingQueue;
// This XThreadNotifier must be created on the Isolate thread.
kj::Own<XThreadNotifier> incomingQueueNotifier;

kj::MutexGuarded<MessageQueue> outgoingQueue;
// This XThreadNotifier must be created on the InspectorService thread.
kj::Own<XThreadNotifier> outgoingQueueNotifier;

kj::WebSocket& webSocket; // only accessed on the InspectorService thread.
Expand Down Expand Up @@ -2908,17 +2881,11 @@ kj::Promise<void> Worker::Isolate::attachInspector(
headers.set(controlHeaderId, "{\"ewLog\":{\"status\":\"ok\"}}");
auto webSocket = response.acceptWebSocket(headers);

// This `attachInspector()` overload is used by the internal Cloudflare Workers runtime, which has
// no concept of a single Isolate thread. Instead, it's okay for all inspector messages to be
// dispatched on the calling thread.
auto executorNotifierPair = ExecutorNotifierPair{};

return attachInspector(kj::mv(executorNotifierPair), timer, timerOffset, *webSocket)
return attachInspector(timer, timerOffset, *webSocket)
.attach(kj::mv(webSocket));
}

kj::Promise<void> Worker::Isolate::attachInspector(
ExecutorNotifierPair isolateThreadExecutorNotifierPair,
kj::Timer& timer,
kj::Duration timerOffset,
kj::WebSocket& webSocket) const {
Expand All @@ -2939,10 +2906,7 @@ kj::Promise<void> Worker::Isolate::attachInspector(

lockedSelf.impl->inspectorClient.setInspectorTimerInfo(timer, timerOffset);

auto channel = kj::heap<Worker::Isolate::InspectorChannelImpl>(
kj::atomicAddRef(*this),
kj::mv(isolateThreadExecutorNotifierPair),
webSocket);
auto channel = kj::heap<Worker::Isolate::InspectorChannelImpl>(kj::atomicAddRef(*this), webSocket);
lockedSelf.currentInspectorSession = *channel;
lockedSelf.impl->inspectorClient.setChannel(*channel);

Expand Down
17 changes: 0 additions & 17 deletions src/workerd/io/worker.h
Original file line number Diff line number Diff line change
Expand Up @@ -288,31 +288,14 @@ class Worker::Isolate: public kj::AtomicRefcounted {
uint getLockSuccessCount() const;

// Accepts a connection to the V8 inspector and handles requests until the client disconnects.
// Also adds a special JSON value to the header identified by `controlHeaderId`, for compatibility
// with internal Cloudflare systems.
//
// This overload will dispatch all inspector messages on the _calling thread's_ `kj::Executor`.
// When linked against vanilla V8, this means that CPU profiling will only profile JavaScript
// running on the _calling thread_, which will most likely only be inspector console commands, and
// is not typically desired.
//
// For the above reason, this overload is currently only suitable for use by the internal Workers
// Runtime codebase, which patches V8 to profile whichever thread currently holds the `v8::Locker`
// for this Isolate.
kj::Promise<void> attachInspector(
kj::Timer& timer,
kj::Duration timerOffset,
kj::HttpService::Response& response,
const kj::HttpHeaderTable& headerTable,
kj::HttpHeaderId controlHeaderId) const;

// Accepts a connection to the V8 inspector and handles requests until the client disconnects.
//
// This overload will dispatch all inspector messages on the `kj::Executor` passed in via
// `isolateThreadExecutorNotifierPair`. For CPU profiling to work as expected, this `kj::Executor`
// must be associated with the same thread which executes the Worker's JavaScript.
kj::Promise<void> attachInspector(
ExecutorNotifierPair isolateThreadExecutorNotifierPair,
kj::Timer& timer,
kj::Duration timerOffset,
kj::WebSocket& webSocket) const;
Expand Down
28 changes: 4 additions & 24 deletions src/workerd/server/server.c++
Original file line number Diff line number Diff line change
Expand Up @@ -1165,12 +1165,10 @@ private:
class Server::InspectorService final: public kj::HttpService, public kj::HttpServerErrorHandler {
public:
InspectorService(
ExecutorNotifierPair isolateThreadExecutorNotifierPair,
kj::Timer& timer,
kj::HttpHeaderTable::Builder& headerTableBuilder,
InspectorServiceIsolateRegistrar& registrar)
: isolateThreadExecutorNotifierPair(kj::mv(isolateThreadExecutorNotifierPair)),
timer(timer),
: timer(timer),
headerTable(headerTableBuilder.getFutureTable()),
server(timer, headerTable, *this, kj::HttpServerSettings {
.errorHandler = *this
Expand Down Expand Up @@ -1228,8 +1226,7 @@ public:
auto webSocket = response.acceptWebSocket(responseHeaders);
kj::Duration timerOffset = 0 * kj::MILLISECONDS;
try {
co_return co_await ref->attachInspector(
isolateThreadExecutorNotifierPair.clone(), timer, timerOffset, *webSocket);
co_return co_await ref->attachInspector(timer, timerOffset, *webSocket);
} catch (...) {
auto exception = kj::getCaughtExceptionAsKj();
if (exception.getType() == kj::Exception::Type::DISCONNECTED) {
Expand Down Expand Up @@ -1347,7 +1344,6 @@ public:
}

private:
ExecutorNotifierPair isolateThreadExecutorNotifierPair;
kj::Timer& timer;
kj::HttpHeaderTable& headerTable;
kj::HashMap<kj::String, kj::Own<const Worker::Isolate::WeakIsolateRef>> isolates;
Expand Down Expand Up @@ -3441,30 +3437,14 @@ uint startInspector(kj::StringPtr inspectorAddress,
static constexpr uint DEFAULT_PORT = 9229;
kj::MutexGuarded<uint> inspectorPort(UNASSIGNED_PORT);

// `startInspector()` is called on the Isolate thread. V8 requires CPU profiling to be started and
// stopped on the same thread which executes JavaScript -- that is, the Isolate thread -- which
// means we need to dispatch inspector messages on this thread. To help make that happen, we
// capture this thread's kj::Executor and create an XThreadNotifier tied to this thread here, and
// pass it into the InspectorService below. Later, when the InspectorService receives a WebSocket
// connection, it calls `Isolate::attachInspector()`, which starts a dispatch loop on the
// kj::Executor we create here. The InspectorService reads subsequent WebSocket inspector messages
// and feeds them to that dispatch loop via the XThreadNotifier we create here.
auto isolateThreadExecutorNotifierPair = ExecutorNotifierPair{};

// Start the InspectorService thread.
kj::Thread thread([inspectorAddress, &inspectorPort, &registrar,
isolateThreadExecutorNotifierPair = kj::mv(isolateThreadExecutorNotifierPair)]() mutable {
kj::Thread thread([inspectorAddress, &inspectorPort, &registrar](){
kj::AsyncIoContext io = kj::setupAsyncIo();

kj::HttpHeaderTable::Builder headerTableBuilder;

// Create the special inspector service.
auto inspectorService(
kj::heap<Server::InspectorService>(
kj::mv(isolateThreadExecutorNotifierPair),
io.provider->getTimer(),
headerTableBuilder,
registrar));
kj::heap<Server::InspectorService>(io.provider->getTimer(), headerTableBuilder, registrar));
auto ownHeaderTable = headerTableBuilder.build();

// Configure and start the inspector socket.
Expand Down
127 changes: 73 additions & 54 deletions src/workerd/server/tests/inspector/driver.mjs
Original file line number Diff line number Diff line change
Expand Up @@ -4,14 +4,13 @@ import assert from "node:assert";
import CDP from "chrome-remote-interface";
import { WorkerdServerHarness } from "@workerd/test/server-harness.mjs";

// Globals that are reset for each test.
// Global that is reset for each test.
let workerd;
let inspectorClient;

assert(env.WORKERD_BINARY !== undefined, "You must set the WORKERD_BINARY environment variable.");
assert(env.WORKERD_CONFIG !== undefined, "You must set the WORKERD_CONFIG environment variable.");

// Start workerd and connect to its inspector port with our CDP library.
// Start workerd.
beforeEach(async () => {
workerd = new WorkerdServerHarness({
workerdBinary: env.WORKERD_BINARY,
Expand All @@ -22,9 +21,18 @@ beforeEach(async () => {
});

await workerd.start();
});

inspectorClient = await CDP({
port: await workerd.getListenInspectorPort(),
// Shut workerd down after every test and verify that it exited cleanly: either with exit code 0
// or because it was terminated by SIGTERM.
afterEach(async () => {
  const [exitCode, exitSignal] = await workerd.stop();
  const exitedCleanly = exitCode === 0 || exitSignal === "SIGTERM";
  assert(exitedCleanly);

  // Drop the reference so the next `beforeEach` starts from a fresh harness.
  workerd = null;
});

async function connectInspector(port) {
return await CDP({
port,

// Hard-coded to match a service name expected in the `workerdConfig` file.
target: "/main",
Expand All @@ -33,57 +41,68 @@ beforeEach(async () => {
// implement the inspector protocol message in question.
local: true,
});
});

// Stop both our CDP client and workerd.
afterEach(async () => {
await inspectorClient.close();
inspectorClient = null;
}

const [code, signal] = await workerd.stop();
assert(code === 0 || signal === "SIGTERM");
workerd = null;
});
// TODO(soon): This test reproduces a null pointer dereference in workerd (possibly the same issue
// as https://github.com/cloudflare/workerd/issues/2564), but the test doesn't fail. :(
test("Can repeatedly connect and disconnect to the inspector port", async () => {
for (let i = 0; i < 5; ++i) {
let inspectorClient = await connectInspector(await workerd.getListenInspectorPort());

test("Profiler mostly sees deriveBits() frames", async () => {
// Enable and start profiling.
await inspectorClient.Profiler.enable();
await inspectorClient.Profiler.start();

// Drive the worker with a test request. A single one is sufficient.
let httpPort = await workerd.getListenPort("http");
const response = await fetch(`http://localhost:${httpPort}`);
await response.arrayBuffer();

// Stop and disable profiling.
const profile = await inspectorClient.Profiler.stop();
await inspectorClient.Profiler.disable();

// Figure out which function name was most frequently sampled.
let hitCountMap = new Map();

for (let node of profile.profile.nodes) {
if (hitCountMap.get(node.callFrame.functionName) === undefined) {
hitCountMap.set(node.callFrame.functionName, 0);
}
hitCountMap.set(node.callFrame.functionName,
hitCountMap.get(node.callFrame.functionName) + node.hitCount);
}

let max = {
name: null,
count: 0,
};
// Drive the worker with a test request.
let httpPort = await workerd.getListenPort("http");
const response = await fetch(`http://localhost:${httpPort}`);
let body = await response.arrayBuffer();
console.log(body);

for (let [name, count] of hitCountMap) {
if (count > max.count) {
max.name = name;
max.count = count;
}
await inspectorClient.close();
}

// The most CPU-intensive function our test script runs is `deriveBits()`, so we expect that to be
// the most frequently sampled function.
assert.equal(max.name, "deriveBits");
assert.notEqual(max.count, 0);
});

// TODO(soon): Re-enable once https://github.com/cloudflare/workerd/issues/2564 is solved.
// test("Profiler mostly sees deriveBits() frames", async () => {
// let inspectorClient = await connectInspector(await workerd.getListenInspectorPort());

// // Enable and start profiling.
// await inspectorClient.Profiler.enable();
// await inspectorClient.Profiler.start();

// // Drive the worker with a test request. A single one is sufficient.
// let httpPort = await workerd.getListenPort("http");
// const response = await fetch(`http://localhost:${httpPort}`);
// await response.arrayBuffer();

// // Stop and disable profiling.
// const profile = await inspectorClient.Profiler.stop();
// await inspectorClient.Profiler.disable();

// // Figure out which function name was most frequently sampled.
// let hitCountMap = new Map();

// for (let node of profile.profile.nodes) {
// if (hitCountMap.get(node.callFrame.functionName) === undefined) {
// hitCountMap.set(node.callFrame.functionName, 0);
// }
// hitCountMap.set(node.callFrame.functionName,
// hitCountMap.get(node.callFrame.functionName) + node.hitCount);
// }

// let max = {
// name: null,
// count: 0,
// };

// for (let [name, count] of hitCountMap) {
// if (count > max.count) {
// max.name = name;
// max.count = count;
// }
// }

// // The most CPU-intensive function our test script runs is `deriveBits()`, so we expect that to be
// // the most frequently sampled function.
// assert.equal(max.name, "deriveBits");
// assert.notEqual(max.count, 0);

// await inspectorClient.close();
// });
15 changes: 0 additions & 15 deletions src/workerd/util/xthreadnotifier.h
Original file line number Diff line number Diff line change
Expand Up @@ -42,19 +42,4 @@ class XThreadNotifier final: public kj::AtomicRefcounted {
kj::MutexGuarded<kj::PromiseCrossThreadFulfillerPair<void>> paf;
};


// Convenience struct for creating and passing around a kj::Executor and XThreadNotifier. The
// default constructor creates a pair of the objects which are both tied to the current thread.
struct ExecutorNotifierPair {
kj::Own<const kj::Executor> executor = kj::getCurrentThreadExecutor().addRef();
kj::Own<XThreadNotifier> notifier = XThreadNotifier::create();

ExecutorNotifierPair clone() {
return {
.executor = executor->addRef(),
.notifier = kj::atomicAddRef(*notifier),
};
}
};

} // namespace workerd

0 comments on commit a142944

Please sign in to comment.