Skip to content

Commit f4b3a00

Browse files
committed
quic: use http3Binding to drop header & priority stream logic from QUIC
1 parent 6a23394 commit f4b3a00

21 files changed

Lines changed: 692 additions & 551 deletions

lib/internal/quic/http3.js

Lines changed: 119 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
const {
44
ArrayIsArray,
55
ArrayPrototypePush,
6+
ObjectHasOwn,
67
Symbol,
78
SymbolAsyncIterator,
89
} = primordials;
@@ -40,6 +41,7 @@ const {
4041

4142
const {
4243
setHttp3Callbacks,
44+
createHttp3Handle,
4345

4446
QUIC_STREAM_HEADERS_KIND_INITIAL: kHeadersKindInitial,
4547
QUIC_STREAM_HEADERS_KIND_HINTS: kHeadersKindHints,
@@ -65,8 +67,10 @@ const {
6567
} = require('internal/quic/diagnostics');
6668

6769
const {
70+
validateBoolean,
6871
validateFunction,
6972
validateObject,
73+
validateOneOf,
7074
} = require('internal/validators');
7175

7276
const {
@@ -128,14 +132,29 @@ function parseHeaderPairs(pairs) {
128132
* will be sent after these headers.
129133
*/
130134

135+
const kGetHttp3Handle = Symbol('kGetHttp3Handle');
136+
const kSubmitInitialHeaders = Symbol('kSubmitInitialHeaders');
137+
138+
function priorityFieldValue(level, incremental) {
139+
const urgency = level === 'high' ? 0 : level === 'low' ? 7 : 3;
140+
if (urgency === 3 && !incremental) return undefined;
141+
return incremental ? `u=${urgency}, i` : `u=${urgency}`;
142+
}
143+
131144
class Http3Stream {
132145
#stream;
133146
#session;
147+
#h3handle;
134148
#headers = undefined;
135149
#onheaders = undefined;
136150
#oninfo = undefined;
137151
#ontrailers = undefined;
138152
#onwanttrailers = undefined;
153+
// Client-side stored priority (the value this side requested). The server
154+
// side reads the peer's requested priority from nghttp3 instead.
155+
#priority = { __proto__: null, level: 'default', incremental: false };
156+
// Have we submitted an initial header block (request/response) to nghttp3?
157+
#headersSubmitted = false;
139158

140159
/**
141160
* @param {QuicStream} stream the underlying QUIC stream
@@ -148,6 +167,7 @@ class Http3Stream {
148167
}
149168
this.#stream = stream;
150169
this.#session = session;
170+
this.#h3handle = session[kGetHttp3Handle]();
151171
const handle = stream[kStreamHandle];
152172
if (handle !== undefined) {
153173
handle[kApplicationOwner] = this;
@@ -241,9 +261,57 @@ class Http3Stream {
241261
// True when the stream's data was received as 0-RTT early data.
242262
get early() { return this.#stream.early; }
243263

244-
get priority() { return this.#stream.priority; }
264+
get #isServer() {
265+
return getQuicSessionState(this.#session.session).isServer;
266+
}
245267

246-
setPriority(options) { return this.#stream.setPriority(options); }
268+
get #knownToNgHttp3() {
269+
return this.#isServer || this.#headersSubmitted;
270+
}
271+
272+
// The stream's priority. On a client this is the value we requested; on a
273+
// server it is the peer's requested priority read from nghttp3.
274+
get priority() {
275+
const stream = this.#stream;
276+
if (stream.destroyed || this.#h3handle === undefined) return null;
277+
if (!this.#isServer) {
278+
return { level: this.#priority.level,
279+
incremental: this.#priority.incremental };
280+
}
281+
const packed = this.#h3handle.getPriority(stream[kStreamHandle]);
282+
if (packed === undefined) return null;
283+
const urgency = packed >> 1;
284+
const incremental = !!(packed & 1);
285+
const level = urgency < 3 ? 'high' : urgency > 3 ? 'low' : 'default';
286+
return { level, incremental };
287+
}
288+
289+
setPriority(options = kEmptyObject) {
290+
const stream = this.#stream;
291+
if (stream.destroyed) return;
292+
validateObject(options, 'options');
293+
const { level = 'default', incremental = false } = options;
294+
validateOneOf(level, 'options.level', ['default', 'low', 'high']);
295+
validateBoolean(incremental, 'options.incremental');
296+
this.#priority = { __proto__: null, level, incremental };
297+
298+
// Before the stream is known to nghttp3 (client, pre-submit) the requested
299+
// priority is carried in the initial header block by sendHeaders;
300+
// afterwards a change is signaled with a PRIORITY_UPDATE frame.
301+
if (this.#knownToNgHttp3 && this.#h3handle !== undefined) {
302+
const urgency = level === 'high' ? 0 : level === 'low' ? 7 : 3;
303+
this.#h3handle.setPriority(
304+
stream[kStreamHandle], (urgency << 1) | (incremental ? 1 : 0));
305+
}
306+
}
307+
308+
[kSubmitInitialHeaders](headerString, flags) {
309+
const stream = this.#stream;
310+
if (stream.destroyed || this.#h3handle === undefined) return false;
311+
this.#headersSubmitted = true;
312+
return this.#h3handle.sendHeaders(
313+
stream[kStreamHandle], headerString, flags);
314+
}
247315

248316
#updateHeaderInterest() {
249317
getQuicStreamState(this.#stream).wantsHeaders =
@@ -303,14 +371,29 @@ class Http3Stream {
303371
*/
304372
sendHeaders(headers, options = kEmptyObject) {
305373
const stream = this.#stream;
306-
if (stream.destroyed) return false;
374+
if (stream.destroyed || this.#h3handle === undefined) return false;
307375
validateObject(headers, 'headers');
308376
const { terminal = false } = options;
377+
378+
// A client request carries its requested priority as a priority header
379+
// when set - server responses signal via setPriority instead.
380+
let toSend = headers;
381+
if (!this.#isServer) {
382+
const pri = priorityFieldValue(
383+
this.#priority.level, this.#priority.incremental);
384+
if (pri !== undefined && !ObjectHasOwn(headers, 'priority')) {
385+
toSend = { __proto__: null, ...headers, priority: pri };
386+
}
387+
}
388+
309389
const headerString = buildNgHeaderString(
310-
headers, assertValidPseudoHeader, true /* strictSingleValueFields */);
390+
toSend, assertValidPseudoHeader, true /* strictSingleValueFields */);
311391
const flags = terminal ? kHeadersFlagsTerminal : kHeadersFlagsNone;
312-
return stream[kStreamHandle].sendHeaders(
313-
kHeadersKindInitial, headerString, flags);
392+
// The stream now exists in nghttp3; later priority changes use
393+
// PRIORITY_UPDATE rather than the header block.
394+
this.#headersSubmitted = true;
395+
return this.#h3handle.sendHeaders(
396+
stream[kStreamHandle], headerString, flags);
314397
}
315398

316399
/**
@@ -321,11 +404,12 @@ class Http3Stream {
321404
sendInformationalHeaders(headers) {
322405
const stream = this.#stream;
323406
if (stream.destroyed) return false;
407+
if (this.#h3handle === undefined) return false;
324408
validateObject(headers, 'headers');
325409
const headerString = buildNgHeaderString(
326410
headers, assertValidPseudoHeader, true);
327-
return stream[kStreamHandle].sendHeaders(
328-
kHeadersKindHints, headerString, kHeadersFlagsNone);
411+
return this.#h3handle.sendInformationalHeaders(
412+
stream[kStreamHandle], headerString);
329413
}
330414

331415
/**
@@ -337,11 +421,12 @@ class Http3Stream {
337421
sendTrailers(headers) {
338422
const stream = this.#stream;
339423
if (stream.destroyed) return false;
424+
if (this.#h3handle === undefined) return false;
340425
validateObject(headers, 'headers');
341426
const headerString =
342427
buildNgHeaderString(headers, assertValidPseudoHeaderTrailer);
343-
return stream[kStreamHandle].sendHeaders(
344-
kHeadersKindTrailing, headerString, kHeadersFlagsNone);
428+
return this.#h3handle.sendTrailers(
429+
stream[kStreamHandle], headerString);
345430
}
346431

347432
// Outbound body writer (stream/iter push writer).
@@ -366,6 +451,7 @@ class Http3Stream {
366451

367452
class Http3Session {
368453
#session;
454+
#h3handle;
369455
#onstream = undefined;
370456
#ongoaway = undefined;
371457
#onorigin = undefined;
@@ -390,6 +476,11 @@ class Http3Session {
390476
// routed back here.
391477
const handle = session[kSessionHandle];
392478
if (handle !== undefined) {
479+
if (handle[kApplicationOwner] !== undefined) {
480+
throw new ERR_INVALID_STATE(
481+
'The QUIC session already has an application attached');
482+
}
483+
this.#h3handle = createHttp3Handle(handle);
393484
handle[kApplicationOwner] = this;
394485
}
395486
// Claim the session's incoming streams. This takes the single
@@ -412,6 +503,9 @@ class Http3Session {
412503
if (onsettings !== undefined) this.onsettings = onsettings;
413504
}
414505

506+
// Internal: hands the per-session HTTP/3 handle to this session's streams.
507+
[kGetHttp3Handle]() { return this.#h3handle; }
508+
415509
/**
416510
* The peer initiated a graceful shutdown of the session (HTTP/3
417511
* GOAWAY). The session stops allowing new streams.
@@ -579,14 +673,22 @@ class Http3Session {
579673
onwanttrailers,
580674
onreset,
581675
onerror,
676+
priority,
677+
incremental,
582678
...quicOptions
583679
} = options;
584680

585681
let headerString;
586682
if (headers !== undefined) {
683+
let toSend = headers;
684+
const pri = priorityFieldValue(priority ?? 'default', incremental ?? false);
685+
if (pri !== undefined && !ObjectHasOwn(headers, 'priority')) {
686+
toSend = { __proto__: null, ...headers, priority: pri };
687+
}
587688
headerString = buildNgHeaderString(
588-
headers, assertValidPseudoHeader, true /* strictSingleValueFields */);
689+
toSend, assertValidPseudoHeader, true /* strictSingleValueFields */);
589690
}
691+
590692
const stream = await this.#session.createBidirectionalStream(quicOptions);
591693
const wrapped = new Http3Stream(stream, this, {
592694
__proto__: null,
@@ -597,6 +699,11 @@ class Http3Session {
597699
onreset,
598700
onerror,
599701
});
702+
703+
if (priority !== undefined || incremental !== undefined) {
704+
wrapped.setPriority({ __proto__: null, level: priority, incremental });
705+
}
706+
600707
// Submit the request headers only after the callbacks above are
601708
// attached. Nothing for this stream can hit the wire before the
602709
// headers are submitted: the HTTP/3 application only learns about the
@@ -606,8 +713,7 @@ class Http3Session {
606713
if (headerString !== undefined) {
607714
const flags = quicOptions.body === undefined ?
608715
kHeadersFlagsTerminal : kHeadersFlagsNone;
609-
if (!stream[kStreamHandle].sendHeaders(
610-
kHeadersKindInitial, headerString, flags)) {
716+
if (!wrapped[kSubmitInitialHeaders](headerString, flags)) {
611717
wrapped.destroy();
612718
throw new ERR_QUIC_OPEN_STREAM_FAILED();
613719
}

lib/internal/quic/quic.js

Lines changed: 0 additions & 57 deletions
Original file line numberDiff line numberDiff line change
@@ -281,8 +281,6 @@ const endpointRegistry = new SafeSet();
281281
* FileHandle|AsyncIterable|Iterable|Promise|null} [body] The outbound
282282
* body source. See the public docs for `stream.setBody()` for details
283283
* on supported types. When omitted, the stream is closed immediately.
284-
* @property {'high'|'default'|'low'} [priority] The priority level of the stream.
285-
* @property {boolean} [incremental] Whether to interleave data with same-priority streams.
286284
* @property {number} [highWaterMark] The high water mark for write
287285
* backpressure, in bytes. **Default:** `65536`.
288286
*/
@@ -472,12 +470,6 @@ const endpointRegistry = new SafeSet();
472470
* frames do not themselves carry a reason field over the wire.
473471
*/
474472

475-
/**
476-
* @typedef {object} StreamPriority
477-
* @property {'default' | 'low' | 'high'} level The priority level of the stream.
478-
* @property {boolean} incremental Whether to interleave data with same-priority streams.
479-
*/
480-
481473
/**
482474
* @typedef {object} QuicSessionPath
483475
* @property {SocketAddress} local The local address for this path
@@ -2079,45 +2071,6 @@ class QuicStream {
20792071
this.#handle.resetStream(BigInt(code));
20802072
}
20812073

2082-
/**
2083-
* The priority of the stream. If the stream is destroyed or if
2084-
* the session does not support priority, `null` will be
2085-
* returned.
2086-
* @type {StreamPriority | null}
2087-
*/
2088-
get priority() {
2089-
assertIsQuicStream(this);
2090-
if (this.destroyed ||
2091-
!getQuicSessionState(this.#inner.session).isPrioritySupported) return null;
2092-
const packed = this.#handle.getPriority();
2093-
const urgency = packed >> 1;
2094-
const incremental = !!(packed & 1);
2095-
const level = urgency < 3 ? 'high' : urgency > 3 ? 'low' : 'default';
2096-
return { level, incremental };
2097-
}
2098-
2099-
/**
2100-
* Sets the priority of the stream.
2101-
* @param {StreamPriority} [options]
2102-
*/
2103-
setPriority(options = kEmptyObject) {
2104-
assertIsQuicStream(this);
2105-
if (this.destroyed) return;
2106-
if (!getQuicSessionState(this.#inner.session).isPrioritySupported) {
2107-
throw new ERR_INVALID_STATE(
2108-
'The session does not support stream priority');
2109-
}
2110-
validateObject(options, 'options');
2111-
const {
2112-
level = 'default',
2113-
incremental = false,
2114-
} = options;
2115-
validateOneOf(level, 'options.level', ['default', 'low', 'high']);
2116-
validateBoolean(incremental, 'options.incremental');
2117-
const urgency = level === 'high' ? 0 : level === 'low' ? 7 : 3;
2118-
this.#handle.setPriority((urgency << 1) | (incremental ? 1 : 0));
2119-
}
2120-
21212074
[kFinishClose](error) {
21222075
const inner = this.#inner;
21232076
inner.pendingClose ??= PromiseWithResolvers();
@@ -2761,26 +2714,16 @@ class QuicSession {
27612714
validateObject(options, 'options');
27622715
const {
27632716
body,
2764-
priority = 'default',
2765-
incremental = false,
27662717
highWaterMark = kDefaultHighWaterMark,
27672718
} = options;
27682719

2769-
validateOneOf(priority, 'options.priority', ['default', 'low', 'high']);
2770-
validateBoolean(incremental, 'options.incremental');
2771-
27722720
const validatedBody = validateBody(body);
27732721

27742722
const handle = this.#handle.openStream(direction, validatedBody);
27752723
if (handle === undefined) {
27762724
throw new ERR_QUIC_OPEN_STREAM_FAILED();
27772725
}
27782726

2779-
if (inner.state.isPrioritySupported) {
2780-
const urgency = priority === 'high' ? 0 : priority === 'low' ? 7 : 3;
2781-
handle.setPriority((urgency << 1) | (incremental ? 1 : 0));
2782-
}
2783-
27842727
const stream = new QuicStream(
27852728
kPrivateConstructor, handle, this, direction, true /* isLocal */);
27862729
inner.streams.add(stream);

0 commit comments

Comments
 (0)