Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fixup gc visitation in streams readers and writers #1598

Closed
wants to merge 2 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
80 changes: 55 additions & 25 deletions src/workerd/api/streams/common.h
Original file line number Diff line number Diff line change
Expand Up @@ -318,16 +318,22 @@ class ReadableStreamController {
// passing along the closed promise that will be used to communicate state to the
// user code.
//
// The Reader will hold a reference to the controller that will be cleared when the reader
// is released or destroyed. The controller is guaranteed to either outlive or detach the
// reader so the ReadableStreamController& reference should remain valid.
// The Reader holds a strong reference to the controller. The Controller will hold a weak
// reference to the reader. It is ok for the reader itself to be freed/garbage collected
// while still being attached to the controller, but not the other way around.
virtual void attach(
ReadableStreamController& controller,
jsg::Promise<void> closedPromise) = 0;

// When a Reader lock is released, the controller will signal to the reader that it has been
// detached.
virtual void detach() = 0;

virtual kj::Own<WeakRef<Reader>> addWeakRef() = 0;

private:
static kj::Badge<Reader> getBadge() { return kj::Badge<Reader>(); }
friend class ReaderImpl;
};

struct ByobOptions {
Expand Down Expand Up @@ -479,12 +485,12 @@ class ReadableStreamController {

// Locks this controller to the given reader, returning true if the lock was successful, or false
// if the controller was already locked.
virtual bool lockReader(jsg::Lock& js, Reader& reader) = 0;
virtual bool lockReader(jsg::Lock& js, kj::Own<WeakRef<Reader>> reader) = 0;

// Removes the lock and releases the reader from this controller.
// maybeJs will be nullptr when the isolate lock is not available.
// If maybeJs is set, the reader's closed promise will be resolved.
virtual void releaseReader(Reader& reader, kj::Maybe<jsg::Lock&> maybeJs) = 0;
virtual void releaseReader(jsg::Lock& js, Reader& reader) = 0;

virtual kj::Maybe<PipeController&> tryPipeLock(jsg::Ref<WritableStream> destination) = 0;

Expand Down Expand Up @@ -574,8 +580,9 @@ class WritableStreamController {
// passing along the closed and ready promises that will be used to communicate state to the
// user code.
//
// The controller is guaranteed to either outlive the Writer or will detach the Writer so the
// WritableStreamController& reference should always remain valid.
// The Writer holds a strong reference to the controller. The Controller will hold a weak
// reference to the writer. It is ok for the writer itself to be freed/garbage collected
// while still being attached to the controller, but not the other way around.
virtual void attach(
WritableStreamController& controller,
jsg::Promise<void> closedPromise,
Expand All @@ -588,6 +595,12 @@ class WritableStreamController {
// The ready promise can be replaced whenever backpressure is signaled by the underlying
// controller.
virtual void replaceReadyPromise(jsg::Promise<void> readyPromise) = 0;

virtual kj::Own<WeakRef<Writer>> addWeakRef() = 0;

private:
static kj::Badge<Writer> getBadge() { return kj::Badge<Writer>(); }
friend class WritableStreamDefaultWriter;
};

struct PendingAbort {
Expand Down Expand Up @@ -672,12 +685,10 @@ class WritableStreamController {

// Locks this controller to the given writer, returning true if the lock was successful, or false
// if the controller was already locked.
virtual bool lockWriter(jsg::Lock& js, Writer& writer) = 0;
virtual bool lockWriter(jsg::Lock& js, kj::Own<WeakRef<Writer>>) = 0;

// Removes the lock and releases the writer from this controller.
// maybeJs will be nullptr when the isolate lock is not available.
// If maybeJs is set, the writer's closed and ready promises will be resolved.
virtual void releaseWriter(Writer& writer, kj::Maybe<jsg::Lock&> maybeJs) = 0;
virtual void releaseWriter(jsg::Lock& js, Writer& writer) = 0;

virtual kj::Maybe<v8::Local<v8::Value>> isErroring(jsg::Lock& js) = 0;

Expand Down Expand Up @@ -710,28 +721,37 @@ struct Locked {};

// When a reader is locked to a ReadableStream, a ReaderLock instance
// is used internally to represent the locked state in the ReadableStreamController.
// ReaderLocked maintains a weak referene to the actual Reader instance. It's ok
// for the Reader to be garbage collected while the ReadableStream is still alive but
// not vis versa. The Reader holds a strong reference to the ReadableStream only while
// it is attached.
class ReaderLocked {
public:
ReaderLocked(
ReadableStreamController::Reader& reader,
kj::Own<WeakRef<ReadableStreamController::Reader>> reader,
jsg::Promise<void>::Resolver closedFulfiller,
kj::Maybe<IoOwn<kj::Canceler>> canceler = kj::none)
: reader(reader),
: reader(kj::mv(reader)),
closedFulfiller(kj::mv(closedFulfiller)),
canceler(kj::mv(canceler)) {}

ReaderLocked(ReaderLocked&&) = default;
~ReaderLocked() noexcept(false) {
KJ_IF_SOME(r, reader) { r.detach(); }
if (reader.get() != nullptr) {
reader->runIfAlive([](auto& r) {
r.detach();
});
}
}
KJ_DISALLOW_COPY(ReaderLocked);

void visitForGc(jsg::GcVisitor& visitor) {
visitor.visit(closedFulfiller);
}

ReadableStreamController::Reader& getReader() {
return KJ_ASSERT_NONNULL(reader);
kj::Maybe<ReadableStreamController::Reader&> getReader() {
if (reader.get() == nullptr) return kj::none;
return reader->tryGet();
}

kj::Maybe<jsg::Promise<void>::Resolver>& getClosedFulfiller() {
Expand All @@ -743,34 +763,43 @@ class ReaderLocked {
}

private:
kj::Maybe<ReadableStreamController::Reader&> reader;
kj::Own<WeakRef<ReadableStreamController::Reader>> reader;
kj::Maybe<jsg::Promise<void>::Resolver> closedFulfiller;
kj::Maybe<IoOwn<kj::Canceler>> canceler;
};

// When a writer is locked to a WritableStream, a WriterLock instance
// is used internally to represent the locked state in the WritableStreamController.
// WriterLocked maintains a weak reference to the actual Writer instance. It's ok
// for the Writer to be garbage collected while the WritableStream is still alive but
// not vis versa. The Writer holds a strong reference to the WritableStream only while
// it is attached.
class WriterLocked {
public:
WriterLocked(
WritableStreamController::Writer& writer,
kj::Own<WeakRef<WritableStreamController::Writer>> writer,
jsg::Promise<void>::Resolver closedFulfiller,
kj::Maybe<jsg::Promise<void>::Resolver> readyFulfiller = kj::none)
: writer(writer),
: writer(kj::mv(writer)),
closedFulfiller(kj::mv(closedFulfiller)),
readyFulfiller(kj::mv(readyFulfiller)) {}

WriterLocked(WriterLocked&&) = default;
~WriterLocked() noexcept(false) {
KJ_IF_SOME(w, writer) { w.detach(); }
if (writer.get() != nullptr) {
writer->runIfAlive([&](auto& w) {
w.detach();
});
}
}

void visitForGc(jsg::GcVisitor& visitor) {
visitor.visit(closedFulfiller, readyFulfiller);
}

WritableStreamController::Writer& getWriter() {
return KJ_ASSERT_NONNULL(writer);
kj::Maybe<WritableStreamController::Writer&> getWriter() {
if (writer.get() == nullptr) return kj::none;
return writer->tryGet();
}

kj::Maybe<jsg::Promise<void>::Resolver>& getClosedFulfiller() {
Expand All @@ -782,14 +811,15 @@ class WriterLocked {
}

void setReadyFulfiller(jsg::PromiseResolverPair<void>& pair) {
KJ_IF_SOME(w, writer) {
if (writer.get() == nullptr) return;
writer->runIfAlive([&](auto& w) {
readyFulfiller = kj::mv(pair.resolver);
w.replaceReadyPromise(kj::mv(pair.promise));
}
});
}

private:
kj::Maybe<WritableStreamController::Writer&> writer;
kj::Own<WeakRef<WritableStreamController::Writer>> writer;
kj::Maybe<jsg::Promise<void>::Resolver> closedFulfiller;
kj::Maybe<jsg::Promise<void>::Resolver> readyFulfiller;
};
Expand Down
91 changes: 36 additions & 55 deletions src/workerd/api/streams/internal.c++
Original file line number Diff line number Diff line change
Expand Up @@ -471,12 +471,7 @@ kj::Maybe<kj::Promise<DeferredProxy<void>>> WritableStreamSink::tryPumpFrom(

// =======================================================================================

ReadableStreamInternalController::~ReadableStreamInternalController() noexcept(false) {
KJ_IF_SOME(locked, readState.tryGet<ReaderLocked>()) {
auto lock = kj::mv(locked);
readState.init<Unlocked>();
}
}
ReadableStreamInternalController::~ReadableStreamInternalController() noexcept(false) {}

jsg::Ref<ReadableStream> ReadableStreamInternalController::addRef() {
return KJ_ASSERT_NONNULL(owner).addRef();
Expand Down Expand Up @@ -761,16 +756,17 @@ kj::Maybe<kj::Own<ReadableStreamSource>> ReadableStreamInternalController::remov
KJ_UNREACHABLE;
}

bool ReadableStreamInternalController::lockReader(jsg::Lock& js, Reader& reader) {
bool ReadableStreamInternalController::lockReader(jsg::Lock& js, kj::Own<WeakRef<Reader>> reader) {
if (isLockedToReader()) {
return false;
}

auto prp = js.newPromiseAndResolver<void>();
prp.promise.markAsHandled(js);

auto lock = ReaderLocked(reader, kj::mv(prp.resolver),
auto lock = ReaderLocked(kj::mv(reader), kj::mv(prp.resolver),
IoContext::current().addObject(kj::heap<kj::Canceler>()));
// Take care not to access reader directly after this point. Use the lock.

KJ_SWITCH_ONEOF(state) {
KJ_CASE_ONEOF(closed, StreamStates::Closed) {
Expand All @@ -785,42 +781,30 @@ bool ReadableStreamInternalController::lockReader(jsg::Lock& js, Reader& reader)
}

readState = kj::mv(lock);
reader.attach(*this, kj::mv(prp.promise));

auto& inner = KJ_ASSERT_NONNULL(readState.get<ReaderLocked>().getReader());
inner.attach(*this, kj::mv(prp.promise));
return true;
}

void ReadableStreamInternalController::releaseReader(
Reader& reader,
kj::Maybe<jsg::Lock&> maybeJs) {
void ReadableStreamInternalController::releaseReader(jsg::Lock& js, Reader& reader) {
KJ_IF_SOME(locked, readState.tryGet<ReaderLocked>()) {
KJ_ASSERT(&locked.getReader() == &reader);
KJ_IF_SOME(js, maybeJs) {
JSG_REQUIRE(KJ_ASSERT_NONNULL(locked.getCanceler())->isEmpty(), TypeError,
"Cannot call releaseLock() on a reader with outstanding read promises.");
maybeRejectPromise<void>(js,
locked.getClosedFulfiller(),
js.v8TypeError("This ReadableStream reader has been released."_kj));
KJ_IF_SOME(r, locked.getReader()) {
KJ_ASSERT(&r == &reader);
}
auto lock = kj::mv(locked);

// When maybeJs is nullptr, that means releaseReader was called when the reader is
// being deconstructed and not as the result of explicitly calling releaseLock. In
// that case, we don't want to change the lock state itself because we do not have
// an isolate lock. Moving the lock above will free the lock state while keeping the
// ReadableStream marked as locked.
if (maybeJs != kj::none) {
readState.template init<Unlocked>();
}
}
}
JSG_REQUIRE(KJ_ASSERT_NONNULL(locked.getCanceler())->isEmpty(), TypeError,
"Cannot call releaseLock() on a reader with outstanding read promises.");
maybeRejectPromise<void>(js,
locked.getClosedFulfiller(),
js.v8TypeError("This ReadableStream reader has been released."_kj));

WritableStreamInternalController::~WritableStreamInternalController() noexcept(false) {
KJ_IF_SOME(locked, writeState.tryGet<WriterLocked>()) {
auto lock = kj::mv(locked);
writeState.init<Unlocked>();
readState.template init<Unlocked>();
}
}

WritableStreamInternalController::~WritableStreamInternalController() noexcept(false) {}

jsg::Ref<WritableStream> WritableStreamInternalController::addRef() {
return KJ_ASSERT_NONNULL(owner).addRef();
}
Expand Down Expand Up @@ -1246,7 +1230,7 @@ kj::Maybe<int> WritableStreamInternalController::getDesiredSize() {
KJ_UNREACHABLE;
}

bool WritableStreamInternalController::lockWriter(jsg::Lock& js, Writer& writer) {
bool WritableStreamInternalController::lockWriter(jsg::Lock& js, kj::Own<WeakRef<Writer>> writer) {
if (isLockedToWriter()) {
return false;
}
Expand All @@ -1257,7 +1241,8 @@ bool WritableStreamInternalController::lockWriter(jsg::Lock& js, Writer& writer)
auto readyPrp = js.newPromiseAndResolver<void>();
readyPrp.promise.markAsHandled(js);

auto lock = WriterLocked(writer, kj::mv(closedPrp.resolver), kj::mv(readyPrp.resolver));
auto lock = WriterLocked(kj::mv(writer), kj::mv(closedPrp.resolver), kj::mv(readyPrp.resolver));
// Careful not to access writer directly after this point. Access is through the lock.

KJ_SWITCH_ONEOF(state) {
KJ_CASE_ONEOF(closed, StreamStates::Closed) {
Expand All @@ -1274,30 +1259,26 @@ bool WritableStreamInternalController::lockWriter(jsg::Lock& js, Writer& writer)
}

writeState = kj::mv(lock);
writer.attach(*this, kj::mv(closedPrp.promise), kj::mv(readyPrp.promise));

auto& inner = KJ_ASSERT_NONNULL(writeState.get<WriterLocked>().getWriter());
inner.attach(*this, kj::mv(closedPrp.promise), kj::mv(readyPrp.promise));

return true;
}

void WritableStreamInternalController::releaseWriter(
Writer& writer,
kj::Maybe<jsg::Lock&> maybeJs) {
void WritableStreamInternalController::releaseWriter(jsg::Lock& js, Writer& writer) {
KJ_IF_SOME(locked, writeState.tryGet<WriterLocked>()) {
KJ_ASSERT(&locked.getWriter() == &writer);
KJ_IF_SOME(js, maybeJs) {
maybeRejectPromise<void>(js,
locked.getClosedFulfiller(),
js.v8TypeError("This WritableStream writer has been released."_kj));
KJ_IF_SOME(w, locked.getWriter()) {
// Just an extra verification.
KJ_ASSERT(&w == &writer);
}
auto lock = kj::mv(locked);

// When maybeJs is nullptr, that means releaseWriter was called when the writer is
// being deconstructed and not as the result of explicitly calling releaseLock and
// we do not have an isolate lock. In that case, we don't want to change the lock
// state itself. Moving the lock above will free the lock state while keeping the
// WritableStream marked as locked.
if (maybeJs != kj::none) {
writeState.template init<Unlocked>();
}
maybeRejectPromise<void>(js,
locked.getClosedFulfiller(),
js.v8TypeError("This WritableStream writer has been released."_kj));

auto lock = kj::mv(locked);
writeState.template init<Unlocked>();
}
}

Expand Down Expand Up @@ -1838,7 +1819,7 @@ void WritableStreamInternalController::visitForGc(jsg::GcVisitor& visitor) {
for (auto& event : queue) {
KJ_SWITCH_ONEOF(event.event) {
KJ_CASE_ONEOF(write, Write) {
visitor.visit(write.promise, write.ref);
visitor.visit(write.promise);
}
KJ_CASE_ONEOF(close, Close) {
visitor.visit(close.promise);
Expand Down
10 changes: 4 additions & 6 deletions src/workerd/api/streams/internal.h
Original file line number Diff line number Diff line change
Expand Up @@ -81,10 +81,9 @@ class ReadableStreamInternalController: public ReadableStreamController {

bool isLockedToReader() const override { return !readState.is<Unlocked>(); }

bool lockReader(jsg::Lock& js, Reader& reader) override;
bool lockReader(jsg::Lock& js, kj::Own<WeakRef<Reader>> reader) override;

void releaseReader(Reader& reader, kj::Maybe<jsg::Lock&> maybeJs) override;
// See the comment for releaseReader in common.h for details on the use of maybeJs
void releaseReader(jsg::Lock& js, Reader& reader) override;

kj::Maybe<PipeController&> tryPipeLock(jsg::Ref<WritableStream> destination) override;

Expand Down Expand Up @@ -201,9 +200,9 @@ class WritableStreamInternalController: public WritableStreamController {

bool isLockedToWriter() const override { return !writeState.is<Unlocked>(); }

bool lockWriter(jsg::Lock& js, Writer& writer) override;
bool lockWriter(jsg::Lock& js, kj::Own<WeakRef<Writer>> writer) override;

void releaseWriter(Writer& writer, kj::Maybe<jsg::Lock&> maybeJs) override;
void releaseWriter(jsg::Lock& js, Writer& writer) override;
// See the comment for releaseWriter in common.h for details on the use of maybeJs

kj::Maybe<v8::Local<v8::Value>> isErroring(jsg::Lock& js) override {
Expand Down Expand Up @@ -283,7 +282,6 @@ class WritableStreamInternalController: public WritableStreamController {
kj::Maybe<jsg::Promise<void>::Resolver> promise;
std::shared_ptr<v8::BackingStore> ownBytes;
kj::ArrayPtr<const kj::byte> bytes;
kj::Maybe<jsg::Ref<WritableStream>> ref;
};
struct Close {
kj::Maybe<jsg::Promise<void>::Resolver> promise;
Expand Down
Loading
Loading