Skip to content

Commit 084c994

Browse files
committed
stream: reject iter consumers on abort
Make stream/iter async consumers observe abort signals while waiting for a pending async iterator read. This lets bytes(), text(), arrayBuffer(), and array() reject promptly with the abort reason instead of waiting for another batch. Move the shared abort-aware iterator wrapper to stream/iter utils so pull and consumers use the same helper. Signed-off-by: Kamat, Trivikram <16024985+trivikr@users.noreply.github.com> Assisted-by: openai:gpt-5.5
1 parent 4d3198c commit 084c994

4 files changed

Lines changed: 116 additions & 84 deletions

File tree

lib/internal/streams/iter/consumers.js

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,7 @@ const {
4848

4949
const {
5050
concatBytes,
51+
yieldAbortable,
5152
} = require('internal/streams/iter/utils');
5253

5354
const {
@@ -121,7 +122,9 @@ async function collectAsync(source, signal, limit) {
121122
signal?.throwIfAborted();
122123

123124
// Normalize source via from() - accepts strings, ArrayBuffers, protocols, etc.
124-
const normalized = from(source);
125+
const abortableSource = signal && isAsyncIterable(source) ?
126+
yieldAbortable(source, signal) : source;
127+
const normalized = from(abortableSource);
125128
const chunks = [];
126129

127130
// Fast path: no signal and no limit
@@ -136,8 +139,9 @@ async function collectAsync(source, signal, limit) {
136139

137140
// Slow path: with signal or limit checks
138141
let totalBytes = 0;
142+
const iterable = signal ? yieldAbortable(normalized, signal) : normalized;
139143

140-
for await (const batch of normalized) {
144+
for await (const batch of iterable) {
141145
signal?.throwIfAborted();
142146
for (let i = 0; i < batch.length; i++) {
143147
const chunk = batch[i];

lib/internal/streams/iter/pull.js

Lines changed: 1 addition & 82 deletions
Original file line numberDiff line numberDiff line change
@@ -14,19 +14,12 @@ const {
1414
ArrayPrototypeSlice,
1515
PromisePrototypeThen,
1616
PromiseResolve,
17-
PromiseWithResolvers,
18-
SafePromisePrototypeFinally,
19-
SafePromiseRace,
2017
SymbolAsyncIterator,
2118
SymbolIterator,
2219
TypedArrayPrototypeGetByteLength,
2320
Uint8Array,
2421
} = primordials;
2522

26-
const {
27-
markPromiseAsHandled,
28-
} = internalBinding('util');
29-
3023
const {
3124
codes: {
3225
ERR_INVALID_ARG_TYPE,
@@ -60,6 +53,7 @@ const {
6053
parsePullArgs,
6154
toUint8Array,
6255
wrapError,
56+
yieldAbortable,
6357
} = require('internal/streams/iter/utils');
6458

6559
const {
@@ -690,81 +684,6 @@ async function* applyValidatedStatefulAsyncTransform(source, transform, options)
690684
options.signal?.throwIfAborted();
691685
}
692686

693-
function getOnAbort(reject, signal) {
694-
return () => reject(signal.reason);
695-
}
696-
697-
/**
698-
* Read one item from an async iterator, rejecting early if the signal aborts.
699-
* @param {AsyncIterator} iterator - The iterator to read from.
700-
* @param {AbortSignal|undefined} signal - Optional abort signal.
701-
* @returns {Promise<IteratorResult<Uint8Array[]>>|IteratorResult<Uint8Array[]>}
702-
*/
703-
function abortableNext(iterator, signal) {
704-
if (signal === undefined) {
705-
return iterator.next();
706-
}
707-
708-
signal.throwIfAborted();
709-
710-
const next = iterator.next();
711-
const { promise, reject } = PromiseWithResolvers();
712-
const onAbort = getOnAbort(reject, signal);
713-
signal.addEventListener('abort', onAbort, { __proto__: null, once: true });
714-
if (signal.aborted) {
715-
onAbort();
716-
}
717-
718-
return SafePromisePrototypeFinally(SafePromiseRace([next, promise]), () => {
719-
signal.removeEventListener('abort', onAbort);
720-
});
721-
}
722-
723-
/**
724-
* Wrap an async source so each pending read is abort-aware.
725-
* @param {AsyncIterable<Uint8Array[]>} source - The source to read from.
726-
* @param {AbortSignal|undefined} signal - Optional abort signal.
727-
* @returns {AsyncIterable<Uint8Array[]>}
728-
*/
729-
function yieldAbortable(source, signal) {
730-
if (signal === undefined) {
731-
return source;
732-
}
733-
734-
return {
735-
__proto__: null,
736-
async *[SymbolAsyncIterator]() {
737-
const iterator = source[SymbolAsyncIterator]();
738-
let completed = false;
739-
let aborted = false;
740-
741-
try {
742-
while (true) {
743-
const { done, value } = await abortableNext(iterator, signal);
744-
if (done) {
745-
completed = true;
746-
return;
747-
}
748-
signal.throwIfAborted();
749-
yield value;
750-
}
751-
} catch (error) {
752-
aborted = signal.aborted;
753-
throw error;
754-
} finally {
755-
if (!completed && typeof iterator.return === 'function') {
756-
const result = iterator.return();
757-
if (aborted) {
758-
markPromiseAsHandled(result);
759-
} else {
760-
await result;
761-
}
762-
}
763-
}
764-
},
765-
};
766-
}
767-
768687
/**
769688
* Create an async pipeline from source through transforms.
770689
* @yields {Uint8Array[]}

lib/internal/streams/iter/utils.js

Lines changed: 84 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,14 +8,22 @@ const {
88
MathMin,
99
NumberMAX_SAFE_INTEGER,
1010
PromiseResolve,
11+
PromiseWithResolvers,
12+
SafePromisePrototypeFinally,
13+
SafePromiseRace,
1114
String,
15+
SymbolAsyncIterator,
1216
TypedArrayPrototypeGetBuffer,
1317
TypedArrayPrototypeGetByteLength,
1418
TypedArrayPrototypeGetByteOffset,
1519
TypedArrayPrototypeSet,
1620
Uint8Array,
1721
} = primordials;
1822

23+
const {
24+
markPromiseAsHandled,
25+
} = internalBinding('util');
26+
1927
const { TextEncoder } = require('internal/encoding');
2028
const {
2129
codes: {
@@ -69,6 +77,81 @@ function onSignalAbort(signal, handler) {
6977
}
7078
}
7179

80+
function getOnAbort(reject, signal) {
81+
return () => reject(signal.reason);
82+
}
83+
84+
/**
85+
* Read one item from an async iterator, rejecting early if the signal aborts.
86+
* @param {AsyncIterator} iterator - The iterator to read from.
87+
* @param {AbortSignal|undefined} signal - Optional abort signal.
88+
* @returns {Promise<IteratorResult<Uint8Array[]>>|IteratorResult<Uint8Array[]>}
89+
*/
90+
function abortableNext(iterator, signal) {
91+
if (signal === undefined) {
92+
return iterator.next();
93+
}
94+
95+
signal.throwIfAborted();
96+
97+
const next = iterator.next();
98+
const { promise, reject } = PromiseWithResolvers();
99+
const onAbort = getOnAbort(reject, signal);
100+
signal.addEventListener('abort', onAbort, { __proto__: null, once: true });
101+
if (signal.aborted) {
102+
onAbort();
103+
}
104+
105+
return SafePromisePrototypeFinally(SafePromiseRace([next, promise]), () => {
106+
signal.removeEventListener('abort', onAbort);
107+
});
108+
}
109+
110+
/**
111+
* Wrap an async source so each pending read is abort-aware.
112+
* @param {AsyncIterable<Uint8Array[]>} source - The source to read from.
113+
* @param {AbortSignal|undefined} signal - Optional abort signal.
114+
* @returns {AsyncIterable<Uint8Array[]>}
115+
*/
116+
function yieldAbortable(source, signal) {
117+
if (signal === undefined) {
118+
return source;
119+
}
120+
121+
return {
122+
__proto__: null,
123+
async *[SymbolAsyncIterator]() {
124+
const iterator = source[SymbolAsyncIterator]();
125+
let completed = false;
126+
let aborted = false;
127+
128+
try {
129+
while (true) {
130+
const { done, value } = await abortableNext(iterator, signal);
131+
if (done) {
132+
completed = true;
133+
return;
134+
}
135+
signal.throwIfAborted();
136+
yield value;
137+
}
138+
} catch (error) {
139+
aborted = signal.aborted;
140+
throw error;
141+
} finally {
142+
if (!completed && typeof iterator.return === 'function') {
143+
const result = iterator.return();
144+
if (aborted) {
145+
markPromiseAsHandled(result);
146+
} else {
147+
await result;
148+
}
149+
}
150+
}
151+
},
152+
};
153+
}
154+
72155
/**
73156
* Compute the minimum cursor across a set of consumers and count how many
74157
* consumers are at that cursor.
@@ -301,4 +384,5 @@ module.exports = {
301384
toUint8Array,
302385
validateBackpressure,
303386
wrapError,
387+
yieldAbortable,
304388
};

test/parallel/test-stream-iter-consumers-bytes.js

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -51,6 +51,30 @@ async function testBytesAsyncAbort() {
5151
);
5252
}
5353

54+
async function testAsyncConsumersAbortPendingNext() {
55+
const consumers = [
56+
['bytes', bytes],
57+
['text', text],
58+
['arrayBuffer', arrayBuffer],
59+
['array', array],
60+
];
61+
62+
for (const [name, consumer] of consumers) {
63+
const ac = new AbortController();
64+
const reason = new Error(`${name} boom`);
65+
66+
async function* never() {
67+
await new Promise(() => {});
68+
yield [];
69+
}
70+
71+
const promise = consumer(never(), { __proto__: null, signal: ac.signal });
72+
ac.abort(reason);
73+
74+
await assert.rejects(promise, reason);
75+
}
76+
}
77+
5478
async function testBytesEmpty() {
5579
const data = await bytes(from([]));
5680
assert.ok(data instanceof Uint8Array);
@@ -203,6 +227,7 @@ Promise.all([
203227
testBytesAsync(),
204228
testBytesAsyncLimit(),
205229
testBytesAsyncAbort(),
230+
testAsyncConsumersAbortPendingNext(),
206231
testBytesEmpty(),
207232
testArrayBufferSyncBasic(),
208233
testArrayBufferAsync(),

0 commit comments

Comments
 (0)