From 9f101d30a35a060feda4582fc04c1776d426ba02 Mon Sep 17 00:00:00 2001 From: Joe Lee Date: Thu, 5 Sep 2024 11:12:50 -0700 Subject: [PATCH] sqlite actor alarms: store alarm time in sqlite Also, enforce similar alarm properties in ActorSqlite as we do in ActorCache. --- src/workerd/io/BUILD.bazel | 10 + src/workerd/io/actor-sqlite-test.c++ | 440 +++++++++++++++++++++++++++ src/workerd/io/actor-sqlite.c++ | 174 ++++++++++- src/workerd/io/actor-sqlite.h | 49 +++ 4 files changed, 667 insertions(+), 6 deletions(-) create mode 100644 src/workerd/io/actor-sqlite-test.c++ diff --git a/src/workerd/io/BUILD.bazel b/src/workerd/io/BUILD.bazel index e4581ad0228c..d3ff05f00f14 100644 --- a/src/workerd/io/BUILD.bazel +++ b/src/workerd/io/BUILD.bazel @@ -314,6 +314,16 @@ kj_test( ], ) +kj_test( + src = "actor-sqlite-test.c++", + deps = [ + ":actor", + ":io-gate", + "//src/workerd/util:test", + "//src/workerd/util:test-util", + ], +) + kj_test( src = "promise-wrapper-test.c++", deps = [":io"], diff --git a/src/workerd/io/actor-sqlite-test.c++ b/src/workerd/io/actor-sqlite-test.c++ new file mode 100644 index 000000000000..62ebaba2c8cb --- /dev/null +++ b/src/workerd/io/actor-sqlite-test.c++ @@ -0,0 +1,440 @@ +// Copyright (c) 2024 Cloudflare, Inc. +// Licensed under the Apache 2.0 license found in the LICENSE file or at: +// https://opensource.org/licenses/Apache-2.0 + +#include "actor-sqlite.h" +#include +#include +#include "io-gate.h" +#include +#include + +namespace workerd { +namespace { + +static constexpr kj::Date oneMs = 1 * kj::MILLISECONDS + kj::UNIX_EPOCH; +static constexpr kj::Date twoMs = 2 * kj::MILLISECONDS + kj::UNIX_EPOCH; + +template +kj::Promise eagerlyReportExceptions(kj::Promise promise, kj::SourceLocation location = {}) { + return promise.eagerlyEvaluate([location](kj::Exception&& e) -> T { + KJ_LOG_AT(ERROR, location, e); + kj::throwFatalException(kj::mv(e)); + }); +} + +// Expect that a synchronous result is returned. +template +T expectSync(kj::OneOf> result, kj::SourceLocation location = {}) { + KJ_SWITCH_ONEOF(result) { + KJ_CASE_ONEOF(promise, kj::Promise) { + KJ_FAIL_ASSERT_AT(location, "result was unexpectedly asynchronous"); + } + KJ_CASE_ONEOF(value, T) { + return kj::mv(value); + } + } + KJ_UNREACHABLE; +} + +struct ActorSqliteTestOptions final { + bool monitorOutputGate = true; +}; + +class ActorSqliteTestHooks; + +struct ActorSqliteTest final { + kj::EventLoop loop; + kj::WaitScope ws; + + OutputGate gate; + kj::Own vfsDir; + SqliteDatabase::Vfs vfs; + SqliteDatabase db; + + kj::Vector>> commitFulfillers; + kj::Vector calls; + + class ActorSqliteTestHooks final: public ActorSqlite::Hooks { + public: + ActorSqliteTestHooks(ActorSqliteTest& parent): parent(parent) {} + + kj::Promise> getAlarm() override { + parent.calls.add(kj::str("getAlarm")); + return kj::Maybe(kj::none); + } + + kj::Promise setAlarm(kj::Maybe newAlarmTime) override { + auto time = newAlarmTime.map([](auto& t) { return kj::str(t); }).orDefault(kj::str("none")); + parent.calls.add(kj::str("setAlarm(", time, ")")); + return kj::READY_NOW; + } + + private: + ActorSqliteTest& parent; + }; + ActorSqliteTestHooks hooks = ActorSqliteTestHooks(*this); + + ActorSqlite actor; + + kj::Promise gateBrokenPromise; + kj::UnwindDetector unwindDetector; + + ActorSqliteTest(ActorSqliteTestOptions options = {}) + : ws(loop), + vfsDir(kj::newInMemoryDirectory(kj::nullClock())), + vfs(*vfsDir), + db(vfs, kj::Path({"foo"}), kj::WriteMode::CREATE | kj::WriteMode::MODIFY), + actor(kj::attachRef(db), gate, KJ_BIND_METHOD(*this, commitCallback), hooks), + gateBrokenPromise(options.monitorOutputGate ? eagerlyReportExceptions(gate.onBroken()) + : kj::Promise(kj::READY_NOW)) {} + + ~ActorSqliteTest() noexcept(false) { + // Make sure if the output gate has been broken, the exception was reported. This is important + // to report errors thrown inside flush(), since those won't otherwise propagate into the test + // body. + if (!unwindDetector.isUnwinding()) { + gateBrokenPromise.poll(ws); + expectCalls({}, "unexpected calls at end of test"); + } + } + + kj::Promise commitCallback() { + auto [promise, fulfiller] = kj::newPromiseAndFulfiller(); + calls.add(kj::str("commit")); + commitFulfillers.add(kj::mv(fulfiller)); + return kj::mv(promise); + } + + void resolveCommits(int expectedCount, kj::SourceLocation location = {}) { + ws.poll(); + KJ_ASSERT_AT(expectedCount == commitFulfillers.size(), location); + auto fulfillers = kj::mv(commitFulfillers); + for (auto& f: fulfillers) { + f->fulfill(); + } + } + + void rejectCommits(int expectedCount, kj::SourceLocation location = {}) { + ws.poll(); + KJ_ASSERT_AT(expectedCount == commitFulfillers.size(), location); + auto fulfillers = kj::mv(commitFulfillers); + for (auto& f: fulfillers) { + f->reject(KJ_EXCEPTION(FAILED, "a_rejected_commit")); + } + } + + void expectCalls(std::initializer_list expCalls, + kj::StringPtr message = ""_kj, + kj::SourceLocation location = {}) { + KJ_ASSERT_AT(calls == heapArray(expCalls), location, kj::str(message)); + calls.clear(); + } + + // A few driver methods for convenience. + auto getAlarm(ActorCache::ReadOptions options = {}) { + return actor.getAlarm(options); + } + auto put(kj::StringPtr key, kj::StringPtr value, ActorCache::WriteOptions options = {}) { + return actor.put(kj::str(key), kj::heapArray(value.asBytes()), options); + } + auto setAlarm(kj::Maybe newTime, ActorCache::WriteOptions options = {}) { + return actor.setAlarm(newTime, options); + } +}; + +KJ_TEST("initial alarm value is unset") { + ActorSqliteTest test; + + auto time = expectSync(test.getAlarm()); + KJ_ASSERT(time == kj::none); + test.resolveCommits(0); +} + +KJ_TEST("can set and get alarm") { + ActorSqliteTest test; + + test.setAlarm(oneMs); + test.resolveCommits(1); + test.expectCalls({"setAlarm(1ms)", "commit"}); + + auto time = expectSync(test.getAlarm()); + KJ_ASSERT(time == oneMs); + test.resolveCommits(0); +} + +KJ_TEST("alarm write happens transactionally with storage ops") { + ActorSqliteTest test; + + // TODO(test): This probably isn't actually testing transactionality yet? But pretty sure it's + // still transactional under the hood: + test.setAlarm(oneMs); + test.put("foo", "bar"); + test.resolveCommits(1); + test.expectCalls({"setAlarm(1ms)", "commit"}); + + auto time = expectSync(test.getAlarm()); + KJ_ASSERT(time == oneMs); + test.resolveCommits(0); +} + +KJ_TEST("can clear alarm") { + ActorSqliteTest test; + + test.setAlarm(oneMs); + test.resolveCommits(1); + test.expectCalls({"setAlarm(1ms)", "commit"}); + + auto initTime = expectSync(test.getAlarm()); + KJ_ASSERT(initTime == oneMs); + test.resolveCommits(0); + + test.setAlarm(kj::none); + test.resolveCommits(1); + test.expectCalls({"setAlarm(none)", "commit"}); + + auto time = expectSync(test.getAlarm()); + KJ_ASSERT(time == kj::none); +} + +KJ_TEST("can set alarm twice") { + ActorSqliteTest test; + + test.setAlarm(oneMs); + test.setAlarm(twoMs); + test.resolveCommits(1); + test.expectCalls({"setAlarm(2ms)", "commit"}); + + auto time = expectSync(test.getAlarm()); + KJ_ASSERT(time == twoMs); + test.resolveCommits(0); +} + +KJ_TEST("setting duplicate alarm is no-op") { + ActorSqliteTest test; + + test.setAlarm(kj::none); + test.resolveCommits(0); + + test.setAlarm(oneMs); + test.resolveCommits(1); + test.expectCalls({"setAlarm(1ms)", "commit"}); + + test.setAlarm(oneMs); + test.resolveCommits(0); +} + +KJ_TEST("tells alarm handler to cancel when committed alarm is empty") { + ActorSqliteTest test; + + test.setAlarm(oneMs); + test.resolveCommits(1); + test.expectCalls({"setAlarm(1ms)", "commit"}); + + test.setAlarm(kj::none); + test.resolveCommits(1); + test.expectCalls({"setAlarm(none)", "commit"}); + test.ws.poll(); // needs additional poll? + + KJ_ASSERT(test.actor.armAlarmHandler(oneMs, false) == kj::none); + test.resolveCommits(0); +} + +KJ_TEST("tells alarm handler to cancel when committed alarm does not match requested alarm") { + ActorSqliteTest test; + + test.setAlarm(oneMs); + test.resolveCommits(1); + test.expectCalls({"setAlarm(1ms)", "commit"}); + test.ws.poll(); // needs additional poll? + + KJ_ASSERT(test.actor.armAlarmHandler(twoMs, false) == kj::none); + test.resolveCommits(0); +} + +KJ_TEST("dirty alarm during handler does not cancel alarm") { + ActorSqliteTest test; + + test.setAlarm(oneMs); + test.resolveCommits(1); + test.expectCalls({"setAlarm(1ms)", "commit"}); + test.setAlarm(twoMs); + { auto maybeWrite = KJ_ASSERT_NONNULL(test.actor.armAlarmHandler(oneMs, false)); } + test.resolveCommits(1); + test.expectCalls({"setAlarm(2ms)", "commit"}); +} + +KJ_TEST("getAlarm() returns null during handler") { + ActorSqliteTest test; + + test.setAlarm(oneMs); + test.resolveCommits(1); + test.expectCalls({"setAlarm(1ms)", "commit"}); + + { + auto maybeWrite = KJ_ASSERT_NONNULL(test.actor.armAlarmHandler(oneMs, false)); + test.resolveCommits(0); + + auto time = expectSync(test.getAlarm()); + KJ_ASSERT(time == kj::none); + } + test.resolveCommits(1); + test.expectCalls({"setAlarm(none)", "commit"}); +} + +KJ_TEST("alarm handler handle clears alarm when dropped with no writes") { + ActorSqliteTest test; + + test.setAlarm(oneMs); + test.resolveCommits(1); + test.expectCalls({"setAlarm(1ms)", "commit"}); + + { auto maybeWrite = KJ_ASSERT_NONNULL(test.actor.armAlarmHandler(oneMs, false)); } + test.resolveCommits(1); + test.expectCalls({"setAlarm(none)", "commit"}); + auto time = expectSync(test.getAlarm()); + KJ_ASSERT(time == kj::none); +} + +KJ_TEST("alarm handler handle does not clear alarm when dropped with writes") { + ActorSqliteTest test; + + test.setAlarm(oneMs); + test.resolveCommits(1); + test.expectCalls({"setAlarm(1ms)", "commit"}); + + { + auto maybeWrite = KJ_ASSERT_NONNULL(test.actor.armAlarmHandler(oneMs, false)); + test.setAlarm(twoMs); + } + test.resolveCommits(1); + test.expectCalls({"setAlarm(2ms)", "commit"}); + auto time = expectSync(test.getAlarm()); + KJ_ASSERT(time == twoMs); +} + +KJ_TEST("can cancel deferred alarm deletion during handler") { + ActorSqliteTest test; + + test.setAlarm(oneMs); + test.resolveCommits(1); + test.expectCalls({"setAlarm(1ms)", "commit"}); + + { + auto maybeWrite = KJ_ASSERT_NONNULL(test.actor.armAlarmHandler(oneMs, false)); + test.actor.cancelDeferredAlarmDeletion(); + } + test.resolveCommits(0); + + auto time = expectSync(test.getAlarm()); + KJ_ASSERT(time == oneMs); +} + +KJ_TEST("canceling deferred alarm deletion outside handler has no effect") { + ActorSqliteTest test; + + test.setAlarm(oneMs); + test.resolveCommits(1); + test.expectCalls({"setAlarm(1ms)", "commit"}); + + { auto maybeWrite = KJ_ASSERT_NONNULL(test.actor.armAlarmHandler(oneMs, false)); } + test.resolveCommits(1); + test.actor.cancelDeferredAlarmDeletion(); + test.expectCalls({"setAlarm(none)", "commit"}); + + auto time = expectSync(test.getAlarm()); + KJ_ASSERT(time == kj::none); +} + +KJ_TEST("canceling deferred alarm deletion outside handler edge case") { + // Presumably harmless to cancel deletion if the client requests it after the handler ends but + // before the event loop runs the commit code? Trying to cancel deletion outside the handler is + // a bit of a contract violation anyway -- maybe we should just assert against it? + ActorSqliteTest test; + + test.setAlarm(oneMs); + test.resolveCommits(1); + test.expectCalls({"setAlarm(1ms)", "commit"}); + + { auto maybeWrite = KJ_ASSERT_NONNULL(test.actor.armAlarmHandler(oneMs, false)); } + test.actor.cancelDeferredAlarmDeletion(); + test.resolveCommits(1); + test.expectCalls({"commit"}); + + auto time = expectSync(test.getAlarm()); + KJ_ASSERT(time == kj::none); +} + +KJ_TEST("canceling deferred alarm deletion is idempotent") { + ActorSqliteTest test; + + // Not sure if important, but matches ActorCache behavior. + test.setAlarm(oneMs); + test.resolveCommits(1); + test.expectCalls({"setAlarm(1ms)", "commit"}); + + { + auto maybeWrite = KJ_ASSERT_NONNULL(test.actor.armAlarmHandler(oneMs, false)); + test.actor.cancelDeferredAlarmDeletion(); + test.actor.cancelDeferredAlarmDeletion(); + } + test.resolveCommits(0); + + auto time = expectSync(test.getAlarm()); + KJ_ASSERT(time == oneMs); +} + +KJ_TEST("handler alarm is not deleted when commit fails") { + ActorSqliteTest test({.monitorOutputGate = false}); + + auto promise = test.gate.onBroken(); + + test.setAlarm(oneMs); + test.resolveCommits(1); + test.expectCalls({"setAlarm(1ms)", "commit"}); + { + auto time = expectSync(test.getAlarm()); + KJ_ASSERT(time == oneMs); + } + + { + auto handle = test.actor.armAlarmHandler(oneMs, false); + + auto time = expectSync(test.getAlarm()); + KJ_ASSERT(time == kj::none); + } + // TODO(soon): shouldn't call setAlarm to clear on rejected commit? Or OK to assume client will + // detect and call cancelDeferredAlarmDeletion() on failure? + test.rejectCommits(1); + test.expectCalls({"setAlarm(none)", "commit"}); + + KJ_EXPECT_THROW_MESSAGE("a_rejected_commit", promise.wait(test.ws)); +} + +KJ_TEST("getAlarm/setAlarm check for brokenness") { + ActorSqliteTest test({.monitorOutputGate = false}); + + auto promise = test.gate.onBroken(); + + // Break gate + test.put("foo", "bar"); + test.expectCalls({}); + test.rejectCommits(1); + test.expectCalls({"commit"}); + + KJ_EXPECT_THROW_MESSAGE("a_rejected_commit", promise.wait(test.ws)); + + // Apparently we don't actually set brokenness until the taskFailed handler runs, but presumably + // this is OK? + test.getAlarm(); + + // Ensure taskFailed handler runs and notices brokenness: + test.ws.poll(); + + KJ_EXPECT_THROW_MESSAGE("a_rejected_commit", test.getAlarm()); + KJ_EXPECT_THROW_MESSAGE("a_rejected_commit", test.setAlarm(kj::none)); + test.expectCalls({}); +} + +} // namespace +} // namespace workerd diff --git a/src/workerd/io/actor-sqlite.c++ b/src/workerd/io/actor-sqlite.c++ index e1781c3da131..55643ae6bcc5 100644 --- a/src/workerd/io/actor-sqlite.c++ +++ b/src/workerd/io/actor-sqlite.c++ @@ -18,6 +18,7 @@ ActorSqlite::ActorSqlite(kj::Own dbParam, commitCallback(kj::mv(commitCallback)), hooks(hooks), kv(*db), + metadata(*db), commitTasks(*this) { db->onWrite(KJ_BIND_METHOD(*this, onWrite)); } @@ -109,7 +110,7 @@ kj::Maybe> ActorSqlite::ExplicitTxn::commit() { if (parent == kj::none) { // We committed the root transaction, so it's time to signal any replication layer and lock // the output gate in the meantime. - actorSqlite.commitTasks.add(actorSqlite.outputGate.lockWhile(actorSqlite.commitCallback())); + actorSqlite.commitTasks.add(actorSqlite.outputGate.lockWhile(actorSqlite.commitImpl())); } // No backpressure for SQLite. @@ -157,11 +158,68 @@ void ActorSqlite::onWrite() { // rather than after the callback. { auto drop = kj::mv(txn); } - return commitCallback(); + return commitImpl(); }))); } } +kj::Promise ActorSqlite::commitImpl() { + bool needsAlarmFlush = false; + kj::Maybe newAlarmTime; + KJ_SWITCH_ONEOF(currentAlarmTime) { + KJ_CASE_ONEOF(_, UnknownAlarmTime) { + // Haven't tried to write alarm yet, so don't need to flush. + } + KJ_CASE_ONEOF(knownAlarmTime, KnownAlarmTime) { + if (knownAlarmTime.status == KnownAlarmTime::Status::DIRTY) { + knownAlarmTime.status = KnownAlarmTime::Status::FLUSHING; + needsAlarmFlush = true; + newAlarmTime = knownAlarmTime.time; + } + } + KJ_CASE_ONEOF(deferredDelete, DeferredAlarmDelete) { + if (deferredDelete.status == DeferredAlarmDelete::Status::READY) { + deferredDelete.status = DeferredAlarmDelete::Status::FLUSHING; + needsAlarmFlush = true; + newAlarmTime = kj::none; + } + } + } + + // TODO(soon): ActorCache implements a 4x retry of failed flushes... Do we need anything similar + // for alarm scheduling? + + // We assume that exceptions thrown during commit will propagate to the caller, such that they + // will ensure cancelDeferredAlarmDeletion() is called, if necessary. + + if (needsAlarmFlush) { + // TODO(soon): fix sequencing of alarm scheduling vs. commitCallback(). + co_await hooks.setAlarm(newAlarmTime); + co_await commitCallback(); + } else { + co_await commitCallback(); + } + + KJ_SWITCH_ONEOF(currentAlarmTime) { + KJ_CASE_ONEOF(_, UnknownAlarmTime) { + // Hadn't tried to write alarm yet, so don't need to update state. + } + KJ_CASE_ONEOF(knownAlarmTime, KnownAlarmTime) { + if (knownAlarmTime.status == KnownAlarmTime::Status::FLUSHING) { + knownAlarmTime.status = KnownAlarmTime::Status::CLEAN; + } + } + KJ_CASE_ONEOF(deferredDelete, DeferredAlarmDelete) { + if (deferredDelete.status == DeferredAlarmDelete::Status::FLUSHING) { + currentAlarmTime = KnownAlarmTime{ + .status = KnownAlarmTime::Status::CLEAN, + .time = kj::none, + }; + } + } + } +} + void ActorSqlite::taskFailed(kj::Exception&& exception) { // The output gate should already have been broken since it wraps all commits tasks. So, we // don't have to report anything here, the exception will already propagate elsewhere. We @@ -177,6 +235,29 @@ void ActorSqlite::requireNotBroken() { } } +void ActorSqlite::maybeDeleteDeferredAlarm() { + KJ_IF_SOME(d, currentAlarmTime.tryGet()) { + // TODO(now): Do we need to do anything special, if we've already started flushing the alarm? + d.status = DeferredAlarmDelete::Status::READY; + // TODO(now): Do we need to do anything special to ensure that the commit code for this sqlite + // state change completes? + metadata.setAlarm(kj::none); + } +} + +void ActorSqlite::ensureAlarmTimeInitialized() { + // If we ensure that we call this function before each mutation of db alarm state, it should + // ensure that it can be called anywhere else and still get a "CLEAN" (committed) alarm time + // from the database. + + KJ_IF_SOME(_, currentAlarmTime.tryGet()) { + currentAlarmTime = KnownAlarmTime{ + .status = KnownAlarmTime::Status::CLEAN, + .time = metadata.getAlarm(), + }; + } +} + // ======================================================================================= // ActorCacheInterface implementation @@ -205,8 +286,22 @@ kj::OneOf, kj::Promise>> ActorSqlite::getAlarm( ReadOptions options) { requireNotBroken(); + ensureAlarmTimeInitialized(); - return hooks.getAlarm(); + KJ_SWITCH_ONEOF(currentAlarmTime) { + KJ_CASE_ONEOF(_, UnknownAlarmTime) { + KJ_FAIL_ASSERT("expected known alarm time"); + } + KJ_CASE_ONEOF(knownAlarmTime, KnownAlarmTime) { + return knownAlarmTime.time; + } + KJ_CASE_ONEOF(_, DeferredAlarmDelete) { + // An alarm handler is currently running, and a new alarm time has not been set yet. + // We need to return that there is no alarm. + return kj::Maybe(kj::none); + } + } + KJ_UNREACHABLE; } kj::OneOf> ActorSqlite:: @@ -270,8 +365,33 @@ kj::OneOf> ActorSqlite::delete_(kj::Array keys, Wri kj::Maybe> ActorSqlite::setAlarm( kj::Maybe newAlarmTime, WriteOptions options) { requireNotBroken(); + ensureAlarmTimeInitialized(); + + // TODO(soon): Need special logic to handle case where actor is using alarms without using + // other storage? + + KJ_IF_SOME(t, currentAlarmTime.tryGet()) { + // If we're in the alarm handler and haven't set the time yet, + // we can't perform this optimization as currentAlarmTime will be equal + // to the currently running time but we indicate to the actor in getAlarm() that there + // is no alarm set, therefore we need to act like that in setAlarm(). + // + // After the first write in the handler occurs, which would set KnownAlarmTime, + // the logic here is correct again as currentAlarmTime would match what we are reporting + // to the user from getAlarm(). + // + // So, we only apply this for KnownAlarmTime. - return hooks.setAlarm(newAlarmTime); + if (t.time == newAlarmTime) { + return kj::none; + } + } + metadata.setAlarm(newAlarmTime); + currentAlarmTime = KnownAlarmTime{ + .status = ActorSqlite::KnownAlarmTime::Status::DIRTY, + .time = newAlarmTime, + }; + return kj::none; } kj::Own ActorSqlite::startTransaction() { @@ -281,6 +401,7 @@ kj::Own ActorSqlite::startTransaction() { } ActorCacheInterface::DeleteAllResults ActorSqlite::deleteAll(WriteOptions options) { + // TODO(soon): handle deleteAll and alarm state interaction. requireNotBroken(); uint count = kv.deleteAll(); @@ -331,11 +452,52 @@ void ActorSqlite::shutdown(kj::Maybe maybeException) { } kj::Maybe> ActorSqlite::armAlarmHandler(kj::Date scheduledTime, bool noCache) { - return hooks.armAlarmHandler(scheduledTime, noCache); + ensureAlarmTimeInitialized(); + + KJ_ASSERT(!currentAlarmTime.is()); + + bool alarmDeleteNeeded = true; + KJ_IF_SOME(t, currentAlarmTime.tryGet()) { + if (t.time != scheduledTime) { + if (t.status == KnownAlarmTime::Status::CLEAN) { + // If there's a clean scheduledTime that is different from ours, this run should be + // canceled. + return kj::none; + } else { + // There's a alarm write that hasn't been set yet pending for a time different than ours -- + // We won't cancel the alarm because it hasn't been confirmed, but we shouldn't delete + // the pending write. + alarmDeleteNeeded = false; + } + } + } + + // TODO(now): Here, similar to the ActorCache code, we're assuming that if the alarm time + // matches but is not in a clean state (i.e. is dirty or flushing), that it's OK to schedule the + // delete, and that it's OK to use the current alarm time to initialize the alarm state back to + // "clean" if the alarm is cancelled. Is that OK? + + if (alarmDeleteNeeded) { + currentAlarmTime = DeferredAlarmDelete{ + .status = DeferredAlarmDelete::Status::WAITING, + .timeToDelete = scheduledTime, + }; + } + + static const DeferredAlarmDeleter disposer; + return kj::Own(this, disposer); } void ActorSqlite::cancelDeferredAlarmDeletion() { - hooks.cancelDeferredAlarmDeletion(); + KJ_IF_SOME(_, currentAlarmTime.tryGet()) { + // TODO(now): Should we do anything special if client tries to cancel alarm deletion after + // flush has already been issued? + // TODO(now): Is it correct to reset alarm to CLEAN state here? + currentAlarmTime = KnownAlarmTime{ + .status = KnownAlarmTime::Status::CLEAN, + .time = metadata.getAlarm(), + }; + } } kj::Maybe> ActorSqlite::onNoPendingFlush() { diff --git a/src/workerd/io/actor-sqlite.h b/src/workerd/io/actor-sqlite.h index 56bf363163f8..a375f1236f3e 100644 --- a/src/workerd/io/actor-sqlite.h +++ b/src/workerd/io/actor-sqlite.h @@ -6,6 +6,7 @@ #include "actor-cache.h" #include +#include namespace workerd { @@ -22,8 +23,11 @@ class ActorSqlite final: public ActorCacheInterface, private kj::TaskSet::ErrorH // for alarm operations. class Hooks { public: + // TODO(cleanup): rename to scheduleAlarm()/getScheduledAlarm()?: virtual kj::Promise> getAlarm(); virtual kj::Promise setAlarm(kj::Maybe newAlarmTime); + + // TODO(cleanup): no longer used, remove: virtual kj::Maybe> armAlarmHandler(kj::Date scheduledTime, bool noCache); virtual void cancelDeferredAlarmDeletion(); @@ -86,6 +90,7 @@ class ActorSqlite final: public ActorCacheInterface, private kj::TaskSet::ErrorH kj::Function()> commitCallback; Hooks& hooks; SqliteKv kv; + SqliteMetadata metadata; SqliteDatabase::Statement beginTxn = db->prepare("BEGIN TRANSACTION"); SqliteDatabase::Statement commitTxn = db->prepare("COMMIT TRANSACTION"); @@ -156,13 +161,57 @@ class ActorSqlite final: public ActorCacheInterface, private kj::TaskSet::ErrorH // transactions should be used in the meantime. kj::OneOf currentTxn = NoTxn(); + // Backs the `kj::Own` returned by `armAlarmHandler()`. + class DeferredAlarmDeleter: public kj::Disposer { + public: + // The `Own` returned by `armAlarmHandler()` is actually set up to point to the + // `ActorSqlite` itself, but with an alternate disposer that deletes the alarm rather than + // the whole object. + void disposeImpl(void* pointer) const { + reinterpret_cast(pointer)->maybeDeleteDeferredAlarm(); + } + }; + + struct UnknownAlarmTime {}; + struct KnownAlarmTime { + enum class Status { + CLEAN, // Alarm time has been committed + DIRTY, // Alarm time has been changed and not yet committed + FLUSHING // Alarm time is being committed in an ongoing transaction + } status; + kj::Maybe time; + }; + struct DeferredAlarmDelete { + enum class Status { + WAITING, // Alarm handler is running, and alarm value has not changed + READY, // Alarm handler completed, deletion is pending, but has not yet been committed + FLUSHING // Alarm deletion is being committed in an ongoing transaction + } status; + kj::Date timeToDelete; + // TODO(correctness): ActorCache tracks a "wasDeleted" flag; needed here too? + }; + kj::OneOf currentAlarmTime = + UnknownAlarmTime{}; + kj::TaskSet commitTasks; void onWrite(); + kj::Promise commitImpl(); void taskFailed(kj::Exception&& exception) override; void requireNotBroken(); + + // If alarm is in an uninitialized state, sets alarm time from db state. + // + // TODO(cleanup): Alternately, we could eliminate the UnknownAlarmTime state and just init the + // alarm to a known time at startup, but that requires a db query in the constructor. Are the + // perf savings worth the extra complexity? + void ensureAlarmTimeInitialized(); + + // Called when alarm handler token object is destroyed, to delete alarm if not reset or + // cancelled during handler. + void maybeDeleteDeferredAlarm(); }; } // namespace workerd