Skip to content

Commit f2f241a

Browse files
authored
stream: normalize Broadcast.from() byte inputs
Route non-Broadcastable inputs through from(). This makes strings and ArrayBuffer views byte inputs instead of generic iterables. Signed-off-by: Kamat, Trivikram <16024985+trivikr@users.noreply.github.com> Assisted-by: openai:gpt-5.5 PR-URL: #64082 Fixes: #64081 Reviewed-By: James M Snell <jasnell@gmail.com>
1 parent 0c59d78 commit f2f241a

3 files changed

Lines changed: 34 additions & 7 deletions

File tree

lib/internal/streams/iter/broadcast.js

Lines changed: 8 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,7 @@ const {
4444
} = require('internal/streams/iter/types');
4545

4646
const {
47+
from,
4748
isAsyncIterable,
4849
isSyncIterable,
4950
} = require('internal/streams/iter/from');
@@ -799,7 +800,9 @@ const Broadcast = {
799800
return { __proto__: null, writer: { __proto__: null }, broadcast: bc };
800801
}
801802

802-
if (!isAsyncIterable(input) && !isSyncIterable(input)) {
803+
const source = from(input);
804+
805+
if (!isAsyncIterable(source) && !isSyncIterable(source)) {
803806
throw new ERR_INVALID_ARG_TYPE(
804807
'input', ['Broadcastable', 'AsyncIterable', 'Iterable'], input);
805808
}
@@ -810,8 +813,8 @@ const Broadcast = {
810813
const pump = async () => {
811814
const w = result.writer;
812815
try {
813-
if (isAsyncIterable(input)) {
814-
for await (const chunks of input) {
816+
if (isAsyncIterable(source)) {
817+
for await (const chunks of source) {
815818
signal?.throwIfAborted();
816819
if (ArrayIsArray(chunks)) {
817820
if (!w.writevSync(chunks)) {
@@ -821,8 +824,8 @@ const Broadcast = {
821824
await w.write(chunks, signal ? { signal } : undefined);
822825
}
823826
}
824-
} else if (isSyncIterable(input)) {
825-
for (const chunks of input) {
827+
} else if (isSyncIterable(source)) {
828+
for (const chunks of source) {
826829
signal?.throwIfAborted();
827830
if (ArrayIsArray(chunks)) {
828831
if (!w.writevSync(chunks)) {

test/parallel/test-stream-iter-broadcast-from.js

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,28 @@ async function testBroadcastFromStringChunks() {
4343
assert.strictEqual(data, 'foobar');
4444
}
4545

46+
async function testBroadcastFromStringInput() {
47+
const { broadcast: bc } = Broadcast.from('abc');
48+
const consumer = bc.push();
49+
const data = await text(consumer);
50+
assert.strictEqual(data, 'abc');
51+
}
52+
53+
async function testBroadcastFromUint8ArrayInput() {
54+
const { broadcast: bc } = Broadcast.from(new Uint8Array([97]));
55+
const consumer = bc.push();
56+
const data = await text(consumer);
57+
assert.strictEqual(data, 'a');
58+
}
59+
60+
async function testBroadcastFromDataViewInput() {
61+
const view = new DataView(new Uint8Array([104, 105]).buffer);
62+
const { broadcast: bc } = Broadcast.from(view);
63+
const consumer = bc.push();
64+
const data = await text(consumer);
65+
assert.strictEqual(data, 'hi');
66+
}
67+
4668
async function testBroadcastFromMultipleConsumers() {
4769
const source = from('shared-data');
4870
const { broadcast: bc } = Broadcast.from(source);
@@ -180,6 +202,9 @@ Promise.all([
180202
testBroadcastFromAsyncIterable(),
181203
testBroadcastFromNonArrayChunks(),
182204
testBroadcastFromStringChunks(),
205+
testBroadcastFromStringInput(),
206+
testBroadcastFromUint8ArrayInput(),
207+
testBroadcastFromDataViewInput(),
183208
testBroadcastFromMultipleConsumers(),
184209
testAbortSignal(),
185210
testAlreadyAbortedSignal(),

test/parallel/test-stream-iter-validation.js

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -157,9 +157,8 @@ assert.throws(() => broadcast({ backpressure: 'bad' }), { code: 'ERR_INVALID_ARG
157157
writer.endSync();
158158
}
159159

160-
// Broadcast.from rejects non-iterable input
160+
// Broadcast.from rejects non-streamable input
161161
assert.throws(() => Broadcast.from(42), { code: 'ERR_INVALID_ARG_TYPE' });
162-
assert.throws(() => Broadcast.from('bad'), { code: 'ERR_INVALID_ARG_TYPE' });
163162

164163
// =============================================================================
165164
// share() / shareSync() validation

0 commit comments

Comments
 (0)