Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor zlib to simplify our implementation #2679

Closed
wants to merge 4 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
73 changes: 33 additions & 40 deletions src/workerd/api/node/zlib-util.c++
Original file line number Diff line number Diff line change
Expand Up @@ -49,9 +49,6 @@ public:
size_t size() const {
return builder.size();
}
bool empty() const {
return size() == 0;
}
size_t capacity() const {
return builder.capacity();
}
Expand Down Expand Up @@ -112,6 +109,24 @@ private:
builder = kj::mv(newBuilder);
}
};

template <typename Context>
kj::Array<kj::byte> syncProcessBuffer(Context& ctx, GrowableBuffer& result) {
do {
result.addChunk();
ctx.setOutputBuffer(kj::ArrayPtr(result.end(), result.available()));

ctx.work();

KJ_IF_SOME(error, ctx.getError()) {
JSG_FAIL_REQUIRE(Error, error.message);
}

result.adjustUnused(ctx.getAvailOut());
} while (ctx.getAvailOut() == 0);

return result.releaseAsArray();
}
} // namespace

void ZlibContext::initialize(int _level,
Expand Down Expand Up @@ -484,11 +499,11 @@ void ZlibUtil::CompressionStream<CompressionContext>::writeStream(jsg::Lock& js,

writing = true;

context()->setBuffers(input, inputLength, output, outputLength);
context()->setFlush(flush);
context.setBuffers(input, inputLength, output, outputLength);
context.setFlush(flush);

if constexpr (!async) {
context()->work();
context.work();
if (checkError(js)) {
updateWriteResult();
writing = false;
Expand All @@ -498,7 +513,7 @@ void ZlibUtil::CompressionStream<CompressionContext>::writeStream(jsg::Lock& js,

// On Node.js, this is called as a result of `ScheduleWork()` call.
// Since, we implement the whole thing as sync, we're going to ahead and call the whole thing here.
context()->work();
context.work();

// This is implemented slightly differently in Node.js
// Node.js calls AfterThreadPoolWork().
Expand Down Expand Up @@ -528,7 +543,7 @@ void ZlibUtil::CompressionStream<CompressionContext>::close() {

template <typename CompressionContext>
bool ZlibUtil::CompressionStream<CompressionContext>::checkError(jsg::Lock& js) {
KJ_IF_SOME(error, context()->getError()) {
KJ_IF_SOME(error, context.getError()) {
emitError(js, kj::mv(error));
return false;
}
Expand All @@ -547,7 +562,7 @@ template <typename CompressionContext>
void ZlibUtil::CompressionStream<CompressionContext>::updateWriteResult() {
KJ_IF_SOME(wr, writeResult) {
auto ptr = wr.template asArrayPtr<uint32_t>();
context()->getAfterWriteResult(&ptr[1], &ptr[0]);
context.getAfterWriteResult(&ptr[1], &ptr[0]);
}
}

Expand Down Expand Up @@ -585,7 +600,7 @@ void ZlibUtil::CompressionStream<CompressionContext>::write(jsg::Lock& js,

template <typename CompressionContext>
void ZlibUtil::CompressionStream<CompressionContext>::reset(jsg::Lock& js) {
KJ_IF_SOME(error, context()->resetStream()) {
KJ_IF_SOME(error, context.resetStream()) {
emitError(js, kj::mv(error));
}
}
Expand All @@ -602,13 +617,13 @@ void ZlibUtil::ZlibStream::initialize(int windowBits,
jsg::Function<void()> writeCallback,
jsg::Optional<kj::Array<kj::byte>> dictionary) {
initializeStream(kj::mv(writeState), kj::mv(writeCallback));
context()->setAllocationFunctions(AllocForZlib, FreeForZlib, this);
context()->initialize(level, windowBits, memLevel, strategy, kj::mv(dictionary));
context.setAllocationFunctions(AllocForZlib, FreeForZlib, this);
context.initialize(level, windowBits, memLevel, strategy, kj::mv(dictionary));
}

void ZlibUtil::ZlibStream::params(jsg::Lock& js, int _level, int _strategy) {
context()->setParams(_level, _strategy);
KJ_IF_SOME(err, context()->getError()) {
context.setParams(_level, _strategy);
KJ_IF_SOME(err, context.getError()) {
emitError(js, kj::mv(err));
}
}
Expand Down Expand Up @@ -771,10 +786,9 @@ bool ZlibUtil::BrotliCompressionStream<CompressionContext>::initialize(jsg::Lock
jsg::BufferSource writeResult,
jsg::Function<void()> writeCallback) {
this->initializeStream(kj::mv(writeResult), kj::mv(writeCallback));
auto maybeError =
this->context()->initialize(CompressionStream<CompressionContext>::AllocForBrotli,
CompressionStream<CompressionContext>::FreeForZlib,
static_cast<CompressionStream<CompressionContext>*>(this));
auto maybeError = context().initialize(CompressionStream<CompressionContext>::AllocForBrotli,
CompressionStream<CompressionContext>::FreeForZlib,
static_cast<CompressionStream<CompressionContext>*>(this));

KJ_IF_SOME(err, maybeError) {
this->emitError(js, kj::mv(err));
Expand All @@ -788,7 +802,7 @@ bool ZlibUtil::BrotliCompressionStream<CompressionContext>::initialize(jsg::Lock
continue;
}

KJ_IF_SOME(err, this->context()->setParams(i, results[i])) {
KJ_IF_SOME(err, context().setParams(i, results[i])) {
this->emitError(js, kj::mv(err));
return false;
}
Expand All @@ -810,8 +824,6 @@ void* ZlibUtil::CompressionStream<CompressionContext>::AllocForBrotli(void* data
auto* ctx = static_cast<CompressionStream*>(data);
auto memory = kj::heapArray<uint8_t>(size);
auto begin = memory.begin();
// TODO(soon): Check if we need to store the size of the block in the pointer like Node.js
*reinterpret_cast<size_t*>(begin) = size;
ctx->allocations.insert(begin, kj::mv(memory));
return begin + sizeof(size_t);
}
Expand All @@ -823,25 +835,6 @@ void ZlibUtil::CompressionStream<CompressionContext>::FreeForZlib(void* data, vo
auto real_pointer = static_cast<uint8_t*>(pointer) - sizeof(size_t);
JSG_REQUIRE(ctx->allocations.erase(real_pointer), Error, "Zlib allocation should exist"_kj);
}
namespace {
template <typename Context>
static kj::Array<kj::byte> syncProcessBuffer(Context& ctx, GrowableBuffer& result) {
do {
result.addChunk();
ctx.setOutputBuffer(kj::ArrayPtr(result.end(), result.available()));

ctx.work();

KJ_IF_SOME(error, ctx.getError()) {
JSG_FAIL_REQUIRE(Error, error.message);
}

result.adjustUnused(ctx.getAvailOut());
} while (ctx.getAvailOut() == 0);

return result.releaseAsArray();
}
} // namespace

kj::Array<kj::byte> ZlibUtil::zlibSync(
ZlibUtil::InputSource data, ZlibContext::Options opts, ZlibModeValue mode) {
Expand Down
13 changes: 7 additions & 6 deletions src/workerd/api/node/zlib-util.h
Original file line number Diff line number Diff line change
Expand Up @@ -294,7 +294,7 @@ class ZlibUtil final: public jsg::Object {
template <class CompressionContext>
class CompressionStream: public jsg::Object {
public:
explicit CompressionStream(ZlibMode _mode): context_(_mode) {}
explicit CompressionStream(ZlibMode _mode): context(_mode) {}
CompressionStream() = default;
// TODO(soon): Find a way to add noexcept(false) to this destructor.
~CompressionStream();
Expand Down Expand Up @@ -338,8 +338,10 @@ class ZlibUtil final: public jsg::Object {
}

protected:
CompressionContext* context() {
return &context_;
CompressionContext context;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This creates a use-after-free error because allocations will be destroyed before context.


CompressionContext& getContext() {
return context;
}

void initializeStream(jsg::BufferSource _write_result, jsg::Function<void()> writeCallback);
Expand All @@ -357,7 +359,6 @@ class ZlibUtil final: public jsg::Object {
// context to avoid `heap-use-after-free` ASan error.
kj::HashMap<uint8_t*, kj::Array<uint8_t>> allocations;

CompressionContext context_;
bool initialized = false;
bool writing = false;
bool pending_close = false;
Expand Down Expand Up @@ -420,8 +421,8 @@ class ZlibUtil final: public jsg::Object {
JSG_METHOD(params);
}

CompressionContext* context() {
return this->CompressionStream<CompressionContext>::context();
CompressionContext& context() {
return this->CompressionStream<CompressionContext>::getContext();
}
};

Expand Down
Loading