Skip to content

Commit

Permalink
Fix regression with IPC (#16465)
Browse files Browse the repository at this point in the history
  • Loading branch information
Jarred-Sumner authored Jan 17, 2025
1 parent 9579e42 commit 399ec3e
Show file tree
Hide file tree
Showing 2 changed files with 16 additions and 7 deletions.
2 changes: 1 addition & 1 deletion src/bun.js/ipc.zig
Original file line number Diff line number Diff line change
Expand Up @@ -339,7 +339,7 @@ const SocketIPCData = struct {
const bytes = getVersionPacket(this.mode);
if (bytes.len > 0) {
const n = this.socket.write(bytes, false);
if (n != bytes.len) {
if (n >= 0 and n < @as(i32, @intCast(bytes.len))) {
this.outgoing.write(bytes[@intCast(n)..]) catch bun.outOfMemory();
}
}
Expand Down
21 changes: 15 additions & 6 deletions src/bun.js/node/node_cluster_binding.zig
Original file line number Diff line number Diff line change
@@ -1,3 +1,8 @@
// Most of this code should be rewritten.
// - Usage of JSC.Strong here is likely to cause memory leaks.
// - These sequence numbers and ACKs shouldn't exist from JavaScript's perspective
// at all. It should happen in the protocol before it reaches JS.
// - We should not be creating JSFunction's in process.nextTick.
const std = @import("std");
const bun = @import("root").bun;
const Environment = bun.Environment;
Expand Down Expand Up @@ -35,6 +40,8 @@ pub fn sendHelperChild(globalThis: *JSC.JSGlobalObject, callframe: *JSC.CallFram
return globalThis.throwInvalidArgumentTypeValue("message", "object", message);
}
if (callback.isFunction()) {
// TODO: remove this strong. This is expensive and would be an easy way to create a memory leak.
// These sequence numbers shouldn't exist from JavaScript's perspective at all.
child_singleton.callbacks.put(bun.default_allocator, child_singleton.seq, JSC.Strong.create(callback, globalThis)) catch bun.outOfMemory();
}

Expand All @@ -52,7 +59,7 @@ pub fn sendHelperChild(globalThis: *JSC.JSGlobalObject, callframe: *JSC.CallFram
fn impl(globalThis_: *JSC.JSGlobalObject, callframe_: *JSC.CallFrame) bun.JSError!JSC.JSValue {
const arguments_ = callframe_.arguments_old(1).slice();
const ex = arguments_[0];
Process__emitErrorEvent(globalThis_, ex);
Process__emitErrorEvent(globalThis_, ex.toError() orelse ex);
return .undefined;
}
};
Expand All @@ -73,6 +80,7 @@ pub fn sendHelperChild(globalThis: *JSC.JSGlobalObject, callframe: *JSC.CallFram
pub fn onInternalMessageChild(globalThis: *JSC.JSGlobalObject, callframe: *JSC.CallFrame) bun.JSError!JSC.JSValue {
log("onInternalMessageChild", .{});
const arguments = callframe.arguments_old(2).ptr;
// TODO: we should not create two JSC.Strong here. If absolutely necessary, a single Array. should be all we use.
child_singleton.worker = JSC.Strong.create(arguments[0], globalThis);
child_singleton.cb = JSC.Strong.create(arguments[1], globalThis);
try child_singleton.flush(globalThis);
Expand All @@ -85,17 +93,16 @@ pub fn handleInternalMessageChild(globalThis: *JSC.JSGlobalObject, message: JSC.
try child_singleton.dispatch(message, globalThis);
}

//
//
//

// TODO: rewrite this code.
/// Queue for messages sent between parent and child processes in an IPC environment. node:cluster sends json serialized messages
/// to describe different events it performs. It will send a message with an incrementing sequence number and then call a callback
/// when a message is recieved with an 'ack' property of the same sequence number.
pub const InternalMsgHolder = struct {
seq: i32 = 0,
callbacks: std.AutoArrayHashMapUnmanaged(i32, JSC.Strong) = .{},

// TODO: move this to an Array or a JS Object or something which doesn't
// individually create a Strong for every single IPC message...
callbacks: std.AutoArrayHashMapUnmanaged(i32, JSC.Strong) = .{},
worker: JSC.Strong = .{},
cb: JSC.Strong = .{},
messages: std.ArrayListUnmanaged(JSC.Strong) = .{},
Expand Down Expand Up @@ -211,6 +218,7 @@ pub fn onInternalMessagePrimary(globalThis: *JSC.JSGlobalObject, callframe: *JSC
const arguments = callframe.arguments_old(3).ptr;
const subprocess = arguments[0].as(bun.JSC.Subprocess).?;
const ipc_data = subprocess.ipc() orelse return .undefined;
// TODO: remove these strongs.
ipc_data.internal_msg_queue.worker = JSC.Strong.create(arguments[1], globalThis);
ipc_data.internal_msg_queue.cb = JSC.Strong.create(arguments[2], globalThis);
return .undefined;
Expand All @@ -221,6 +229,7 @@ pub fn handleInternalMessagePrimary(globalThis: *JSC.JSGlobalObject, subprocess:

const event_loop = globalThis.bunVM().eventLoop();

// TODO: investigate if "ack" and "seq" are observable and if they're not, remove them entirely.
if (try message.get(globalThis, "ack")) |p| {
if (!p.isUndefined()) {
const ack = p.toInt32();
Expand Down

0 comments on commit 399ec3e

Please sign in to comment.