Skip to content

Commit 1c2c995

Browse files
Add extra logging for makeTimeoutPromise
When the output gate timeout fires, this log adds context about which operation the output gate was locked on.
1 parent: 03e24eb · commit: 1c2c995

File tree

5 files changed

+22
-20
lines changed

5 files changed

+22
-20
lines changed

src/workerd/io/actor-cache.c++

+10-10
Original file line number | Diff line number | Diff line change
@@ -215,7 +215,7 @@ void ActorCache::requireNotTerminal() {
215215
// we don't let the worker return stale state. This isn't strictly necessary but it does
216216
// mirror previous behavior wherein we would use disabled storage via `flushImpl()` and break
217217
// the output gate.
218-
ensureFlushScheduled({});
218+
ensureFlushScheduled({}, "requireNotTerminal intentionally breaking output gate"_kjc);
219219
}
220220

221221
kj::throwFatalException(kj::cp(e));
@@ -249,7 +249,7 @@ void ActorCache::evictOrOomIfNeeded(Lock& lock) {
249249
// that returns a promise which we'd then have to put somewhere so that we don't immediately
250250
// cancel it. Instead, we can ensure that a flush has been scheduled. `flushImpl()`, when
251251
// called, will throw an exception which breaks the gate.
252-
ensureFlushScheduled(WriteOptions());
252+
ensureFlushScheduled(WriteOptions(), "evictOrOomIfNeeded intentionally breaking output gate"_kjc);
253253
}
254254

255255
kj::throwFatalException(kj::mv(exception));
@@ -1853,7 +1853,7 @@ kj::Maybe<kj::Promise<void>> ActorCache::setAlarm(kj::Maybe<kj::Date> newAlarmTi
18531853
options.noCache
18541854
};
18551855

1856-
ensureFlushScheduled(options);
1856+
ensureFlushScheduled(options, "setAlarm flush"_kjc);
18571857

18581858
return getBackpressure();
18591859
}
@@ -1990,7 +1990,7 @@ ActorCache::DeleteAllResults ActorCache::deleteAll(WriteOptions options) {
19901990
.deletedDirty = kj::mv(deletedDirty),
19911991
.countFulfiller = kj::mv(paf.fulfiller)
19921992
};
1993-
ensureFlushScheduled(options);
1993+
ensureFlushScheduled(options, "deleteAll flush"_kjc);
19941994
} else {
19951995
// A previous deleteAll() was scheduled and hasn't been committed yet. This means that we
19961996
// can actually coalesce the two, and there's no need to commit any writes that happened
@@ -2108,10 +2108,10 @@ void ActorCache::putImpl(Lock& lock, kj::Own<Entry> newEntry,
21082108
addToDirtyList(*slot);
21092109
}
21102110

2111-
ensureFlushScheduled(options);
2111+
ensureFlushScheduled(options, "putImpl flush"_kjc);
21122112
}
21132113

2114-
void ActorCache::ensureFlushScheduled(const WriteOptions& options) {
2114+
void ActorCache::ensureFlushScheduled(const WriteOptions& options, kj::LiteralStringConst operationInfo) {
21152115
if (lru.options.neverFlush) {
21162116
// Skip all flushes. Used for preview sessions where data is strictly kept in memory.
21172117

@@ -2156,19 +2156,19 @@ void ActorCache::ensureFlushScheduled(const WriteOptions& options) {
21562156
if (options.allowUnconfirmed) {
21572157
// Don't apply output gate. But, if an exception is thrown, we still want to break the gate,
21582158
// so arrange for that.
2159-
flushPromise = flushPromise.catch_([this](kj::Exception&& e) {
2160-
return gate.lockWhile(kj::Promise<void>(kj::mv(e)));
2159+
flushPromise = flushPromise.catch_([this, opInfo = kj::mv(operationInfo)](kj::Exception&& e) {
2160+
return gate.lockWhile(kj::Promise<void>(kj::mv(e)), kj::mv(opInfo));
21612161
});
21622162
} else {
2163-
flushPromise = gate.lockWhile(kj::mv(flushPromise));
2163+
flushPromise = gate.lockWhile(kj::mv(flushPromise), kj::mv(operationInfo));
21642164
flushScheduledWithOutputGate = true;
21652165
}
21662166

21672167
lastFlush = flushPromise.fork();
21682168
} else if (!flushScheduledWithOutputGate && !options.allowUnconfirmed) {
21692169
// The flush has already been scheduled without the output gate, but we want to upgrade it to
21702170
// use the output gate now.
2171-
lastFlush = gate.lockWhile(lastFlush.addBranch()).fork();
2171+
lastFlush = gate.lockWhile(lastFlush.addBranch(), kj::mv(operationInfo)).fork();
21722172
flushScheduledWithOutputGate = true;
21732173
}
21742174
}

src/workerd/io/actor-cache.h

+2-2
Original file line number | Diff line number | Diff line change
@@ -310,7 +310,7 @@ class ActorCache final: public ActorCacheInterface {
310310
auto p = reinterpret_cast<ActorCache*>(pointer);
311311
KJ_IF_SOME(d, p->currentAlarmTime.tryGet<DeferredAlarmDelete>()) {
312312
d.status = DeferredAlarmDelete::Status::READY;
313-
p->ensureFlushScheduled(WriteOptions { .noCache = d.noCache });
313+
p->ensureFlushScheduled(WriteOptions { .noCache = d.noCache }, "alarm disposer"_kjc);
314314
}
315315
}
316316
};
@@ -702,7 +702,7 @@ class ActorCache final: public ActorCacheInterface {
702702
kj::Promise<kj::Maybe<Value>> getImpl(kj::Own<Entry> entry, ReadOptions options);
703703

704704
// Ensure that we will flush dirty entries soon.
705-
void ensureFlushScheduled(const WriteOptions& options);
705+
void ensureFlushScheduled(const WriteOptions& options, kj::LiteralStringConst operationInfo);
706706

707707
// Schedule a read RPC. The given function will be invoked and provided with an
708708
// ActorStorage::Operations::Client on which the read operation should be performed. The function

src/workerd/io/actor-sqlite.c++

+2-2
Original file line number | Diff line number | Diff line change
@@ -107,7 +107,7 @@ kj::Maybe<kj::Promise<void>> ActorSqlite::ExplicitTxn::commit() {
107107
// We committed the root transaction, so it's time to signal any replication layer and lock
108108
// the output gate in the meantime.
109109
actorSqlite.commitTasks.add(
110-
actorSqlite.outputGate.lockWhile(actorSqlite.commitCallback()));
110+
actorSqlite.outputGate.lockWhile(actorSqlite.commitCallback(), "explicit txn commit"_kjc));
111111
}
112112

113113
// No backpressure for SQLite.
@@ -158,7 +158,7 @@ void ActorSqlite::onWrite() {
158158
{ auto drop = kj::mv(txn); }
159159

160160
return commitCallback();
161-
})));
161+
}), "committing write"_kjc));
162162
}
163163
}
164164

