Skip to content

Commit

Permalink
Implement climem memory monitor service
Browse files Browse the repository at this point in the history
  • Loading branch information
jasnell committed Feb 1, 2024
1 parent e3439fb commit 78f5cdd
Show file tree
Hide file tree
Showing 13 changed files with 258 additions and 0 deletions.
5 changes: 5 additions & 0 deletions src/workerd/io/worker.c++
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
#include <workerd/util/color-util.h>
#include <workerd/util/mimetype.h>
#include <workerd/util/stream-utils.h>
#include <workerd/util/string-buffer.h>
#include <workerd/util/thread-scopes.h>
#include <workerd/util/xthreadnotifier.h>
#include <workerd/api/actor-state.h>
Expand Down Expand Up @@ -1245,6 +1246,10 @@ Worker::Script::~Script() noexcept(false) {
});
}

// Returns the memory statistics reported by this isolate's Worker::Api
// implementation. The caller supplies a jsg::Lock&, which is forwarded
// unchanged to the Api.
const jsg::MemStats Worker::Isolate::getCurrentMemStats(jsg::Lock& lock) const {
return api->getCurrentMemStats(lock);
}

const Worker::Isolate& Worker::Isolate::from(jsg::Lock& js) {
auto ptr = js.v8Isolate->GetData(3);
KJ_ASSERT(ptr != nullptr);
Expand Down
6 changes: 6 additions & 0 deletions src/workerd/io/worker.h
Original file line number Diff line number Diff line change
Expand Up @@ -334,6 +334,8 @@ class Worker::Isolate: public kj::AtomicRefcounted {

kj::Own<const WeakIsolateRef> getWeakRef() const;

const jsg::MemStats getCurrentMemStats(jsg::Lock&) const;

private:
kj::Promise<AsyncLock> takeAsyncLockImpl(
kj::Maybe<kj::Own<IsolateObserver::LockTiming>> lockTiming) const;
Expand Down Expand Up @@ -442,6 +444,10 @@ class Worker::Api {
return kj::none;
}

// Returns memory statistics for the isolate backing this Api. This default
// implementation returns an empty-initialized jsg::MemStats (`{}`); concrete
// Api implementations can override it to report real values.
virtual const jsg::MemStats getCurrentMemStats(jsg::Lock&) const {
return {};
}

// Set the module fallback service callback, if any.
using ModuleFallbackCallback =
kj::Maybe<kj::OneOf<kj::String, jsg::ModuleRegistry::ModuleInfo>>(
Expand Down
33 changes: 33 additions & 0 deletions src/workerd/jsg/jsg.c++
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
#include "setup.h"
#include "workerd/jsg/util.h"
#include <workerd/util/thread-scopes.h>
#include <workerd/util/string-buffer.h>

namespace workerd::jsg {

Expand Down Expand Up @@ -320,4 +321,36 @@ kj::String Name::toString(jsg::Lock& js) {
KJ_UNREACHABLE;
}

// Serializes these statistics as a single-line JSON object terminated by a
// newline, so consecutive snapshots can be streamed as newline-delimited JSON.
//
// maybeRss is the process resident set size; when it is kj::none the "rss"
// field is emitted as 0.
kj::String jsg::MemStats::toString(
    kj::Maybe<size_t> maybeRss) const {
  StringBuffer<256> buf(256);
  buf.append("{");

  // These first ones are modeled after Node.js' process.memoryUsage() and use
  // the same names for consistency. These are used to support the interaction
  // with the climem tool.
  buf.append("\"rss\":", kj::str(maybeRss.orDefault(0)), ",");
  buf.append("\"heapTotal\":", kj::str(total_heap_size), ",");
  buf.append("\"heapUsed\":", kj::str(used_heap_size), ",");
  buf.append("\"external\":", kj::str(external_memory), ",");
  // There is no separate array-buffer metric here, so the external memory
  // value is reported for "arrayBuffers" as well.
  buf.append("\"arrayBuffers\":", kj::str(external_memory), ",");

  // These additional fields aren't currently used by climem but are useful
  // for additional context. Each key is the camelCase form of the
  // corresponding MemStats field name.
  buf.append("\"totalHeapSizeExecutable\":", kj::str(total_heap_size_executable), ",");
  buf.append("\"totalPhysicalSize\":", kj::str(total_physical_size), ",");
  buf.append("\"totalAvailableSize\":", kj::str(total_available_size), ",");
  buf.append("\"totalGlobalHandlesSize\":", kj::str(total_global_handles_size), ",");
  buf.append("\"usedGlobalHandlesSize\":", kj::str(used_global_handles_size), ",");
  buf.append("\"heapSizeLimit\":", kj::str(heap_size_limit), ",");
  buf.append("\"mallocedMemory\":", kj::str(malloced_memory), ",");
  buf.append("\"peakMallocedMemory\":", kj::str(peak_malloced_memory), ",");
  buf.append("\"numberOfNativeContexts\":", kj::str(number_of_native_contexts), ",");
  // Last field: no trailing comma.
  buf.append("\"numberOfDetachedContexts\":", kj::str(number_of_detached_contexts));

  buf.append("}\n");
  return buf.toString();
}

} // namespace workerd::jsg
19 changes: 19 additions & 0 deletions src/workerd/jsg/jsg.h
Original file line number Diff line number Diff line change
Expand Up @@ -2302,6 +2302,25 @@ auto runInV8Stack(auto callback) {
return V8StackScope::runInV8StackImpl(__builtin_frame_address(0), kj::mv(callback));
};


// =======================================================================================
// A plain snapshot of heap statistics for a single isolate. Each field mirrors
// the v8::HeapStatistics accessor of the same name.
//
// Every field defaults to zero so that a default-constructed MemStats is well
// defined. The struct stays an aggregate, so `MemStats{}` and designated
// initializers continue to work.
struct MemStats {
  size_t total_heap_size = 0;
  size_t total_heap_size_executable = 0;
  size_t total_physical_size = 0;
  size_t total_available_size = 0;
  size_t total_global_handles_size = 0;
  size_t used_global_handles_size = 0;
  size_t used_heap_size = 0;
  size_t heap_size_limit = 0;
  size_t malloced_memory = 0;
  size_t external_memory = 0;
  size_t peak_malloced_memory = 0;
  size_t number_of_native_contexts = 0;
  size_t number_of_detached_contexts = 0;

  // Renders the snapshot as a JSON string. maybeRss optionally supplies the
  // process resident set size to include alongside the heap statistics.
  kj::String toString(kj::Maybe<size_t> maybeRss) const;
};

// =======================================================================================
// inline implementation details

Expand Down
20 changes: 20 additions & 0 deletions src/workerd/jsg/setup.h
Original file line number Diff line number Diff line change
Expand Up @@ -124,6 +124,26 @@ class IsolateBase {

IsolateObserver& getObserver() { return *observer; }

// Queries V8 for the isolate's current heap statistics and copies each value
// into the MemStats field of the same name. The Lock& parameter is not used by
// the body; it is required of the caller by the signature.
inline const MemStats getCurrentMemStats(Lock&) const {
  v8::HeapStatistics heap;
  ptr->GetHeapStatistics(&heap);

  MemStats snapshot{};
  snapshot.total_heap_size = heap.total_heap_size();
  snapshot.total_heap_size_executable = heap.total_heap_size_executable();
  snapshot.total_physical_size = heap.total_physical_size();
  snapshot.total_available_size = heap.total_available_size();
  snapshot.total_global_handles_size = heap.total_global_handles_size();
  snapshot.used_global_handles_size = heap.used_global_handles_size();
  snapshot.used_heap_size = heap.used_heap_size();
  snapshot.heap_size_limit = heap.heap_size_limit();
  snapshot.malloced_memory = heap.malloced_memory();
  snapshot.external_memory = heap.external_memory();
  snapshot.peak_malloced_memory = heap.peak_malloced_memory();
  snapshot.number_of_native_contexts = heap.number_of_native_contexts();
  snapshot.number_of_detached_contexts = heap.number_of_detached_contexts();
  return snapshot;
}

private:
template <typename TypeWrapper>
friend class Isolate;
Expand Down
99 changes: 99 additions & 0 deletions src/workerd/server/server.c++
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
#include <workerd/io/compatibility-date.h>
#include <workerd/io/io-context.h>
#include <workerd/io/worker.h>
#include <workerd/util/mem-utils.h>
#include <time.h>
#include <openssl/bio.h>
#include <openssl/pem.h>
Expand Down Expand Up @@ -1075,6 +1076,93 @@ private:
friend class Server::InspectorService;
};

// The ClimemServiceRegistrar exposes a climem memory-monitor endpoint for each
// isolate that is registered with it. Every registered isolate gets its own
// detached thread with its own event loop; that thread listens on the
// configured address and streams a memory-statistics line to each connected
// client roughly every 100ms.
//
// NOTE(review): a single `worker` slot is shared by every registered isolate
// and is read from the listener threads without synchronization. With more
// than one worker, all listeners will use whichever worker was passed to
// setWorker() last. Confirm whether that is intended.
class Server::ClimemServiceRegistrar final {
public:
  // Logs failures of individual connection tasks. It holds no state, so one
  // shared instance serves every listener's TaskSet.
  class ClimemErrorHandler final: public kj::TaskSet::ErrorHandler {
  public:
    void taskFailed(kj::Exception&& exception) override {
      KJ_LOG(ERROR, "Climem errored", exception);
    }
  };
  static ClimemErrorHandler INSTANCE;

  // `address` is the listen address handed to the network's parseAddress()
  // for every registered isolate. A private copy is kept.
  ClimemServiceRegistrar(kj::StringPtr address) : address(kj::str(address)) {}
  ~ClimemServiceRegistrar() noexcept(true) {}
  KJ_DISALLOW_COPY_AND_MOVE(ClimemServiceRegistrar);

  // Serves one client connection: every 100ms takes the worker's async lock,
  // snapshots the isolate's memory statistics (plus process RSS when
  // available) and writes the serialized snapshot to `conn`.
  //
  // The loop ends quietly when the peer disconnects; any other exception is
  // rethrown and therefore reported through the owning TaskSet's error
  // handler. `name` is used only in log messages.
  static kj::Promise<void> handleConnection(
      kj::Own<kj::AsyncIoStream> conn,
      kj::Timer& timer,
      Worker::Isolate* isolate,
      const Worker& worker,
      kj::StringPtr name) {
    KJ_LOG(INFO, kj::str("Climem connected to ", name));
    try {
      for (;;) {
        co_await timer.afterDelay(100 * kj::MILLISECONDS);
        auto asyncLock = co_await worker.takeAsyncLockWithoutRequest(nullptr);
        auto stats = worker.runInLockScope(asyncLock, [&](auto& lock) {
          return isolate->getCurrentMemStats(lock)
              .toString(util::tryGetResidentSetMemory());
        });
        co_await conn->write(stats.begin(), stats.size());
      }
    } catch (...) {
      auto exception = kj::getCaughtExceptionAsKj();
      if (exception.getType() == kj::Exception::Type::DISCONNECTED) {
        // Normal disconnection. Ignore and move on.
        KJ_LOG(INFO, kj::str("Climem disconnected from ", name));
        co_return;
      }
      kj::throwFatalException(kj::mv(exception));
    }
  }

  // Starts a detached listener thread for `isolate`. `name` identifies the
  // isolate in log output.
  //
  // The thread owns copies of `name` and the listen address, so it never
  // reads strings whose lifetime is controlled by the caller or by this
  // registrar. It still refers to this registrar's `worker` slot by
  // reference, so the registrar must outlive the thread's use of it.
  void registerIsolate(kj::StringPtr name, Worker::Isolate* isolate) {
    auto thread = kj::heap<kj::Thread>(
        [&worker=worker, name=kj::str(name), isolate, address=kj::str(address)] {
      kj::AsyncIoContext io = kj::setupAsyncIo();
      auto& network = io.provider->getNetwork();
      auto& timer = io.provider->getTimer();

      auto listen = (kj::coCapture([&worker, &network, &timer, isolate,
                                    name=name.asPtr(), address=address.asPtr()]()
                                       -> kj::Promise<void> {
        auto parsed = co_await network.parseAddress(address, 0);
        auto listener = parsed->listen();
        kj::TaskSet tasks(INSTANCE);

        KJ_LOG(INFO, kj::str("Climem listening for ", name, " on ", listener->getPort()));

        // Accept forever; each connection is served by its own task.
        for (;;) {
          auto conn = co_await listener->accept();
          tasks.add(handleConnection(kj::mv(conn), timer, isolate,
                                     KJ_ASSERT_NONNULL(worker), name));
        }
      }))();

      // Drive the event loop by waiting on the listener itself. The accept
      // loop never completes normally, so this only returns on failure (bad
      // address, listen error, no worker set yet). Waiting on the promise,
      // rather than on NEVER_DONE, ensures such a failure is reported instead
      // of being silently dropped while the thread idles forever.
      try {
        listen.wait(io.waitScope);
      } catch (...) {
        auto exception = kj::getCaughtExceptionAsKj();
        KJ_LOG(ERROR, "Climem listener failed", name, exception);
      }
    });
    thread->detach();
  }

  // Records the worker whose lock is taken when sampling memory statistics.
  // Must be called before a client connects; otherwise the listener fails its
  // KJ_ASSERT_NONNULL check on the first accepted connection.
  void setWorker(Worker& newWorker) {
    worker = newWorker;
  }

private:
  kj::String address;
  kj::Maybe<const Worker&> worker;

  // NOTE(review): not referenced anywhere within this class; looks like a
  // leftover placeholder. Confirm before removing.
  kj::Promise<void> attach(kj::Own<kj::AsyncIoStream> conn, kj::Timer& timer) {
    return kj::NEVER_DONE;
  }
};

Server::ClimemServiceRegistrar::ClimemErrorHandler Server::ClimemServiceRegistrar::INSTANCE;

// Implements the interface for the devtools inspector protocol.
//
// The InspectorService is created when workerd serve is called using the -i option
Expand Down Expand Up @@ -2558,6 +2646,9 @@ kj::Own<Server::Service> Server::makeWorker(kj::StringPtr name, config::Worker::
KJ_IF_SOME(isolateRegistrar, inspectorIsolateRegistrar) {
isolateRegistrar->registerIsolate(name, isolate.get());
}
KJ_IF_SOME(climemRegistrar, climemServiceRegistrar) {
climemRegistrar->registerIsolate(name, isolate.get());
}

if (conf.hasModuleFallback()) {
KJ_REQUIRE(experimental,
Expand Down Expand Up @@ -2763,6 +2854,10 @@ kj::Own<Server::Service> Server::makeWorker(kj::StringPtr name, config::Worker::
Worker::Lock::TakeSynchronously(kj::none),
errorReporter);

KJ_IF_SOME(climemRegistrar, climemServiceRegistrar) {
climemRegistrar->setWorker(*worker);
}

{
worker->runInLockScope(Worker::Lock::TakeSynchronously(kj::none), [&](Worker::Lock& lock) {
lock.validateHandlers(errorReporter);
Expand Down Expand Up @@ -3283,6 +3378,10 @@ void Server::startServices(jsg::V8System& v8System, config::Config::Reader confi
inspectorIsolateRegistrar = kj::mv(registrar);
}

KJ_IF_SOME(climemAddress, climemOverride) {
climemServiceRegistrar = kj::heap<ClimemServiceRegistrar>(climemAddress);
}

// Second pass: Build services.
for (auto serviceConf: config.getServices()) {
kj::StringPtr name = serviceConf.getName();
Expand Down
7 changes: 7 additions & 0 deletions src/workerd/server/server.h
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,9 @@ class Server: private kj::TaskSet::ErrorHandler {
// Stores `addr` in inspectorOverride, taking ownership of the string.
void enableInspector(kj::String addr) {
inspectorOverride = kj::mv(addr);
}
// Stores `addr` in climemOverride, taking ownership of the string. A set
// climemOverride is what causes the climem service registrar to be created.
void enableClimem(kj::String addr) {
climemOverride = kj::mv(addr);
}
// Wraps file descriptor `fd` in a heap-allocated kj::FdOutputStream and stores
// it in controlOverride.
void enableControl(uint fd) {
controlOverride = kj::heap<kj::FdOutputStream>(fd);
}
Expand Down Expand Up @@ -83,6 +86,8 @@ class Server: private kj::TaskSet::ErrorHandler {
class InspectorService;
class InspectorServiceIsolateRegistrar;

class ClimemServiceRegistrar;

private:
kj::Filesystem& fs;
kj::Timer& timer;
Expand All @@ -104,7 +109,9 @@ class Server: private kj::TaskSet::ErrorHandler {
kj::HashMap<kj::String, kj::String> externalOverrides;

kj::Maybe<kj::String> inspectorOverride;
kj::Maybe<kj::String> climemOverride;
kj::Maybe<kj::Own<InspectorServiceIsolateRegistrar>> inspectorIsolateRegistrar;
kj::Maybe<kj::Own<ClimemServiceRegistrar>> climemServiceRegistrar;
kj::Maybe<kj::Own<kj::FdOutputStream>> controlOverride;

struct GlobalContext;
Expand Down
4 changes: 4 additions & 0 deletions src/workerd/server/workerd-api.c++
Original file line number Diff line number Diff line change
Expand Up @@ -746,6 +746,10 @@ WorkerdApi::Global WorkerdApi::Global::clone() const {
return result;
}

// Overrides Worker::Api::getCurrentMemStats by forwarding to the underlying
// JSG isolate held in `impl`.
const jsg::MemStats WorkerdApi::getCurrentMemStats(jsg::Lock& js) const {
return impl->jsgIsolate.getCurrentMemStats(js);
}

// Converts a generic Worker::Api reference to a WorkerdApi reference using
// kj::downcast. The caller is responsible for passing an Api that really is a
// WorkerdApi.
const WorkerdApi& WorkerdApi::from(const Worker::Api& api) {
return kj::downcast<const WorkerdApi>(api);
}
Expand Down
2 changes: 2 additions & 0 deletions src/workerd/server/workerd-api.h
Original file line number Diff line number Diff line change
Expand Up @@ -195,6 +195,8 @@ class WorkerdApi final: public Worker::Api {
void setModuleFallbackCallback(
kj::Function<ModuleFallbackCallback>&& callback) const override;

const jsg::MemStats getCurrentMemStats(jsg::Lock&) const override;

private:
struct Impl;
kj::Own<Impl> impl;
Expand Down
6 changes: 6 additions & 0 deletions src/workerd/server/workerd.c++
Original file line number Diff line number Diff line change
Expand Up @@ -593,6 +593,8 @@ public:
"<addr> instead of the address specified in the config file.")
.addOptionWithArg({'i', "inspector-addr"}, CLI_METHOD(enableInspector), "<addr>",
"Enable the inspector protocol to connect to the address <addr>.")
.addOptionWithArg({'m', "climem"}, CLI_METHOD(enableClimem), "<addr>",
"Enable a local memory-monitor service")
#if defined(WORKERD_USE_PERFETTO)
// TODO(later): In the future, we might want to enable providing a perfetto
// TraceConfig structure here rather than just the categories.
Expand Down Expand Up @@ -787,6 +789,10 @@ public:
server.enableInspector(kj::str(param));
}

// CLI handler for the climem option: passes a copy of the address argument to
// the server.
void enableClimem(kj::StringPtr param) {
server.enableClimem(kj::str(param));
}

void enableControl(kj::StringPtr param) {
int fd = KJ_UNWRAP_OR(param.tryParseAs<uint>(),
CLI_ERROR("Output value must be a file descriptor (non-negative integer)."));
Expand Down
1 change: 1 addition & 0 deletions src/workerd/util/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ wd_cc_library(
wd_cc_library(
name = "util",
srcs = [
"mem-utils.c++",
"mimetype.c++",
"stream-utils.c++",
"uuid.c++",
Expand Down
47 changes: 47 additions & 0 deletions src/workerd/util/mem-utils.c++
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
#include "mem-utils.h"

#include <kj/debug.h>
#include <kj/filesystem.h>

#if !_WIN32
#include <fcntl.h>
#include <stdlib.h>
#include <unistd.h>
#endif

namespace workerd::util {

#if defined(__linux__)
// Returns the resident set size of the current process in bytes, or kj::none
// if it cannot be determined (for example /proc is unavailable or its format
// is not understood).
//
// The value is read from /proc/self/stat, where field 24 (rss) is a page
// count, and is converted to bytes using the system page size.
kj::Maybe<size_t> tryGetResidentSetMemory() {
  int fd_ = -1;
  KJ_SYSCALL_HANDLE_ERRORS(fd_ = open("/proc/self/stat", O_RDONLY | O_CLOEXEC)) {
    default:
      // Best effort: callers treat a missing value as "unknown".
      return kj::none;
  }
  kj::AutoCloseFd fd(fd_);
  auto text = kj::FdInputStream(kj::mv(fd)).readAllText();

  // Field 2 (comm) is the executable name wrapped in parentheses. The name is
  // not escaped and may itself contain spaces or parentheses, so counting
  // spaces from the start of the line is unreliable. Anchor on the LAST ')'
  // instead; everything after it is space-separated.
  const char* p = strrchr(text.cStr(), ')');
  if (p == nullptr) return kj::none;

  // `p` is at the end of field 2. Skipping k spaces lands on field k + 2, so
  // 22 spaces reach field 24 (rss).
  for (kj::uint i = 0; i < 22; i++) {
    p = strchr(p, ' ');
    if (p == nullptr) return kj::none;
    ++p;
  }

  char* end = nullptr;
  auto rss = strtoull(p, &end, 10);
  // No digits consumed means the field was not a number.
  if (end == p) return kj::none;

  size_t result = rss * getpagesize();
  return result;
}
#else
// Fallback for every non-Linux platform: the resident set size is not
// available.
kj::Maybe<size_t> tryGetResidentSetMemory() {
  // TODO(soon): Implement getting the RSS for macOS.
  // TODO(soon): Implement getting the RSS for Windows.
  return kj::none;
}
#endif

} // namespace workerd::util
Loading

0 comments on commit 78f5cdd

Please sign in to comment.