Skip to content

Commit

Permalink
Merge pull request #2779 from cloudflare/mar/memory-cache-lwt
Browse files Browse the repository at this point in the history
Add lock wait time metrics for Memory Cache.
  • Loading branch information
mar-cf authored Sep 27, 2024
2 parents 59e6a73 + 76a905c commit 297ce16
Show file tree
Hide file tree
Showing 6 changed files with 76 additions and 21 deletions.
38 changes: 25 additions & 13 deletions src/workerd/api/memory-cache.c++
Original file line number Diff line number Diff line change
Expand Up @@ -38,11 +38,13 @@ static bool hasExpired(const kj::Maybe<double>& expiration, bool allowOutsideIoC

SharedMemoryCache::SharedMemoryCache(kj::Maybe<const MemoryCacheProvider&> provider,
kj::StringPtr id,
kj::Maybe<AdditionalResizeMemoryLimitHandler&> additionalResizeMemoryLimitHandler)
kj::Maybe<AdditionalResizeMemoryLimitHandler&> additionalResizeMemoryLimitHandler,
const kj::MonotonicClock& timer)
: data(),
provider(provider),
id(kj::str(id)),
additionalResizeMemoryLimitHandler(additionalResizeMemoryLimitHandler) {}
additionalResizeMemoryLimitHandler(additionalResizeMemoryLimitHandler),
timer(timer) {}