src/workerd/io/io-gate.h

+6-5
Original file line number | Diff line number | Diff line change
@@ -225,7 +225,8 @@ class OutputGate {
225225
// Optionally make a promise which should be exclusiveJoin()ed with the lock promise to
226226
// implement a timeout. The returned promise should be something that throws an exception
227227
// after some timeout has expired.
228-
virtual kj::Promise<void> makeTimeoutPromise() { return kj::NEVER_DONE; }
228+
// The passed operationInfo will be logged separately to provide extra information.
229+
virtual kj::Promise<void> makeTimeoutPromise(kj::LiteralStringConst operationInfo) { return kj::NEVER_DONE; }
229230

230231
// Optionally track metrics. In practice these are implemented by MetricsCollector::Actor, but
231232
// we don't want to depend on that class from here.
@@ -245,7 +246,7 @@ class OutputGate {
245246
// If `promise` rejects, the exception will propagate to all future `wait()`s. If the returned
246247
// promise is canceled before completion, all future `wait()`s will also throw.
247248
template <typename T>
248-
kj::Promise<T> lockWhile(kj::Promise<T> promise);
249+
kj::Promise<T> lockWhile(kj::Promise<T> promise, kj::LiteralStringConst operationInfo);
249250

250251
// Wait until all preceding locks are released. The wait will not be affected by any future
251252
// call to `lockWhile()`.
@@ -277,13 +278,13 @@ class OutputGate {
277278
// inline implementation details
278279

279280
template <typename T>
280-
kj::Promise<T> OutputGate::lockWhile(kj::Promise<T> promise) {
281+
kj::Promise<T> OutputGate::lockWhile(kj::Promise<T> promise, kj::LiteralStringConst operationInfo) {
281282
auto fulfiller = lock();
282283

283284
if constexpr (std::is_void_v<T>) {
284-
promise = promise.exclusiveJoin(hooks.makeTimeoutPromise());
285+
promise = promise.exclusiveJoin(hooks.makeTimeoutPromise(operationInfo));
285286
} else {
286-
promise = promise.exclusiveJoin(hooks.makeTimeoutPromise()
287+
promise = promise.exclusiveJoin(hooks.makeTimeoutPromise(operationInfo)
287288
.then([]() -> T { KJ_UNREACHABLE; }));
288289
}
289290

src/workerd/io/worker.c++

+2-1
Original file line number | Diff line number | Diff line change
@@ -2951,12 +2951,13 @@ struct Worker::Actor::Impl {
29512951
void inputGateWaiterRemoved() override { metrics.inputGateWaiterRemoved(); }
29522952
// Implements InputGate::Hooks.
29532953

2954-
kj::Promise<void> makeTimeoutPromise() override {
2954+
kj::Promise<void> makeTimeoutPromise(kj::LiteralStringConst operationInfo) override {
29552955
// This really only protects against total hangs. Lowering the timeout drastically is risky,
29562956
// since low timeouts can spuriously fire when under heavy CPU load, failing requests that
29572957
// would otherwise succeed.
29582958
auto timeout = 30 * kj::SECONDS;
29592959
co_await timerChannel.afterLimitTimeout(timeout);
2960+
KJ_LOG(ERROR, "Durable Object storage operation exceeded timeout", operationInfo);
29602961
kj::throwFatalException(KJ_EXCEPTION(FAILED,
29612962
"broken.outputGateBroken; jsg.Error: Durable Object storage operation exceeded "
29622963
"timeout which caused object to be reset."));

0 commit comments

Comments
 (0)