Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

EW-8447 Revert CPU profiling fix #2565

Merged
merged 3 commits into from
Aug 20, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
52 changes: 8 additions & 44 deletions src/workerd/io/worker.c++
Original file line number Diff line number Diff line change
Expand Up @@ -2339,11 +2339,8 @@ struct MessageQueue {

class Worker::Isolate::InspectorChannelImpl final: public v8_inspector::V8Inspector::Channel {
public:
InspectorChannelImpl(kj::Own<const Worker::Isolate> isolateParam,
ExecutorNotifierPair isolateThreadExecutorNotifierPair,
kj::WebSocket& webSocket)
: ioHandler(kj::mv(isolateThreadExecutorNotifierPair), webSocket),
state(kj::heap<State>(this, kj::mv(isolateParam))) {
InspectorChannelImpl(kj::Own<const Worker::Isolate> isolateParam, kj::WebSocket& webSocket)
: ioHandler(webSocket), state(kj::heap<State>(this, kj::mv(isolateParam))) {
ioHandler.connect(*this);
}

Expand Down Expand Up @@ -2596,12 +2593,11 @@ private:
// the InspectorChannelImpl and the InspectorClient.
class WebSocketIoHandler final {
public:
WebSocketIoHandler(ExecutorNotifierPair isolateThreadExecutorNotifierPair, kj::WebSocket& webSocket)
: isolateThreadExecutor(kj::mv(isolateThreadExecutorNotifierPair.executor)),
incomingQueueNotifier(kj::mv(isolateThreadExecutorNotifierPair.notifier)),
webSocket(webSocket) {
WebSocketIoHandler(kj::WebSocket& webSocket)
: webSocket(webSocket) {
// Assume we are being instantiated on the InspectorService thread, the thread that will do
// I/O for CDP messages. Messages are delivered to the InspectorChannelImpl on the Isolate thread.
incomingQueueNotifier = XThreadNotifier::create();
outgoingQueueNotifier = XThreadNotifier::create();
}

Expand Down Expand Up @@ -2634,20 +2630,7 @@ private:
// Message pumping promise that should be evaluated on the InspectorService
// thread.
kj::Promise<void> messagePump() {
// Although inspector I/O must happen on the InspectorService thread (to make sure breakpoints
// don't block inspector I/O), inspector messages must be actually dispatched on the Isolate
// thread. So, we run the dispatch loop on the Isolate thread.
//
// Note that the above comment is only really accurate in vanilla workerd. In the case of the
// internal Cloudflare Workers runtime, `isolateThreadExecutor` may actually refer to the
// current thread's `kj::Executor`. That's fine; calling `executeAsync()` on the current
// thread's executor just posts the task to the event loop, and everything works as expected.
auto dispatchLoopPromise = isolateThreadExecutor->executeAsync([this]() {
return dispatchLoop();
});
return receiveLoop()
.exclusiveJoin(kj::mv(dispatchLoopPromise))
.exclusiveJoin(transmitLoop());
return receiveLoop().exclusiveJoin(dispatchLoop()).exclusiveJoin(transmitLoop());
}

void send(kj::String message) {
Expand Down Expand Up @@ -2688,7 +2671,6 @@ private:
outgoingQueueNotifier->notify();
}

// Must be called on the InspectorService thread.
kj::Promise<void> receiveLoop() {
for (;;) {
auto message = co_await webSocket.receive(MAX_MESSAGE_SIZE);
Expand All @@ -2711,7 +2693,6 @@ private:
}
}

// Must be called on the Isolate thread.
kj::Promise<void> dispatchLoop() {
for (;;) {
co_await incomingQueueNotifier->awaitNotification();
Expand All @@ -2721,7 +2702,6 @@ private:
}
}

// Must be called on the InspectorService thread.
kj::Promise<void> transmitLoop() {
for (;;) {
co_await outgoingQueueNotifier->awaitNotification();
Expand All @@ -2748,17 +2728,10 @@ private:
}
}

// We need access to the Isolate thread's kj::Executor to run the inspector dispatch loop. This
// doesn't actually have to be an Own, because the Isolate thread will destroy the Isolate
// before it exits, but it doesn't hurt.
kj::Own<const kj::Executor> isolateThreadExecutor;

kj::MutexGuarded<MessageQueue> incomingQueue;
// This XThreadNotifier must be created on the Isolate thread.
kj::Own<XThreadNotifier> incomingQueueNotifier;

kj::MutexGuarded<MessageQueue> outgoingQueue;
// This XThreadNotifier must be created on the InspectorService thread.
kj::Own<XThreadNotifier> outgoingQueueNotifier;

kj::WebSocket& webSocket; // only accessed on the InspectorService thread.
Expand Down Expand Up @@ -2908,17 +2881,11 @@ kj::Promise<void> Worker::Isolate::attachInspector(
headers.set(controlHeaderId, "{\"ewLog\":{\"status\":\"ok\"}}");
auto webSocket = response.acceptWebSocket(headers);

// This `attachInspector()` overload is used by the internal Cloudflare Workers runtime, which has
// no concept of a single Isolate thread. Instead, it's okay for all inspector messages to be
// dispatched on the calling thread.
auto executorNotifierPair = ExecutorNotifierPair{};

return attachInspector(kj::mv(executorNotifierPair), timer, timerOffset, *webSocket)
return attachInspector(timer, timerOffset, *webSocket)
.attach(kj::mv(webSocket));
}

kj::Promise<void> Worker::Isolate::attachInspector(
ExecutorNotifierPair isolateThreadExecutorNotifierPair,
kj::Timer& timer,
kj::Duration timerOffset,
kj::WebSocket& webSocket) const {
Expand All @@ -2939,10 +2906,7 @@ kj::Promise<void> Worker::Isolate::attachInspector(

lockedSelf.impl->inspectorClient.setInspectorTimerInfo(timer, timerOffset);

auto channel = kj::heap<Worker::Isolate::InspectorChannelImpl>(
kj::atomicAddRef(*this),
kj::mv(isolateThreadExecutorNotifierPair),
webSocket);
auto channel = kj::heap<Worker::Isolate::InspectorChannelImpl>(kj::atomicAddRef(*this), webSocket);
lockedSelf.currentInspectorSession = *channel;
lockedSelf.impl->inspectorClient.setChannel(*channel);

Expand Down
17 changes: 0 additions & 17 deletions src/workerd/io/worker.h
Original file line number Diff line number Diff line change
Expand Up @@ -288,31 +288,14 @@ class Worker::Isolate: public kj::AtomicRefcounted {
uint getLockSuccessCount() const;

// Accepts a connection to the V8 inspector and handles requests until the client disconnects.
// Also adds a special JSON value to the header identified by `controlHeaderId`, for compatibility
// with internal Cloudflare systems.
//
// This overload will dispatch all inspector messages on the _calling thread's_ `kj::Executor`.
// When linked against vanilla V8, this means that CPU profiling will only profile JavaScript
// running on the _calling thread_, which will most likely only be inspector console commands, and
// is not typically desired.
//
// For the above reason , this overload is curently only suitable for use by the internal Workers
// Runtime codebase, which patches V8 to profile whichever thread currently holds the `v8::Locker`
// for this Isolate.
kj::Promise<void> attachInspector(
kj::Timer& timer,
kj::Duration timerOffset,
kj::HttpService::Response& response,
const kj::HttpHeaderTable& headerTable,
kj::HttpHeaderId controlHeaderId) const;

// Accepts a connection to the V8 inspector and handles requests until the client disconnects.
//
// This overload will dispatch all inspector messages on the `kj::Executor` passed in via
// `isolateThreadExecutorNotifierPair`. For CPU profiling to work as expected, this `kj::Executor`
// must be associated with the same thread which executes the Worker's JavaScript.
kj::Promise<void> attachInspector(
ExecutorNotifierPair isolateThreadExecutorNotifierPair,
kj::Timer& timer,
kj::Duration timerOffset,
kj::WebSocket& webSocket) const;
Expand Down
28 changes: 4 additions & 24 deletions src/workerd/server/server.c++
Original file line number Diff line number Diff line change
Expand Up @@ -1165,12 +1165,10 @@ private:
class Server::InspectorService final: public kj::HttpService, public kj::HttpServerErrorHandler {
public:
InspectorService(
ExecutorNotifierPair isolateThreadExecutorNotifierPair,
kj::Timer& timer,
kj::HttpHeaderTable::Builder& headerTableBuilder,
InspectorServiceIsolateRegistrar& registrar)
: isolateThreadExecutorNotifierPair(kj::mv(isolateThreadExecutorNotifierPair)),
timer(timer),
: timer(timer),
headerTable(headerTableBuilder.getFutureTable()),
server(timer, headerTable, *this, kj::HttpServerSettings {
.errorHandler = *this
Expand Down Expand Up @@ -1228,8 +1226,7 @@ public:
auto webSocket = response.acceptWebSocket(responseHeaders);
kj::Duration timerOffset = 0 * kj::MILLISECONDS;
try {
co_return co_await ref->attachInspector(
isolateThreadExecutorNotifierPair.clone(), timer, timerOffset, *webSocket);
co_return co_await ref->attachInspector(timer, timerOffset, *webSocket);
} catch (...) {
auto exception = kj::getCaughtExceptionAsKj();
if (exception.getType() == kj::Exception::Type::DISCONNECTED) {
Expand Down Expand Up @@ -1347,7 +1344,6 @@ public:
}

private:
ExecutorNotifierPair isolateThreadExecutorNotifierPair;
kj::Timer& timer;
kj::HttpHeaderTable& headerTable;
kj::HashMap<kj::String, kj::Own<const Worker::Isolate::WeakIsolateRef>> isolates;
Expand Down Expand Up @@ -3441,30 +3437,14 @@ uint startInspector(kj::StringPtr inspectorAddress,
static constexpr uint DEFAULT_PORT = 9229;
kj::MutexGuarded<uint> inspectorPort(UNASSIGNED_PORT);

// `startInspector()` is called on the Isolate thread. V8 requires CPU profiling to be started and
// stopped on the same thread which executes JavaScript -- that is, the Isolate thread -- which
// means we need to dispatch inspector messages on this thread. To help make that happen, we
// capture this thread's kj::Executor and create an XThreadNotifier tied to this thread here, and
// pass it into the InspectorService below. Later, when the InspectorService receives a WebSocket
// connection, it calls `Isolate::attachInspector()`, which starts a dispatch loop on the
// kj::Executor we create here. The InspectorService reads subsequent WebSocket inspector messages
// and feeds them to that dispatch loop via the XThreadNotifier we create here.
auto isolateThreadExecutorNotifierPair = ExecutorNotifierPair{};

// Start the InspectorService thread.
kj::Thread thread([inspectorAddress, &inspectorPort, &registrar,
isolateThreadExecutorNotifierPair = kj::mv(isolateThreadExecutorNotifierPair)]() mutable {
kj::Thread thread([inspectorAddress, &inspectorPort, &registrar](){
kj::AsyncIoContext io = kj::setupAsyncIo();

kj::HttpHeaderTable::Builder headerTableBuilder;

// Create the special inspector service.
auto inspectorService(
kj::heap<Server::InspectorService>(
kj::mv(isolateThreadExecutorNotifierPair),
io.provider->getTimer(),
headerTableBuilder,
registrar));
kj::heap<Server::InspectorService>(io.provider->getTimer(), headerTableBuilder, registrar));
auto ownHeaderTable = headerTableBuilder.build();

// Configure and start the inspector socket.
Expand Down
127 changes: 73 additions & 54 deletions src/workerd/server/tests/inspector/driver.mjs
Original file line number Diff line number Diff line change
Expand Up @@ -4,14 +4,13 @@ import assert from "node:assert";
import CDP from "chrome-remote-interface";
import { WorkerdServerHarness } from "@workerd/test/server-harness.mjs";

// Globals that are reset for each test.
// Global that is reset for each test.
let workerd;
let inspectorClient;

assert(env.WORKERD_BINARY !== undefined, "You must set the WORKERD_BINARY environment variable.");
assert(env.WORKERD_CONFIG !== undefined, "You must set the WORKERD_CONFIG environment variable.");

// Start workerd and connect to its inspector port with our CDP library.
// Start workerd.
beforeEach(async () => {
workerd = new WorkerdServerHarness({
workerdBinary: env.WORKERD_BINARY,
Expand All @@ -22,9 +21,18 @@ beforeEach(async () => {
});

await workerd.start();
});

inspectorClient = await CDP({
port: await workerd.getListenInspectorPort(),
// Stop workerd.
afterEach(async () => {
const [code, signal] = await workerd.stop();
assert(code === 0 || signal === "SIGTERM");
workerd = null;
});

async function connectInspector(port) {
return await CDP({
port,

// Hard-coded to match a service name expected in the `workerdConfig` file.
target: "/main",
Expand All @@ -33,57 +41,68 @@ beforeEach(async () => {
// implement the inspector protocol message in question.
local: true,
});
});

// Stop both our CDP client and workerd.
afterEach(async () => {
await inspectorClient.close();
inspectorClient = null;
}

const [code, signal] = await workerd.stop();
assert(code === 0 || signal === "SIGTERM");
workerd = null;
});
// TODO(soon): This test reproduces a null pointer dereference in workerd (possibly the same issue
// as https://github.com/cloudflare/workerd/issues/2564), but the test doesn't fail. :(
test("Can repeatedly connect and disconnect to the inspector port", async () => {
for (let i = 0; i < 5; ++i) {
let inspectorClient = await connectInspector(await workerd.getListenInspectorPort());

test("Profiler mostly sees deriveBits() frames", async () => {
// Enable and start profiling.
await inspectorClient.Profiler.enable();
await inspectorClient.Profiler.start();

// Drive the worker with a test request. A single one is sufficient.
let httpPort = await workerd.getListenPort("http");
const response = await fetch(`http://localhost:${httpPort}`);
await response.arrayBuffer();

// Stop and disable profiling.
const profile = await inspectorClient.Profiler.stop();
await inspectorClient.Profiler.disable();

// Figure out which function name was most frequently sampled.
let hitCountMap = new Map();

for (let node of profile.profile.nodes) {
if (hitCountMap.get(node.callFrame.functionName) === undefined) {
hitCountMap.set(node.callFrame.functionName, 0);
}
hitCountMap.set(node.callFrame.functionName,
hitCountMap.get(node.callFrame.functionName) + node.hitCount);
}

let max = {
name: null,
count: 0,
};
// Drive the worker with a test request.
let httpPort = await workerd.getListenPort("http");
const response = await fetch(`http://localhost:${httpPort}`);
let body = await response.arrayBuffer();
console.log(body);

for (let [name, count] of hitCountMap) {
if (count > max.count) {
max.name = name;
max.count = count;
}
await inspectorClient.close();
}

// The most CPU-intensive function our test script runs is `deriveBits()`, so we expect that to be
// the most frequently sampled function.
assert.equal(max.name, "deriveBits");
assert.notEqual(max.count, 0);
});

// TODO(soon): Re-enable once https://github.com/cloudflare/workerd/issues/2564 is solved.
// test("Profiler mostly sees deriveBits() frames", async () => {
// let inspectorClient = await connectInspector(await workerd.getListenInspectorPort());

// // Enable and start profiling.
// await inspectorClient.Profiler.enable();
// await inspectorClient.Profiler.start();

// // Drive the worker with a test request. A single one is sufficient.
// let httpPort = await workerd.getListenPort("http");
// const response = await fetch(`http://localhost:${httpPort}`);
// await response.arrayBuffer();

// // Stop and disable profiling.
// const profile = await inspectorClient.Profiler.stop();
// await inspectorClient.Profiler.disable();

// // Figure out which function name was most frequently sampled.
// let hitCountMap = new Map();

// for (let node of profile.profile.nodes) {
// if (hitCountMap.get(node.callFrame.functionName) === undefined) {
// hitCountMap.set(node.callFrame.functionName, 0);
// }
// hitCountMap.set(node.callFrame.functionName,
// hitCountMap.get(node.callFrame.functionName) + node.hitCount);
// }

// let max = {
// name: null,
// count: 0,
// };

// for (let [name, count] of hitCountMap) {
// if (count > max.count) {
// max.name = name;
// max.count = count;
// }
// }

// // The most CPU-intensive function our test script runs is `deriveBits()`, so we expect that to be
// // the most frequently sampled function.
// assert.equal(max.name, "deriveBits");
// assert.notEqual(max.count, 0);

// await inspectorClient.close();
// });
15 changes: 0 additions & 15 deletions src/workerd/util/xthreadnotifier.h
Original file line number Diff line number Diff line change
Expand Up @@ -42,19 +42,4 @@ class XThreadNotifier final: public kj::AtomicRefcounted {
kj::MutexGuarded<kj::PromiseCrossThreadFulfillerPair<void>> paf;
};


// Convenience struct for creating and passing around a kj::Executor and XThreadNotifier. The
// default constructor creates a pair of the objects which are both tied to the current thread.
struct ExecutorNotifierPair {
kj::Own<const kj::Executor> executor = kj::getCurrentThreadExecutor().addRef();
kj::Own<XThreadNotifier> notifier = XThreadNotifier::create();

ExecutorNotifierPair clone() {
return {
.executor = executor->addRef(),
.notifier = kj::atomicAddRef(*notifier),
};
}
};

} // namespace workerd
Loading