SharedMemoryCache::~SharedMemoryCache() noexcept(false) {
KJ_IF_SOME(p, provider) {
Expand Down Expand Up @@ -219,8 +221,9 @@ void SharedMemoryCache::removeIfExistsWhileLocked(
kj::Own<const SharedMemoryCache> SharedMemoryCache::create(
kj::Maybe<const MemoryCacheProvider&> provider,
kj::StringPtr id,
kj::Maybe<AdditionalResizeMemoryLimitHandler&> handler) {
return kj::atomicRefcounted<const SharedMemoryCache>(provider, id, handler);
kj::Maybe<AdditionalResizeMemoryLimitHandler&> handler,
const kj::MonotonicClock& timer) {
return kj::atomicRefcounted<const SharedMemoryCache>(provider, id, handler, timer);
}

SharedMemoryCache::Use::Use(kj::Own<const SharedMemoryCache> cache, const Limits& limits)
Expand All @@ -240,14 +243,22 @@ SharedMemoryCache::Use::~Use() noexcept(false) {
}

kj::Maybe<kj::Own<CacheValue>> SharedMemoryCache::Use::getWithoutFallback(
const kj::String& key) const {
auto data = cache->data.lockExclusive();
const kj::String& key, SpanBuilder& span) const {
kj::Locked<ThreadUnsafeData> data = [&] {
auto memoryCacheLockRecord =
ScopedDurationTagger(span, memoryCachekLockWaitTimeTag, cache->timer);
return cache->data.lockExclusive();
}();
return cache->getWhileLocked(*data, key);
}

kj::OneOf<kj::Own<CacheValue>, kj::Promise<SharedMemoryCache::Use::GetWithFallbackOutcome>>
SharedMemoryCache::Use::getWithFallback(const kj::String& key) const {
auto data = cache->data.lockExclusive();
SharedMemoryCache::Use::getWithFallback(const kj::String& key, SpanBuilder& span) const {
kj::Locked<ThreadUnsafeData> data = [&] {
auto memoryCacheLockRecord =
ScopedDurationTagger(span, memoryCachekLockWaitTimeTag, cache->timer);
return cache->data.lockExclusive();
}();
KJ_IF_SOME(existingValue, cache->getWhileLocked(*data, key)) {
return kj::mv(existingValue);
} else KJ_IF_SOME(existingInProgress, data->inProgress.find(key)) {
Expand Down Expand Up @@ -374,7 +385,7 @@ jsg::Promise<jsg::JsRef<jsg::JsValue>> MemoryCache::read(jsg::Lock& js,
auto readSpan = IoContext::current().makeTraceSpan("memory_cache_read"_kjc);

KJ_IF_SOME(fallback, optionalFallback) {
KJ_SWITCH_ONEOF(cacheUse.getWithFallback(key.value)) {
KJ_SWITCH_ONEOF(cacheUse.getWithFallback(key.value, readSpan)) {
KJ_CASE_ONEOF(result, kj::Own<CacheValue>) {
// Optimization: Don't even release the isolate lock if the value is aleady in cache.
jsg::Deserializer deserializer(js, result->bytes.asPtr());
Expand Down Expand Up @@ -423,7 +434,7 @@ jsg::Promise<jsg::JsRef<jsg::JsValue>> MemoryCache::read(jsg::Lock& js,
}
KJ_UNREACHABLE;
} else {
KJ_IF_SOME(cacheValue, cacheUse.getWithoutFallback(key.value)) {
KJ_IF_SOME(cacheValue, cacheUse.getWithoutFallback(key.value, readSpan)) {
jsg::Deserializer deserializer(js, cacheValue->bytes.asPtr());
return js.resolvedPromise(jsg::JsRef(js, deserializer.readValue(js)));
}
Expand All @@ -433,10 +444,11 @@ jsg::Promise<jsg::JsRef<jsg::JsValue>> MemoryCache::read(jsg::Lock& js,

// ======================================================================================

MemoryCacheProvider::MemoryCacheProvider(
MemoryCacheProvider::MemoryCacheProvider(const kj::MonotonicClock& timer,
kj::Maybe<SharedMemoryCache::AdditionalResizeMemoryLimitHandler>
additionalResizeMemoryLimitHandler)
: additionalResizeMemoryLimitHandler(kj::mv(additionalResizeMemoryLimitHandler)) {}
: additionalResizeMemoryLimitHandler(kj::mv(additionalResizeMemoryLimitHandler)),
timer(timer) {}

MemoryCacheProvider::~MemoryCacheProvider() noexcept(false) {
// TODO(cleanup): Later, assuming progress is made on kj::Ptr<T>, we ought to be able
Expand All @@ -456,7 +468,7 @@ kj::Own<const SharedMemoryCache> MemoryCacheProvider::getInstance(
-> SharedMemoryCache::AdditionalResizeMemoryLimitHandler& {
return const_cast<SharedMemoryCache::AdditionalResizeMemoryLimitHandler&>(handler);
});
return SharedMemoryCache::create(provider, id, handler);
return SharedMemoryCache::create(provider, id, handler, timer);
};

KJ_IF_SOME(cid, cacheId) {
Expand Down
23 changes: 17 additions & 6 deletions src/workerd/api/memory-cache.h
Original file line number Diff line number Diff line change
@@ -1,12 +1,14 @@
#pragma once

#include <workerd/io/trace.h>
#include <workerd/jsg/jsg.h>
#include <workerd/util/uuid.h>

#include <kj/hash.h>
#include <kj/map.h>
#include <kj/mutex.h>
#include <kj/table.h>
#include <kj/timer.h>

#include <set>

Expand Down Expand Up @@ -150,7 +152,8 @@ class SharedMemoryCache: public kj::AtomicRefcounted {

SharedMemoryCache(kj::Maybe<const MemoryCacheProvider&> provider,
kj::StringPtr id,
kj::Maybe<AdditionalResizeMemoryLimitHandler&> additionalResizeMemoryLimitHandler);
kj::Maybe<AdditionalResizeMemoryLimitHandler&> additionalResizeMemoryLimitHandler,
const kj::MonotonicClock& timer);

~SharedMemoryCache() noexcept(false);

Expand All @@ -160,7 +163,8 @@ class SharedMemoryCache: public kj::AtomicRefcounted {

static kj::Own<const SharedMemoryCache> create(kj::Maybe<const MemoryCacheProvider&> provider,
kj::StringPtr id,
kj::Maybe<AdditionalResizeMemoryLimitHandler&> additionalResizeMemoryLimitHandler);
kj::Maybe<AdditionalResizeMemoryLimitHandler&> additionalResizeMemoryLimitHandler,
const kj::MonotonicClock& timer);

public:
// RAII class that attaches itself to a cache, suggests cache limits to the
Expand All @@ -176,7 +180,8 @@ class SharedMemoryCache: public kj::AtomicRefcounted {
// Returns a cached value for the given key if one exists (and has not
// expired). If no such value exists, nothing is returned, regardless of any
// in-progress fallbacks trying to produce such a value.
kj::Maybe<kj::Own<CacheValue>> getWithoutFallback(const kj::String& key) const;
kj::Maybe<kj::Own<CacheValue>> getWithoutFallback(
const kj::String& key, SpanBuilder& span) const;

struct FallbackResult {
kj::Own<CacheValue> value;
Expand All @@ -191,7 +196,7 @@ class SharedMemoryCache: public kj::AtomicRefcounted {
// or to a FallbackDoneCallback. In the latter case, the caller should
// invoke the fallback function.
kj::OneOf<kj::Own<CacheValue>, kj::Promise<GetWithFallbackOutcome>> getWithFallback(
const kj::String& key) const;
const kj::String& key, SpanBuilder& span) const;

private:
// Creates a new FallbackDoneCallback associated with the given
Expand All @@ -210,6 +215,7 @@ class SharedMemoryCache: public kj::AtomicRefcounted {
void handleFallbackFailure(InProgress& inProgress) const;

kj::Own<const SharedMemoryCache> cache;
static constexpr auto memoryCachekLockWaitTimeTag = "memory_cache_lock_wait_time_ns"_kjc;
Limits limits;
};

Expand Down Expand Up @@ -453,6 +459,8 @@ class SharedMemoryCache: public kj::AtomicRefcounted {
// Same as above, the MemoryCacheProvider owns the actual handler here. Since that is guaranteed
// to outlive this SharedMemoryCache instance, so is the handler.
kj::Maybe<AdditionalResizeMemoryLimitHandler&> additionalResizeMemoryLimitHandler;

const kj::MonotonicClock& timer;
};

// JavaScript class that allows accessing an in-memory cache.
Expand Down Expand Up @@ -488,8 +496,9 @@ class MemoryCache: public jsg::Object {
// the in memory cache is being used.
class MemoryCacheProvider {
public:
MemoryCacheProvider(kj::Maybe<SharedMemoryCache::AdditionalResizeMemoryLimitHandler>
additionalResizeMemoryLimitHandler = kj::none);
MemoryCacheProvider(const kj::MonotonicClock& timer,
kj::Maybe<SharedMemoryCache::AdditionalResizeMemoryLimitHandler>
additionalResizeMemoryLimitHandler = kj::none);
KJ_DISALLOW_COPY_AND_MOVE(MemoryCacheProvider);
~MemoryCacheProvider() noexcept(false);

Expand All @@ -507,6 +516,8 @@ class MemoryCacheProvider {
// to avoid the use of the bare pointer to SharedMemoryCache* here. When the SharedMemoryCache
// is destroyed, it will remove itself from this cache by calling removeInstance.
kj::MutexGuarded<kj::HashMap<kj::String, const SharedMemoryCache*>> caches;

const kj::MonotonicClock& timer;
};

// clang-format off
Expand Down
15 changes: 15 additions & 0 deletions src/workerd/io/trace.c++
Original file line number Diff line number Diff line change
Expand Up @@ -795,4 +795,19 @@ void WorkerTracer::setTrace(rpc::Trace::Reader reader) {
trace->mergeFrom(reader, pipelineLogLevel);
}

ScopedDurationTagger::ScopedDurationTagger(
SpanBuilder& span, kj::ConstString key, const kj::MonotonicClock& timer)
: span(span),
key(kj::mv(key)),
timer(timer),
startTime(timer.now()) {}

ScopedDurationTagger::~ScopedDurationTagger() noexcept(false) {
auto duration = timer.now() - startTime;
if (isPredictableModeForTest()) {
duration = 0 * kj::NANOSECONDS;
}
span.setTag(kj::mv(key), duration / kj::NANOSECONDS);
}

} // namespace workerd
17 changes: 17 additions & 0 deletions src/workerd/io/trace.h
Original file line number Diff line number Diff line change
Expand Up @@ -704,4 +704,21 @@ struct TraceParentContext {
SpanParent limeParentSpan;
};

// RAII object that measures the time duration over its lifetime. It tags this duration onto a
// given request span using a specified tag name. Ideal for automatically tracking and logging
// execution times within a scoped block.
class ScopedDurationTagger {
public:
explicit ScopedDurationTagger(
SpanBuilder& span, kj::ConstString key, const kj::MonotonicClock& timer);
~ScopedDurationTagger() noexcept(false);
KJ_DISALLOW_COPY_AND_MOVE(ScopedDurationTagger);

private:
SpanBuilder& span;
kj::ConstString key;
const kj::MonotonicClock& timer;
const kj::TimePoint startTime;
};

} // namespace workerd
2 changes: 1 addition & 1 deletion src/workerd/server/server.c++
Original file line number Diff line number Diff line change
Expand Up @@ -164,7 +164,7 @@ Server::Server(kj::Filesystem& fs,
entropySource(entropySource),
reportConfigError(kj::mv(reportConfigError)),
consoleMode(consoleMode),
memoryCacheProvider(kj::heap<api::MemoryCacheProvider>()),
memoryCacheProvider(kj::heap<api::MemoryCacheProvider>(timer)),
tasks(*this) {}

Server::~Server() noexcept(false) {}
Expand Down
2 changes: 1 addition & 1 deletion src/workerd/tests/test-fixture.c++
Original file line number Diff line number Diff line change
Expand Up @@ -321,7 +321,7 @@ TestFixture::TestFixture(SetupParams&& params)
false),
isolateLimitEnforcer(kj::heap<MockIsolateLimitEnforcer>()),
errorReporter(kj::heap<MockErrorReporter>()),
memoryCacheProvider(kj::heap<api::MemoryCacheProvider>()),
memoryCacheProvider(kj::heap<api::MemoryCacheProvider>(*timer)),
api(kj::heap<server::WorkerdApi>(testV8System,
params.featureFlags.orDefault(CompatibilityFlags::Reader()),
*isolateLimitEnforcer,
Expand Down

0 comments on commit 297ce16

Please sign in to comment.