Skip to content

Commit

Permalink
Add destroy handler and custom event for Durable Objects
Browse files Browse the repository at this point in the history
  • Loading branch information
jqmmes committed Jan 15, 2024
1 parent b74607c commit d29af06
Show file tree
Hide file tree
Showing 9 changed files with 219 additions and 13 deletions.
63 changes: 63 additions & 0 deletions src/workerd/api/actor-destroy.c++
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
// Copyright (c) 2017-2022 Cloudflare, Inc.
// Licensed under the Apache 2.0 license found in the LICENSE file or at:
// https://opensource.org/licenses/Apache-2.0

#include "actor-destroy.h"
#include <workerd/api/global-scope.h>
#include <workerd/jsg/ser.h>

namespace workerd::api {

ActorDestroyEvent::ActorDestroyEvent() : ExtendableEvent("actorDestroy"){};

kj::Promise<WorkerInterface::CustomEvent::Result>
ActorDestroyCustomEventImpl::run(kj::Own<IoContext::IncomingRequest> incomingRequest,
kj::Maybe<kj::StringPtr> entrypointName) {

auto& context = incomingRequest->getContext();

// Mark the request as delivered because we're about to run some JS.
incomingRequest->delivered();

EventOutcome outcome = EventOutcome::OK;

try {
auto result = co_await context.run(
[entrypointName = entrypointName, &context](Worker::Lock& lock) mutable {
auto results = lock.getGlobalScope().actorDestroy(
lock, lock.getExportedHandler(entrypointName, context.getActor()));
return results;
});
// Now that we've completed the handler execution we can do all the necessary cleanup and cancel
// prevent anything else to be sent to the destroyed actor. As such, we abort the current
// IoContext and shutdown the actor.
context.getActorOrThrow().shutdown(
2, KJ_EXCEPTION(DISCONNECTED, "broken.dropped; Actor was destroyed."));
context.abort(KJ_EXCEPTION(DISCONNECTED, "broken.dropped; Actor was destroyed."));
co_return result;
} catch (kj::Exception e) {
if (auto desc = e.getDescription();
!jsg::isTunneledException(desc) && !jsg::isDoNotLogException(desc)) {
LOG_EXCEPTION("ActorDestroyCustomEventImpl"_kj, e);
}
outcome = EventOutcome::EXCEPTION;
}

co_return Result{
.outcome = outcome,
};
}

kj::Promise<WorkerInterface::CustomEvent::Result> ActorDestroyCustomEventImpl::sendRpc(
capnp::HttpOverCapnpFactory& httpOverCapnpFactory, capnp::ByteStreamFactory& byteStreamFactory,
kj::TaskSet& waitUntilTasks, rpc::EventDispatcher::Client dispatcher) {
auto req = dispatcher.castAs<rpc::EventDispatcher>().actorDestroyRequest();

return req.send().then([](auto resp) {
return WorkerInterface::CustomEvent::Result{
.outcome = resp.getOutcome(),
};
});
}

} // namespace workerd::api
52 changes: 52 additions & 0 deletions src/workerd/api/actor-destroy.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
// Copyright (c) 2017-2022 Cloudflare, Inc.
// Licensed under the Apache 2.0 license found in the LICENSE file or at:
// https://opensource.org/licenses/Apache-2.0

#pragma once

#include <kj/debug.h>

#include <workerd/io/worker-interface.capnp.h>
#include <workerd/io/worker-interface.h>
#include <workerd/api/basics.h>

namespace workerd::api {

class ActorDestroyEvent final: public ExtendableEvent {
public:
explicit ActorDestroyEvent();

static jsg::Ref<ActorDestroyEvent> constructor(kj::String type) = delete;

JSG_RESOURCE_TYPE(ActorDestroyEvent) {
JSG_INHERIT(ExtendableEvent);
}
};

class ActorDestroyCustomEventImpl final: public WorkerInterface::CustomEvent,
public kj::Refcounted {
public:
ActorDestroyCustomEventImpl(uint16_t typeId): typeId(typeId) {}

kj::Promise<Result> run(
kj::Own<IoContext_IncomingRequest> incomingRequest,
kj::Maybe<kj::StringPtr> entrypointName) override;

kj::Promise<Result> sendRpc(
capnp::HttpOverCapnpFactory& httpOverCapnpFactory,
capnp::ByteStreamFactory& byteStreamFactory,
kj::TaskSet& waitUntilTasks,
rpc::EventDispatcher::Client dispatcher) override;

uint16_t getType() override {
return typeId;
}

private:
uint16_t typeId;
};

#define EW_ACTOR_DESTROY_ISOLATE_TYPES \
api::ActorDestroyEvent, \
api::ActorDestroyExportedHandler
} // namespace workerd::api
21 changes: 12 additions & 9 deletions src/workerd/api/actor.c++
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,9 @@

