Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Initial implementation of DO-attached containers API #3354

Open
wants to merge 3 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 6 additions & 3 deletions src/workerd/api/actor-state.c++
Original file line number Diff line number Diff line change
Expand Up @@ -825,10 +825,13 @@ kj::OneOf<jsg::Ref<DurableObjectId>, kj::StringPtr> ActorState::getId() {
KJ_UNREACHABLE;
}

DurableObjectState::DurableObjectState(
Worker::Actor::Id actorId, kj::Maybe<jsg::Ref<DurableObjectStorage>> storage)
DurableObjectState::DurableObjectState(Worker::Actor::Id actorId,
kj::Maybe<jsg::Ref<DurableObjectStorage>> storage,
kj::Maybe<rpc::Container::Client> container)
: id(kj::mv(actorId)),
storage(kj::mv(storage)) {}
storage(kj::mv(storage)),
container(container.map(
[&](rpc::Container::Client& cap) { return jsg::alloc<Container>(kj::mv(cap)); })) {}

void DurableObjectState::waitUntil(kj::Promise<void> promise) {
IoContext::current().addWaitUntil(kj::mv(promise));
Expand Down
11 changes: 10 additions & 1 deletion src/workerd/api/actor-state.h
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
// See actor.h for APIs used by other Workers to talk to Actors.

#include <workerd/api/actor.h>
#include <workerd/api/container.h>
#include <workerd/io/actor-cache.h>
#include <workerd/io/actor-id.h>
#include <workerd/io/compatibility-date.capnp.h>
Expand Down Expand Up @@ -462,7 +463,9 @@ class WebSocketRequestResponsePair: public jsg::Object {
// The type passed as the first parameter to durable object class's constructor.
class DurableObjectState: public jsg::Object {
public:
DurableObjectState(Worker::Actor::Id actorId, kj::Maybe<jsg::Ref<DurableObjectStorage>> storage);
DurableObjectState(Worker::Actor::Id actorId,
kj::Maybe<jsg::Ref<DurableObjectStorage>> storage,
kj::Maybe<rpc::Container::Client> container);

void waitUntil(kj::Promise<void> promise);

Expand All @@ -472,6 +475,10 @@ class DurableObjectState: public jsg::Object {
return storage.map([&](jsg::Ref<DurableObjectStorage>& p) { return p.addRef(); });
}

// Getter backing the `container` property: yields a new reference to the attached Container
// object, or nothing when this DurableObjectState was constructed without one.
jsg::Optional<jsg::Ref<Container>> getContainer() {
  auto duplicateRef = [](jsg::Ref<Container>& attached) { return attached.addRef(); };
  return container.map(duplicateRef);
}

jsg::Promise<jsg::JsRef<jsg::JsValue>> blockConcurrencyWhile(
jsg::Lock& js, jsg::Function<jsg::Promise<jsg::JsRef<jsg::JsValue>>()> callback);

Expand Down Expand Up @@ -536,6 +543,7 @@ class DurableObjectState: public jsg::Object {
JSG_METHOD(waitUntil);
JSG_READONLY_INSTANCE_PROPERTY(id, getId);
JSG_READONLY_INSTANCE_PROPERTY(storage, getStorage);
JSG_READONLY_INSTANCE_PROPERTY(container, getContainer);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit: these could probably be JSG_LAZY_READONLY_INSTANCE_PROPERTY

JSG_METHOD(blockConcurrencyWhile);
JSG_METHOD(acceptWebSocket);
JSG_METHOD(getWebSockets);
Expand Down Expand Up @@ -574,6 +582,7 @@ class DurableObjectState: public jsg::Object {
private:
Worker::Actor::Id id;
kj::Maybe<jsg::Ref<DurableObjectStorage>> storage;
kj::Maybe<jsg::Ref<Container>> container;

// Limits for Hibernatable WebSocket tags.

Expand Down
242 changes: 242 additions & 0 deletions src/workerd/api/container.c++
Original file line number Diff line number Diff line change
@@ -0,0 +1,242 @@
// Copyright (c) 2025 Cloudflare, Inc.
// Licensed under the Apache 2.0 license found in the LICENSE file or at:
// https://opensource.org/licenses/Apache-2.0

#include "container.h"

#include <workerd/api/http.h>
#include <workerd/io/io-context.h>

namespace workerd::api {

// =======================================================================================
// Basic lifecycle methods

// Moves the container RPC capability onto the heap and registers it with the current IoContext
// via addObject(); the handle that comes back is what `rpcClient` stores and what every other
// method dereferences to issue requests.
// NOTE(review): relies on IoContext::current() being available at construction time -- confirm
// all construction sites run inside a request context.
Container::Container(rpc::Container::Client rpcClient)
: rpcClient(IoContext::current().addObject(kj::heap(kj::mv(rpcClient)))) {}

void Container::start(jsg::Lock& js, jsg::Optional<StartupOptions> maybeOptions) {
JSG_REQUIRE(!running, Error, "start() cannot be called on a container that is already running.");

StartupOptions options = kj::mv(maybeOptions).orDefault({});

auto req = rpcClient->startRequest();
KJ_IF_SOME(entrypoint, options.entrypoint) {
auto list = req.initEntrypoint(entrypoint.size());
for (auto i: kj::indices(entrypoint)) {
list.set(i, entrypoint[i]);
}
}
req.setEnableInternet(options.enableInternet);

IoContext::current().addTask(req.send().ignoreResult());
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit: It might be slightly better to call auto& ioContext = IoContext::current() before the auto req = rpcClient->startRequest(), just in case IoContext::current() fails. Not critical, and definitely an edge case, though.


running = true;
}

jsg::Promise<void> Container::monitor(jsg::Lock& js) {
JSG_REQUIRE(running, Error, "monitor() cannot be called on a container that is not running.");
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What, if anything, should happen if this is called after "destroy()" is called but the destroy promise is still pending?


return IoContext::current()
.awaitIo(js, rpcClient->monitorRequest(capnp::MessageSize{4, 0}).send().ignoreResult())
.then(js, [this](jsg::Lock& js) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

safer to capture self=JSG_THIS rather than this? It's unlikely that the container will disappear while this is pending but likely best to be safe?

running = false;
KJ_IF_SOME(d, destroyReason) {
jsg::Value error = kj::mv(d);
destroyReason = kj::none;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Non-blocking: this pattern still bugs me. It's so easy to forget to set the var to kj::none after moving.

js.throwException(kj::mv(error));
}
}, [this](jsg::Lock& js, jsg::Value&& error) {
running = false;
destroyReason = kj::none;
js.throwException(kj::mv(error));
});
}

jsg::Promise<void> Container::destroy(jsg::Lock& js, jsg::Optional<jsg::Value> error) {
if (!running) return js.resolvedPromise();
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If destroy is called multiple times it looks like this will end up sending the destroyRequest multiple times? Also, it ends up creating and returning multiple promises. Once it has been called, it likely should be arranged to return the same js promise instance back.


if (destroyReason == kj::none) {
destroyReason = kj::mv(error);
}

return IoContext::current().awaitIo(
js, rpcClient->destroyRequest(capnp::MessageSize{4, 0}).send().ignoreResult());
}

void Container::signal(jsg::Lock& js, int signo) {
JSG_REQUIRE(signo > 0 && signo <= 64, TypeError, "Invalid signal number.");
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit: should be a RangeError?

JSG_REQUIRE(running, Error, "signal() cannot be called on a container that is not running.");

auto req = rpcClient->signalRequest(capnp::MessageSize{4, 0});
req.setSigno(signo);
IoContext::current().addTask(req.send().ignoreResult());
}

// =======================================================================================
// getTcpPort()

// `getTcpPort()` returns a `Fetcher`, on which `fetch()` and `connect()` can be called. `Fetcher`
// is a JavaScript wrapper around `WorkerInterface`, so we need to implement that.
// Each instance holds its own handle to the port capability; every request() or connect() call
// opens a new connection through it by way of connectImpl().
class Container::TcpPortWorkerInterface final: public WorkerInterface {
public:
  // Keeps references to `byteStreamFactory` and `headerTable` (so both must outlive this object)
  // and takes ownership of the `port` capability.
  TcpPortWorkerInterface(capnp::ByteStreamFactory& byteStreamFactory,
      const kj::HttpHeaderTable& headerTable,
      rpc::Container::Port::Client port)
      : byteStreamFactory(byteStreamFactory),
        headerTable(headerTable),
        port(kj::mv(port)) {}

  // Implements fetch(), i.e., HTTP requests. We form a TCP connection, then run HTTP over it
  // (as opposed to, say, speaking http-over-capnp to the container service).
  //
  // `url` arrives in proxy form (absolute URL); it is rewritten to host form before being sent.
  // Fails with a JS `Error` for `https:` URLs, and with a KJ requirement failure if the URL does
  // not parse or uses any scheme other than `http:`.
  kj::Promise<void> request(kj::HttpMethod method,
      kj::StringPtr url,
      const kj::HttpHeaders& headers,
      kj::AsyncInputStream& requestBody,
      kj::HttpService::Response& response) override {
    // URLs should have been validated earlier in the stack, so parsing the URL should succeed.
    auto parsedUrl = KJ_REQUIRE_NONNULL(kj::Url::tryParse(url, kj::Url::Context::HTTP_PROXY_REQUEST,
                                            {.percentDecode = false, .allowEmpty = true}),
        "invalid url?", url);

    // We don't support TLS.
    JSG_REQUIRE(parsedUrl.scheme != "https", Error,
        "Connecting to a container using HTTPS is not currently supported; use HTTP instead. "
        "TLS is unnecessary anyway, as the connection is already secure by default.");

    // Schemes other than http: and https: should have been rejected earlier, but let's verify.
    KJ_REQUIRE(parsedUrl.scheme == "http");

    // We need to convert the URL from proxy format (full URL in request line) to host format
    // (path in request line, hostname in Host header).
    auto newHeaders = headers.cloneShallow();
    newHeaders.set(kj::HttpHeaderId::HOST, parsedUrl.host);
    auto noHostUrl = parsedUrl.toString(kj::Url::Context::HTTP_REQUEST);

    // Make a TCP connection...
    // One end of an in-process pipe is bridged to the container port; the HTTP client below
    // talks to the other end. A successful connectImpl() is turned into a never-resolving
    // promise so that it cannot be the side that completes the join at the bottom.
    auto pipe = kj::newTwoWayPipe();
    auto connectionPromise =
        connectImpl(*pipe.ends[1]).then([]() -> kj::Promise<void> { return kj::NEVER_DONE; });

    // ... and then stack an HttpClient on it ...
    auto client = kj::newHttpClient(headerTable, *pipe.ends[0]);

    // ... and then adapt that to an HttpService ...
    auto service = kj::newHttpService(*client);

    // ... and now we can just forward our call to that.
    // `pipe`, `client`, `service`, `newHeaders` and `noHostUrl` are coroutine locals, so they
    // stay alive until this co_await finishes.
    co_await connectionPromise.exclusiveJoin(
        service->request(method, noHostUrl, newHeaders, requestBody, response));
  }

  // Implements connect(), i.e., forms a raw socket.
  //
  // `host` and `headers` are not consulted. Fails with a JS `Error` when TLS is requested.
  // The tunnel is accepted with an empty-headered 200 response right after the connection
  // attempt is started, without waiting on the returned promise.
  kj::Promise<void> connect(kj::StringPtr host,
      const kj::HttpHeaders& headers,
      kj::AsyncIoStream& connection,
      ConnectResponse& response,
      kj::HttpConnectSettings settings) override {
    JSG_REQUIRE(!settings.useTls, Error,
        "Connecting to a container using TLS is not currently supported. It is unnecessary "
        "anyway, as the connection is already secure by default.");

    auto promise = connectImpl(connection);

    kj::HttpHeaders responseHeaders(headerTable);
    response.accept(200, "OK", responseHeaders);

    return promise;
  }

  // The only `CustomEvent` that can happen through `Fetcher` is a JSRPC call. Maybe we will
  // support this someday? But not today.
  kj::Promise<CustomEvent::Result> customEvent(kj::Own<CustomEvent> event) override {
    return event->notSupported();
  }

  // There's no way to invoke the remaining event types via `Fetcher`.
  kj::Promise<void> prewarm(kj::StringPtr url) override {
    KJ_UNREACHABLE;
  }
  kj::Promise<ScheduledResult> runScheduled(kj::Date scheduledTime, kj::StringPtr cron) override {
    KJ_UNREACHABLE;
  }
  kj::Promise<AlarmResult> runAlarm(kj::Date scheduledTime, uint32_t retryCount) override {
    KJ_UNREACHABLE;
  }

private:
  capnp::ByteStreamFactory& byteStreamFactory;
  const kj::HttpHeaderTable& headerTable;
  rpc::Container::Port::Client port;

  // Connect to the port and pump bytes to/from `connection`. Used by both request() and
  // connect().
  //
  // `connection` is captured by reference in the pump continuations, so the caller must keep it
  // alive for as long as the returned promise is pending. The returned promise tracks the
  // `connect` RPC itself.
  kj::Promise<void> connectImpl(kj::AsyncIoStream& connection) {
    // A lot of the following is copied from
    // capnp::HttpOverCapnpFactory::KjToCapnpHttpServiceAdapter::connect().
    auto req = port.connectRequest(capnp::MessageSize{4, 1});
    // Bytes written by the remote side into `downPipe.out` come out of `downPipe.in` here.
    auto downPipe = kj::newOneWayPipe();
    req.setDown(byteStreamFactory.kjToCapnp(kj::mv(downPipe.out)));
    auto pipeline = req.send();

    // Make sure the request message isn't pinned into memory through the co_await below.
    { auto drop = kj::mv(req); }

    // Down direction: container -> `connection`. When the down pipe reaches EOF the write side of
    // `connection` is shut down, and the task then stays pending forever. The lambda owns
    // `downPipe.in` so the stream being pumped from lives as long as the task does.
    auto downPumpTask =
        downPipe.in->pumpTo(connection)
            .then([&connection, down = kj::mv(downPipe.in)](uint64_t) -> kj::Promise<void> {
      connection.shutdownWrite();
      return kj::NEVER_DONE;
    });
    auto up = pipeline.getUp();

    // Up direction: `connection` -> container. Once `connection` reaches EOF the stream is
    // explicitly ended; the final continuation only exists to keep `up` and `upStream` alive.
    auto upStream = byteStreamFactory.capnpToKjExplicitEnd(up);
    auto upPumpTask = connection.pumpTo(*upStream)
                          .then([&upStream = *upStream](uint64_t) mutable {
      return upStream.end();
    }).then([up = kj::mv(up), upStream = kj::mv(upStream)]() mutable -> kj::Promise<void> {
      return kj::NEVER_DONE;
    });

    // NOTE(review): `downPumpTask` and `upPumpTask` are held only as coroutine locals across this
    // co_await; neither is awaited or joined with `pipeline`. Confirm that their `.then()`
    // continuations (shutdownWrite() / end()) are expected to run without anything waiting on
    // them, and that pump errors are meant to go unobserved.
    co_await pipeline.ignoreResult();
  }
};

// `Fetcher` actually wants us to give it a factory that creates a new `WorkerInterface` for each
// request, so this is that.
class Container::TcpPortOutgoingFactory final: public Fetcher::OutgoingFactory {
  // Shared inputs handed to every TcpPortWorkerInterface this factory produces. The first two are
  // borrowed references; the port capability is owned here and copied per client.
  capnp::ByteStreamFactory& streamFactory;
  const kj::HttpHeaderTable& httpHeaderTable;
  rpc::Container::Port::Client portCap;

public:
  TcpPortOutgoingFactory(capnp::ByteStreamFactory& byteStreamFactory,
      const kj::HttpHeaderTable& headerTable,
      rpc::Container::Port::Client port)
      : streamFactory(byteStreamFactory),
        httpHeaderTable(headerTable),
        portCap(kj::mv(port)) {}

  // Builds the per-request WorkerInterface. `cfStr` is accepted to satisfy the interface but is
  // currently ignored.
  kj::Own<WorkerInterface> newSingleUseClient(kj::Maybe<kj::String> cfStr) override {
    kj::Own<WorkerInterface> singleUse =
        kj::heap<TcpPortWorkerInterface>(streamFactory, httpHeaderTable, portCap);
    return singleUse;
  }
};

// Returns a `Fetcher` whose fetch()/connect() calls are routed to the given TCP port of the
// container. Throws a JS `TypeError` unless `port` lies in 1..65535.
jsg::Ref<Fetcher> Container::getTcpPort(jsg::Lock& js, int port) {
  JSG_REQUIRE(port > 0 && port < 65536, TypeError, "Invalid port number: ", port);

  // Build the RPC asking the container service for this port.
  auto portRequest = rpcClient->getTcpPortRequest(capnp::MessageSize{4, 0});
  portRequest.setPort(port);

  auto& context = IoContext::current();

  // Take the port capability straight off the in-flight call rather than waiting for the
  // response to arrive.
  auto portCap = portRequest.send().getPort();

  kj::Own<Fetcher::OutgoingFactory> outgoing = kj::heap<TcpPortOutgoingFactory>(
      context.getByteStreamFactory(), context.getHeaderTable(), kj::mv(portCap));

  // The factory is registered with the IoContext before being handed to the JS-visible Fetcher.
  auto ownedFactory = context.addObject(kj::mv(outgoing));
  return jsg::alloc<Fetcher>(
      kj::mv(ownedFactory), Fetcher::RequiresHostAndProtocol::YES, true);
}

} // namespace workerd::api
71 changes: 71 additions & 0 deletions src/workerd/api/container.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
// Copyright (c) 2025 Cloudflare, Inc.
// Licensed under the Apache 2.0 license found in the LICENSE file or at:
// https://opensource.org/licenses/Apache-2.0

#pragma once
// APIs that an Actor (Durable Object) uses to control and communicate with the container
// attached to it.
//
// See actor-state.h for where this is exposed on `DurableObjectState` (the `container` property).

#include <workerd/io/container.capnp.h>
#include <workerd/io/io-own.h>
#include <workerd/jsg/jsg.h>

namespace workerd::api {

class Fetcher;

class Container: public jsg::Object {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you add code comments explaining what a Container is

public:
Container(rpc::Container::Client rpcClient);

struct StartupOptions {
jsg::Optional<kj::Array<kj::String>> entrypoint;
bool enableInternet = false;

// TODO(containers): stdio interception

JSG_STRUCT(entrypoint, enableInternet);
};

bool getRunning() {
return running;
}

// Methods correspond closely to the RPC interface in `container.capnp`.
void start(jsg::Lock& js, jsg::Optional<StartupOptions> options);
jsg::Promise<void> monitor(jsg::Lock& js);
jsg::Promise<void> destroy(jsg::Lock& js, jsg::Optional<jsg::Value> error);
void signal(jsg::Lock& js, int signo);
jsg::Ref<Fetcher> getTcpPort(jsg::Lock& js, int port);

// TODO(containers): listenTcp()

JSG_RESOURCE_TYPE(Container) {
JSG_READONLY_PROTOTYPE_PROPERTY(running, getRunning);
JSG_METHOD(start);
JSG_METHOD(monitor);
JSG_METHOD(destroy);
JSG_METHOD(signal);
JSG_METHOD(getTcpPort);
}

private:
IoOwn<rpc::Container::Client> rpcClient;

// TODO(containers): Actually check if the container is already running when the DO starts.
bool running = false;

kj::Maybe<jsg::Value> destroyReason;

void visitForGc(jsg::GcVisitor& visitor) {
visitor.visit(destroyReason);
}
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit: implement visitForMemoryInfo(...) for the destroyReason field.


class TcpPortWorkerInterface;
class TcpPortOutgoingFactory;
};

#define EW_CONTAINER_ISOLATE_TYPES api::Container, api::Container::StartupOptions

} // namespace workerd::api
8 changes: 8 additions & 0 deletions src/workerd/io/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,7 @@ wd_cc_library(
":actor-id",
":actor-storage_capnp",
":cdp_capnp",
":container_capnp",
":frankenvalue",
":io-gate",
":io-helpers",
Expand Down Expand Up @@ -303,6 +304,13 @@ wd_capnp_library(src = "features.capnp")

wd_capnp_library(src = "frankenvalue.capnp")

wd_capnp_library(
src = "container.capnp",
deps = [
"@capnp-cpp//src/capnp/compat:byte-stream_capnp",
],
)

kj_test(
src = "io-gate-test.c++",
deps = [
Expand Down
Loading
Loading