Skip to content

Commit

Permalink
Cancel syncGets() when ActorCache destroyed
Browse files Browse the repository at this point in the history
This commit attempts to provide a mechanism to cancel the promise
that waits for the read batches to complete in syncGets(). This is
needed for when the `ActorCache` is destroyed, since we need to destroy
all `Entry`s before we destroy the `SharedLru`.
  • Loading branch information
MellowYarker committed Apr 27, 2024
1 parent f977450 commit 773db0c
Show file tree
Hide file tree
Showing 2 changed files with 51 additions and 2 deletions.
41 changes: 39 additions & 2 deletions src/workerd/io/actor-cache.c++
Original file line number Diff line number Diff line change
Expand Up @@ -131,6 +131,8 @@ size_t ActorCache::Entry::setPresentValue(Value newValue) {
void ActorCache::Entry::setAbsentValue() {
KJ_ASSERT(valueStatus == EntryValueStatus::UNKNOWN);
valueStatus = EntryValueStatus::ABSENT;
KJ_ASSERT_NONNULL(maybeCache).lru.size.fetch_sub(value.size());
value = {};
}

ActorCache::SharedLru::SharedLru(Options options): options(options) {}
Expand Down Expand Up @@ -577,6 +579,19 @@ public:
}
}

// Drops all entries.
//
// This is called by the client that's receiving the stream of get results when the promise is
// either completed or cancelled. In the completion case, we've already updated all `Entry`s, so
// it is safe to drop them here (and we would be dropping the GetMultiStreamImpl anyways).
//
// In the case that the promise was cancelled, we're probably destroying ActorCache and need to
// destroy all references to `Entry`s before we can destroy the SharedLru. Either way, we won't be
// referring to the entries anymore so this is fine.
void clear() {
entries = {};
}

kj::Promise<void> values(ValuesContext context) override {
if (!endFulfiller->isWaiting()) {
// The original caller stopped listening. Try to cancel the stream by throwing.
Expand Down Expand Up @@ -700,6 +715,13 @@ kj::Promise<void> ActorCache::syncGets(
auto entryIt = getFlush.entries.begin();
// The collection of batch flush promises we'll be waiting on.
kj::Vector<kj::Promise<void>> promises;

// If ActorCache is dtor'd, we need each GetMultiStreamImpl to drop their strong refs to `Entry`s
// so that we clear out all entries from cache.
// TODO(cleanup): Is there a way to actually get the `GetMultiStreamImpl objects/capnp capabilities
// to be dtor'd instead? That would be a lot nicer.
kj::Vector<GetMultiStreamImpl*> clients;

for (size_t i = 0; i < getFlush.batches.size(); ++i) {
auto [p, f] = kj::newPromiseAndFulfiller<void>();
promises.add(kj::mv(p));
Expand All @@ -724,19 +746,34 @@ kj::Promise<void> ActorCache::syncGets(
// The storage-proxy will stream results to us by calling `GetMultiStreamImpl::values()`,
// where we will update the `Entry`s in `batchEntries` with their values from storage (or set
// them ABSENT if they weren't found).
rpc::ActorStorage::ListStream::Client streamClient =
kj::Own<GetMultiStreamImpl> client =
kj::heap<GetMultiStreamImpl>(*this, batchEntries.releaseAsArray(), kj::mv(f));

clients.add(client.get());

rpc::ActorStorage::ListStream::Client streamClient(kj::mv(client));
req.setStream(streamClient);
promises.add(req.send().ignoreResult().eagerlyEvaluate([](auto e) -> kj::Promise<void>{
return kj::mv(e);
}));
}
KJ_DASSERT(entryIt == getFlush.entries.end());

// After this point, the only strong references to `Entry`s should be in the individual
// GetMultiStreamImpl objects.
getFlush.batches.clear();
getFlush.entries.clear();

co_await kj::joinPromises(promises.releaseAsArray());
// The ActorCache must own this promise because we want the destruction of ActorCache to destroy
// the `Entry`s associated with each GetMultiStreamImpl above.
multiReadTask = kj::joinPromises(promises.releaseAsArray()).attach(
kj::defer([&](){
for (auto& client: clients) {
client->clear();
}
}));

co_await multiReadTask;
}

kj::OneOf<kj::Maybe<kj::Date>, kj::Promise<kj::Maybe<kj::Date>>> ActorCache::getAlarm(
Expand Down
12 changes: 12 additions & 0 deletions src/workerd/io/actor-cache.h
Original file line number Diff line number Diff line change
Expand Up @@ -779,6 +779,18 @@ class ActorCache final: public ActorCacheInterface {

ReadFulfiller readFulfiller;

// syncGets() creates a GetMultiStreamImpl for each batch that we read, and each
// GetMultiStreamImpl has an array of strong references to `Entry`s. Although we may be holding
// strong references, these `Entry`s are really sitting in the dirtyList, and GetMultiStreamImpl
// is filling the Entry with its value from storage. However, upon the destruction of the
// ActorCache, we need to make sure that all `Entry`s are dropped before we try to destroy our
// SharedLru, otherwise we may see use-after-frees.
//
// To ensure this destruction ordering is met, we store the joint get promises in ActorCache,
// and set up a kj::defer task that clears out the entries in each GetMultiStreamImpl upon the
// completion/cancellation of the joint get promise. See `syncGets()` for more details.
kj::Promise<void> multiReadTask = kj::READY_NOW;

// Did we hit a problem that makes the ActorCache unusable? If so this is the exception that
// describes the problem.
kj::Maybe<kj::Exception> maybeTerminalException;
Expand Down

0 comments on commit 773db0c

Please sign in to comment.