#include "actor.h"
#include "util.h"
#include "workerd/jsg/promise.h"
#include <workerd/io/features.h>
#include <workerd/api/actor-destroy.h>
#include <kj/encoding.h>
#include <kj/compat/http.h>
#include <capnp/compat/byte-stream.h>
Expand Down Expand Up @@ -172,24 +174,25 @@ jsg::Ref<DurableObjectNamespace> DurableObjectNamespace::jurisdiction(kj::String
idFactory->cloneWithJurisdiction(jurisdiction));
}

kj::Promise<void> DurableObjectNamespace::destroy(jsg::Lock& js, jsg::Ref<DurableObjectId> id) {\
return destroyImpl(js, kj::mv(id), ActorGetMode::GET_EXISTING);
}

kj::Promise<void> DurableObjectNamespace::destroyImpl(jsg::Lock& js, jsg::Ref<DurableObjectId> id, ActorGetMode mode) {
jsg::Promise<void> DurableObjectNamespace::destroy(jsg::Lock& js, jsg::Ref<DurableObjectId> id) {
JSG_REQUIRE(idFactory->matchesJurisdiction(id->getInner()), Error,
"destroy called on jurisdictional subnamespace with an ID from a different jurisdiction");

auto& context = IoContext::current();
auto actorChannel = context.getGlobalActorChannel(channel, id->getInner(), nullptr,
mode);

auto workerInterface = actorChannel->startRequest({});
// TODO(now): set correct span information.
auto actorChannel = context.getGlobalActorChannel(channel, id->getInner(), kj::none,
ActorGetMode::GET_EXISTING, SpanParent(kj::none));

// We now have a worker interface to work with.
auto workerInterface = actorChannel->startRequest({});

auto pair = js.newPromiseAndResolver<void>();
auto resolver = kj::mv(pair.resolver);

// The internal event code for actorDestroy events is 10.
auto actorDestroyEventCode = 10;
co_await workerInterface->customEvent(kj::heap<api::ActorDestroyCustomEventImpl>(actorDestroyEventCode));
return context.awaitIo(workerInterface->customEvent(kj::heap<api::ActorDestroyCustomEventImpl>(actorDestroyEventCode)).ignoreResult());
}

} // namespace workerd::api
4 changes: 2 additions & 2 deletions src/workerd/api/actor.h
Original file line number Diff line number Diff line change
Expand Up @@ -170,11 +170,11 @@ class DurableObjectNamespace: public jsg::Object {
// Creates a subnamespace with the jurisdiction hardcoded.
jsg::Ref<DurableObjectNamespace> jurisdiction(kj::String jurisdiction);

kj::Promise<void> destroy(jsg::Ref<DurableObjectId> id);
// kj::Promise<void> destroy(jsg::Ref<DurableObjectId> id);

// Destroys the durable object identified with id in this namespace including all data associated
// to it. This variation will get a durable object by ID if it already exists.
kj::Promise<void> destroy(jsg::Lock& js, jsg::Ref<DurableObjectId> id);
jsg::Promise<void> destroy(jsg::Lock& js, jsg::Ref<DurableObjectId> id);

JSG_RESOURCE_TYPE(DurableObjectNamespace, CompatibilityFlags::Reader flags) {
JSG_METHOD(newUniqueId);
Expand Down
76 changes: 76 additions & 0 deletions src/workerd/api/global-scope.c++
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
#include <workerd/jsg/async-context.h>
#include <workerd/jsg/ser.h>
#include <workerd/jsg/util.h>
#include <workerd/jsg/promise.h>
#include <workerd/io/io-context.h>
#include <workerd/io/features.h>
#include <workerd/util/sentry.h>
Expand All @@ -23,6 +24,7 @@
#include <workerd/util/stream-utils.h>
#include <workerd/util/use-perfetto-categories.h>
#include <workerd/util/uncaught-exception-source.h>
#include <workerd/api/actor-destroy.h>

namespace workerd::api {

Expand Down Expand Up @@ -552,6 +554,80 @@ void ServiceWorkerGlobalScope::sendHibernatableWebSocketError(
}
}

kj::Promise<WorkerInterface::CustomEvent::Result> ServiceWorkerGlobalScope::actorDestroy(Worker::Lock& lock,
kj::Maybe<ExportedHandler&> exportedHandler) {

auto& context = IoContext::current();
// TODO(now): Prevent any other io events during destroy handler execution.

auto& handler = KJ_REQUIRE_NONNULL(exportedHandler);
if (handler.destroy == kj::none) {
lock.logWarningOnce(
"Calling actor destroy without any destroy() hander");
return WorkerInterface::CustomEvent::Result {
.outcome = EventOutcome::SCRIPT_NOT_FOUND
};
}


auto& destroy = KJ_ASSERT_NONNULL(handler.destroy);
return context.run([exportedHandler, &destroy,
maybeAsyncContext = jsg::AsyncContextFrame::currentRef(lock)]
(Worker::Lock& lock) mutable -> kj::Promise<WorkerInterface::CustomEvent::Result> {
jsg::AsyncContextFrame::Scope asyncScope(lock, maybeAsyncContext);

return destroy(lock).then([]() -> kj::Promise<WorkerInterface::CustomEvent::Result> {
return WorkerInterface::CustomEvent::Result {
.outcome = EventOutcome::OK
};
});
});

// return actorDestroyResultPromise;
// actorDestroyResultPromise;
// } else {
// KJ_DBG("CANCELED");
// return WorkerInterface::CustomEvent::Result {
// .outcome = EventOutcome::CANCELED
// };
// }




// // IoContext::current().blockConcurrencyWhile(lock, [exportedHandler](jsg::Lock& js)
// // mutable -> jsg::Promise<bool> {
// auto event = jsg::alloc<ActorDestroyEvent>();
// KJ_IF_SOME(h, exportedHandler) {
// KJ_IF_SOME(handler, h.destroy) {

// // return IoContext::current()
// // .run([exportedHandler, handler,
// // maybeAsyncContext = jsg::AsyncContextFrame::currentRef(lock)]
// // (Worker::Lock& lock) mutable -> kj::Promise<WorkerInterface::AlarmResult> {
// // jsg::AsyncContextFrame::Scope asyncScope(lock, maybeAsyncContext);
// // return handler(lock).then([]() -> kj::Promise<WorkerInterface::AlarmResult> {
// // return WorkerInterface::AlarmResult {
// // .retry = false,
// // .outcome = EventOutcome::OK
// // };
// // });
// // });

// // jsg::Lock& js(lock);

// // IoContext::current().run()

// IoContext::current().blockConcurrencyWhile(js, [](jsg::Lock& js){});
// // auto promise = (*handler)(js);
// auto promise = handler(lock);
// event->waitUntil(kj::mv(promise));
// }
// }
// return js.resolvedPromise(true);
// });
}

void ServiceWorkerGlobalScope::emitPromiseRejection(
jsg::Lock& js,
v8::PromiseRejectEvent event,
Expand Down
9 changes: 8 additions & 1 deletion src/workerd/api/global-scope.h
Original file line number Diff line number Diff line change
Expand Up @@ -213,10 +213,13 @@ struct ExportedHandler {
typedef kj::Promise<void> HibernatableWebSocketErrorHandler(jsg::Ref<WebSocket>, jsg::Value);
jsg::LenientOptional<jsg::Function<HibernatableWebSocketErrorHandler>> webSocketError;

typedef kj::Promise<void> ActorDestroyHandler();
jsg::LenientOptional<jsg::Function<ActorDestroyHandler>> destroy;

// Self-ref potentially allows extracting other custom handlers from the object.
jsg::SelfRef self;

JSG_STRUCT(fetch, tail, trace, scheduled, alarm, test, webSocketMessage, webSocketClose, webSocketError, self);
JSG_STRUCT(fetch, tail, trace, scheduled, alarm, test, webSocketMessage, webSocketClose, webSocketError, destroy, self);

JSG_STRUCT_TS_ROOT();
// ExportedHandler isn't included in the global scope, but we still want to
Expand All @@ -241,6 +244,7 @@ struct ExportedHandler {
webSocketClose: never;
webSocketError: never;
queue?: ExportedHandlerQueueHandler<Env, QueueHandlerMessage>;
destroy: never;
test?: ExportedHandlerTestHandler<Env>;
});
// Make `env` parameter generic
Expand Down Expand Up @@ -328,6 +332,9 @@ class ServiceWorkerGlobalScope: public WorkerGlobalScope {
Worker::Lock& lock,
kj::Maybe<ExportedHandler&> exportedHandler);

kj::Promise<WorkerInterface::CustomEvent::Result> actorDestroy(Worker::Lock& lock,
kj::Maybe<ExportedHandler&> exportedHandler);

void emitPromiseRejection(
jsg::Lock& js,
v8::PromiseRejectEvent event,
Expand Down
3 changes: 2 additions & 1 deletion src/workerd/api/worker-rpc.c++
Original file line number Diff line number Diff line change
Expand Up @@ -140,7 +140,8 @@ private:
name == "alarm" ||
name == "webSocketMessage" ||
name == "webSocketClose" ||
name == "webSocketError") {
name == "webSocketError" ||
name == "destroy") {
return true;
}
return false;
Expand Down
3 changes: 3 additions & 0 deletions src/workerd/io/worker-interface.capnp
Original file line number Diff line number Diff line change
Expand Up @@ -226,6 +226,9 @@ interface EventDispatcher @0xf20697475ec1752d {
# We use customEvent() to dispatch this event.
# In the future, we can add an argument to pass a capability to the server.

actorDestroy @10 () -> (outcome :EventOutcome);
# Runs the actor destroy event.

obsolete5 @5();
obsolete6 @6();
obsolete7 @7();
Expand Down
1 change: 1 addition & 0 deletions src/workerd/server/workerd-api.c++
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
#include <workerd/jsg/util.h>
#include <workerd/jsg/setup.h>
#include <workerd/api/actor.h>
#include <workerd/api/actor-destroy.h>
#include <workerd/api/actor-state.h>
#include <workerd/api/analytics-engine.h>
#include <workerd/api/cache.h>
Expand Down

0 comments on commit d29af06

Please sign in to comment.