Skip to content

Commit

Permalink
Merge pull request #3193 from cloudflare/kenton/connection-meta
Browse files Browse the repository at this point in the history
Implement connection props.
  • Loading branch information
kentonv authored Dec 11, 2024
2 parents a6423f2 + c2cce70 commit 7518bb8
Show file tree
Hide file tree
Showing 45 changed files with 667 additions and 112 deletions.
19 changes: 19 additions & 0 deletions src/workerd/api/global-scope.h
Original file line number Diff line number Diff line change
Expand Up @@ -204,16 +204,24 @@ class TestController: public jsg::Object {

class ExecutionContext: public jsg::Object {
public:
ExecutionContext(jsg::Lock& js): props(js, js.obj()) {}
ExecutionContext(jsg::Lock& js, jsg::JsValue props): props(js, props) {}

void waitUntil(kj::Promise<void> promise);
void passThroughOnException();

// Cancels the current execution context with the given exception, causing all execution to stop
// and throwing an error at the client.
void abort(jsg::Lock& js, jsg::Optional<jsg::Value> reason);

jsg::JsValue getProps(jsg::Lock& js) {
return props.getHandle(js);
}

JSG_RESOURCE_TYPE(ExecutionContext, CompatibilityFlags::Reader flags) {
JSG_METHOD(waitUntil);
JSG_METHOD(passThroughOnException);
JSG_LAZY_INSTANCE_PROPERTY(props, getProps);

if (flags.getWorkerdExperimental()) {
// TODO(soon): Before making this generally available we need to:
Expand All @@ -229,6 +237,17 @@ class ExecutionContext: public jsg::Object {
JSG_METHOD(abort);
}
}

void visitForMemoryInfo(jsg::MemoryTracker& tracker) const {
tracker.trackField("props", props);
}

private:
jsg::JsRef<jsg::JsValue> props;

void visitForGc(jsg::GcVisitor& visitor) {
visitor.visit(props);
}
};

// AlarmEventInfo is a jsg::Object used to pass alarm invocation info to an alarm handler.
Expand Down
13 changes: 7 additions & 6 deletions src/workerd/api/hibernatable-web-socket.c++
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@ jsg::Ref<WebSocket> HibernatableWebSocketEvent::claimWebSocket(
kj::Promise<WorkerInterface::CustomEvent::Result> HibernatableWebSocketCustomEventImpl::run(
kj::Own<IoContext_IncomingRequest> incomingRequest,
kj::Maybe<kj::StringPtr> entrypointName,
Frankenvalue props,
kj::TaskSet& waitUntilTasks) {
// Mark the request as delivered because we're about to run some JS.
auto& context = incomingRequest->getContext();
Expand Down Expand Up @@ -100,28 +101,28 @@ kj::Promise<WorkerInterface::CustomEvent::Result> HibernatableWebSocketCustomEve

try {
co_await context.run(
[entrypointName = entrypointName, &context, eventParameters = kj::mv(eventParameters)](
Worker::Lock& lock) mutable {
[entrypointName = entrypointName, &context, eventParameters = kj::mv(eventParameters),
props = kj::mv(props)](Worker::Lock& lock) mutable {
KJ_SWITCH_ONEOF(eventParameters.eventType) {
KJ_CASE_ONEOF(text, HibernatableSocketParams::Text) {
return lock.getGlobalScope().sendHibernatableWebSocketMessage(kj::mv(text.message),
eventParameters.eventTimeoutMs, kj::mv(eventParameters.websocketId), lock,
lock.getExportedHandler(entrypointName, context.getActor()));
lock.getExportedHandler(entrypointName, kj::mv(props), context.getActor()));
}
KJ_CASE_ONEOF(data, HibernatableSocketParams::Data) {
return lock.getGlobalScope().sendHibernatableWebSocketMessage(kj::mv(data.message),
eventParameters.eventTimeoutMs, kj::mv(eventParameters.websocketId), lock,
lock.getExportedHandler(entrypointName, context.getActor()));
lock.getExportedHandler(entrypointName, kj::mv(props), context.getActor()));
}
KJ_CASE_ONEOF(close, HibernatableSocketParams::Close) {
return lock.getGlobalScope().sendHibernatableWebSocketClose(kj::mv(close),
eventParameters.eventTimeoutMs, kj::mv(eventParameters.websocketId), lock,
lock.getExportedHandler(entrypointName, context.getActor()));
lock.getExportedHandler(entrypointName, kj::mv(props), context.getActor()));
}
KJ_CASE_ONEOF(e, HibernatableSocketParams::Error) {
return lock.getGlobalScope().sendHibernatableWebSocketError(kj::mv(e.error),
eventParameters.eventTimeoutMs, kj::mv(eventParameters.websocketId), lock,
lock.getExportedHandler(entrypointName, context.getActor()));
lock.getExportedHandler(entrypointName, kj::mv(props), context.getActor()));
}
KJ_UNREACHABLE;
}
Expand Down
1 change: 1 addition & 0 deletions src/workerd/api/hibernatable-web-socket.h
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@ class HibernatableWebSocketCustomEventImpl final: public WorkerInterface::Custom

kj::Promise<Result> run(kj::Own<IoContext_IncomingRequest> incomingRequest,
kj::Maybe<kj::StringPtr> entrypointName,
Frankenvalue props,
kj::TaskSet& waitUntilTasks) override;

kj::Promise<Result> sendRpc(capnp::HttpOverCapnpFactory& httpOverCapnpFactory,
Expand Down
10 changes: 6 additions & 4 deletions src/workerd/api/queue.c++
Original file line number Diff line number Diff line change
Expand Up @@ -513,6 +513,7 @@ jsg::Ref<QueueEvent> startQueueEvent(EventTarget& globalEventTarget,
kj::Promise<WorkerInterface::CustomEvent::Result> QueueCustomEventImpl::run(
kj::Own<IoContext_IncomingRequest> incomingRequest,
kj::Maybe<kj::StringPtr> entrypointName,
Frankenvalue props,
kj::TaskSet& waitUntilTasks) {
incomingRequest->delivered();
auto& context = incomingRequest->getContext();
Expand Down Expand Up @@ -546,13 +547,14 @@ kj::Promise<WorkerInterface::CustomEvent::Result> QueueCustomEventImpl::run(
// can't just wait on their addEventListener handler to resolve because it can't be async).
context.addWaitUntil(context.run(
[this, entrypointName = entrypointName, &context, queueEvent = kj::addRef(*queueEventHolder),
&metrics = incomingRequest->getMetrics()](Worker::Lock& lock) mutable {
&metrics = incomingRequest->getMetrics(),
props = kj::mv(props)](Worker::Lock& lock) mutable {
jsg::AsyncContextFrame::StorageScope traceScope = context.makeAsyncTraceScope(lock);

auto& typeHandler = lock.getWorker().getIsolate().getApi().getQueueTypeHandler(lock);
queueEvent->event =
startQueueEvent(lock.getGlobalScope(), kj::mv(params), context.addObject(result), lock,
lock.getExportedHandler(entrypointName, context.getActor()), typeHandler);
queueEvent->event = startQueueEvent(lock.getGlobalScope(), kj::mv(params),
context.addObject(result), lock,
lock.getExportedHandler(entrypointName, kj::mv(props), context.getActor()), typeHandler);
}));

// TODO(soon): There's a good chance we'll want a different wall-clock timeout for queue handlers
Expand Down
1 change: 1 addition & 0 deletions src/workerd/api/queue.h
Original file line number Diff line number Diff line change
Expand Up @@ -341,6 +341,7 @@ class QueueCustomEventImpl final: public WorkerInterface::CustomEvent, public kj

kj::Promise<Result> run(kj::Own<IoContext_IncomingRequest> incomingRequest,
kj::Maybe<kj::StringPtr> entrypointName,
Frankenvalue props,
kj::TaskSet& waitUntilTasks) override;

kj::Promise<Result> sendRpc(capnp::HttpOverCapnpFactory& httpOverCapnpFactory,
Expand Down
13 changes: 8 additions & 5 deletions src/workerd/api/trace.c++
Original file line number Diff line number Diff line change
Expand Up @@ -601,6 +601,7 @@ jsg::Ref<TraceMetrics> UnsafeTraceMetrics::fromTrace(jsg::Ref<TraceItem> item) {
namespace {
kj::Promise<void> sendTracesToExportedHandler(kj::Own<IoContext::IncomingRequest> incomingRequest,
kj::Maybe<kj::StringPtr> entrypointNamePtr,
Frankenvalue props,
kj::ArrayPtr<kj::Own<Trace>> traces) {
// Mark the request as delivered because we're about to run some JS.
incomingRequest->delivered();
Expand All @@ -624,11 +625,12 @@ kj::Promise<void> sendTracesToExportedHandler(kj::Own<IoContext::IncomingRequest
// and its members until this task completes.
auto entrypointName = entrypointNamePtr.map([](auto s) { return kj::str(s); });
try {
co_await context.run([&context, nonEmptyTraces = kj::mv(nonEmptyTraces),
entrypointName = kj::mv(entrypointName)](Worker::Lock& lock) mutable {
co_await context.run(
[&context, nonEmptyTraces = kj::mv(nonEmptyTraces), entrypointName = kj::mv(entrypointName),
props = kj::mv(props)](Worker::Lock& lock) mutable {
jsg::AsyncContextFrame::StorageScope traceScope = context.makeAsyncTraceScope(lock);

auto handler = lock.getExportedHandler(entrypointName, context.getActor());
auto handler = lock.getExportedHandler(entrypointName, kj::mv(props), context.getActor());
return lock.getGlobalScope().sendTraces(nonEmptyTraces.asPtr(), lock, handler);
});
} catch (kj::Exception e) {
Expand All @@ -652,10 +654,11 @@ kj::Promise<void> sendTracesToExportedHandler(kj::Own<IoContext::IncomingRequest

auto TraceCustomEventImpl::run(kj::Own<IoContext::IncomingRequest> incomingRequest,
kj::Maybe<kj::StringPtr> entrypointNamePtr,
Frankenvalue props,
kj::TaskSet& waitUntilTasks) -> kj::Promise<Result> {
// Don't bother to wait around for the handler to run, just hand it off to the waitUntil tasks.
waitUntilTasks.add(
sendTracesToExportedHandler(kj::mv(incomingRequest), entrypointNamePtr, traces));
waitUntilTasks.add(sendTracesToExportedHandler(
kj::mv(incomingRequest), entrypointNamePtr, kj::mv(props), traces));

return Result{
.outcome = EventOutcome::OK,
Expand Down
1 change: 1 addition & 0 deletions src/workerd/api/trace.h
Original file line number Diff line number Diff line change
Expand Up @@ -615,6 +615,7 @@ class TraceCustomEventImpl final: public WorkerInterface::CustomEvent {

kj::Promise<Result> run(kj::Own<IoContext::IncomingRequest> incomingRequest,
kj::Maybe<kj::StringPtr> entrypointName,
Frankenvalue props,
kj::TaskSet& waitUntilTasks) override;

kj::Promise<Result> sendRpc(capnp::HttpOverCapnpFactory& httpOverCapnpFactory,
Expand Down
16 changes: 11 additions & 5 deletions src/workerd/api/worker-rpc.c++
Original file line number Diff line number Diff line change
Expand Up @@ -1666,18 +1666,21 @@ class EntrypointJsRpcTarget final: public JsRpcTargetBase {
public:
EntrypointJsRpcTarget(IoContext& ioCtx,
kj::Maybe<kj::StringPtr> entrypointName,
Frankenvalue props,
kj::Maybe<kj::Own<WorkerTracer>> tracer)
: JsRpcTargetBase(ioCtx),
// Most of the time we don't really have to clone this but it's hard to fully prove, so
// let's be safe.
entrypointName(entrypointName.map([](kj::StringPtr s) { return kj::str(s); })),
props(kj::mv(props)),
tracer(kj::mv(tracer)) {}

TargetInfo getTargetInfo(Worker::Lock& lock, IoContext& ioCtx) override {
jsg::Lock& js = lock;

auto handler = KJ_REQUIRE_NONNULL(lock.getExportedHandler(entrypointName, ioCtx.getActor()),
"Failed to get handler to worker.");
auto handler =
KJ_REQUIRE_NONNULL(lock.getExportedHandler(entrypointName, kj::mv(props), ioCtx.getActor()),
"Failed to get handler to worker.");

if (handler->missingSuperclass) {
// JS RPC is not enabled on the server side, we cannot call any methods.
Expand Down Expand Up @@ -1709,6 +1712,7 @@ class EntrypointJsRpcTarget final: public JsRpcTargetBase {

private:
kj::Maybe<kj::String> entrypointName;
Frankenvalue props;
kj::Maybe<kj::Own<WorkerTracer>> tracer;

bool isReservedName(kj::StringPtr name) override {
Expand Down Expand Up @@ -1781,15 +1785,17 @@ class JsRpcSessionCustomEventImpl::ServerTopLevelMembrane final: public capnp::M
kj::Promise<WorkerInterface::CustomEvent::Result> JsRpcSessionCustomEventImpl::run(
kj::Own<IoContext::IncomingRequest> incomingRequest,
kj::Maybe<kj::StringPtr> entrypointName,
Frankenvalue props,
kj::TaskSet& waitUntilTasks) {
IoContext& ioctx = incomingRequest->getContext();

incomingRequest->delivered();

auto [donePromise, doneFulfiller] = kj::newPromiseAndFulfiller<void>();
capFulfiller->fulfill(capnp::membrane(kj::heap<EntrypointJsRpcTarget>(ioctx, entrypointName,
mapAddRef(incomingRequest->getWorkerTracer())),
kj::refcounted<ServerTopLevelMembrane>(kj::mv(doneFulfiller))));
capFulfiller->fulfill(
capnp::membrane(kj::heap<EntrypointJsRpcTarget>(ioctx, entrypointName, kj::mv(props),
mapAddRef(incomingRequest->getWorkerTracer())),
kj::refcounted<ServerTopLevelMembrane>(kj::mv(doneFulfiller))));

KJ_DEFER({
// waitUntil() should allow extending execution on the server side even when the client
Expand Down
1 change: 1 addition & 0 deletions src/workerd/api/worker-rpc.h
Original file line number Diff line number Diff line change
Expand Up @@ -420,6 +420,7 @@ class JsRpcSessionCustomEventImpl final: public WorkerInterface::CustomEvent {

kj::Promise<Result> run(kj::Own<IoContext::IncomingRequest> incomingRequest,
kj::Maybe<kj::StringPtr> entrypointName,
Frankenvalue props,
kj::TaskSet& waitUntilTasks) override;

kj::Promise<Result> sendRpc(capnp::HttpOverCapnpFactory& httpOverCapnpFactory,
Expand Down
13 changes: 13 additions & 0 deletions src/workerd/io/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ wd_cc_library(
srcs = [
"compatibility-date.c++",
"features.c++",
"frankenvalue.c++",
"hibernation-manager.c++",
"io-context.c++",
"io-own.c++",
Expand All @@ -58,6 +59,7 @@ wd_cc_library(
hdrs = [
"compatibility-date.h",
"features.h",
"frankenvalue.h",
"hibernation-manager.h",
"io-channels.h",
"io-context.h",
Expand Down Expand Up @@ -87,6 +89,7 @@ wd_cc_library(
":actor-id",
":actor-storage_capnp",
":cdp_capnp",
":frankenvalue_capnp",
":io-gate",
":io-helpers",
":limit-enforcer",
Expand Down Expand Up @@ -217,6 +220,7 @@ wd_cc_library(
hdrs = ["worker-interface.h"],
visibility = ["//visibility:public"],
deps = [
":frankenvalue_capnp",
":worker-interface_capnp",
"@capnp-cpp//src/capnp:capnp-rpc",
"@capnp-cpp//src/capnp:capnpc",
Expand Down Expand Up @@ -273,6 +277,8 @@ wd_capnp_library(src = "compatibility-date.capnp")

wd_capnp_library(src = "features.capnp")

wd_capnp_library(src = "frankenvalue.capnp")

kj_test(
src = "io-gate-test.c++",
deps = [
Expand Down Expand Up @@ -328,3 +334,10 @@ kj_test(
"//src/workerd/util:thread-scopes",
],
)

kj_test(
src = "frankenvalue-test.c++",
deps = [
":io",
],
)
56 changes: 56 additions & 0 deletions src/workerd/io/frankenvalue-test.c++
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
#include "frankenvalue.h"

#include <workerd/jsg/jsg-test.h>

#include <capnp/message.h>
#include <kj/debug.h>
#include <kj/test.h>

namespace workerd {
namespace {

jsg::V8System v8System;
class ContextGlobalObject: public jsg::Object, public jsg::ContextGlobal {};

struct TestContext: public ContextGlobalObject {
JSG_RESOURCE_TYPE(TestContext) {}
};
JSG_DECLARE_ISOLATE_TYPE(TestIsolate, TestContext);

KJ_TEST("Frankenvalue") {
jsg::test::Evaluator<TestContext, TestIsolate> e(v8System);

e.run([&](jsg::Lock& js) {
// Create a value based on JSON.
Frankenvalue value = Frankenvalue::fromJson(kj::str(R"({"baz": 321, "qux": "xyz"})"_kj));

// prop1 is empty.
value.setProperty(kj::str("prop1"), {});

// prop2 is a V8-serialized value.
value.setProperty(kj::str("prop2"), ({
auto obj = js.obj();
obj.set(js, "foo", js.num(123));
obj.set(js, "bar", js.str("abc"_kj));
Frankenvalue::fromJs(js, obj);
}));

// Round trip through capnp.
{
capnp::MallocMessageBuilder message;
auto builder = message.initRoot<rpc::Frankenvalue>();
value.toCapnp(builder);
value = Frankenvalue::fromCapnp(builder.asReader());
}

// Use clone().
value = value.clone();

// Back to JS, then JSON, then check that nothing was lost.
KJ_EXPECT(js.serializeJson(value.toJs(js)) ==
R"({"baz":321,"qux":"xyz","prop1":{},"prop2":{"foo":123,"bar":"abc"}})"_kj);
});
}

} // namespace
} // namespace workerd
Loading

0 comments on commit 7518bb8

Please sign in to comment.