Skip to content

Commit 19197f6

Browse files
committed
quic: extract transport logic from Application to Session
Two small fixes notably included: - Fixed a SendPendingData but where datagrams would stall on an idle connection in the nwrite == 0 branch. - Check is_destroyed() after StreamCommit, since it calls JS callbacks which could destroy the session.
1 parent c612f35 commit 19197f6

9 files changed

Lines changed: 615 additions & 553 deletions

File tree

src/quic/README.md

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -77,7 +77,7 @@ data channels that carry application data.
7777

7878
Every entry point that may generate outbound data creates a
7979
`SendPendingDataScope`. Scopes nest — an internal depth counter ensures
80-
`Application::SendPendingData()` is called exactly once, when the outermost
80+
`Session::SendPendingData()` is called exactly once, when the outermost
8181
scope exits:
8282

8383
```cpp
@@ -218,13 +218,13 @@ Session::Receive()
218218

219219
```text
220220
SendPendingDataScope::~SendPendingDataScope()
221-
Application::SendPendingData()
221+
Session::SendPendingData()
222222
Loop (up to max_packet_count):
223-
├── GetStreamData() // pull data from next stream
224-
│ └── stream->Pull() // bob pull from Outbound→DataQueue
225-
├── WriteVStream() // ngtcp2_conn_writev_stream()
223+
├── application().GetStreamData() // pull data from next stream
224+
│ └── stream->Pull() // bob pull from Outbound→DataQueue
225+
├── WriteVStream() // ngtcp2_conn_writev_stream()
226226
│ encrypts, frames, paces
227-
├── if ndatalen > 0: StreamCommit()
227+
├── if ndatalen > 0: application().StreamCommit()
228228
│ stream->Commit(datalen, fin)
229229
├── if nwrite > 0: Send() // uv_udp_send()
230230
├── if WRITE_MORE: continue // room for more in this packet

src/quic/application.cc

Lines changed: 0 additions & 442 deletions
Large diffs are not rendered by default.

src/quic/application.h

Lines changed: 0 additions & 56 deletions
Original file line numberDiff line numberDiff line change
@@ -205,10 +205,6 @@ class Session::Application : public MemoryRetainer {
205205
return false;
206206
}
207207

208-
// Signals to the Application that it should serialize and transmit any
209-
// pending session and stream packets it has accumulated.
210-
void SendPendingData();
211-
212208
// Returns true if the application protocol supports sending and
213209
// receiving headers on streams (e.g. HTTP/3). Applications that
214210
// do not support headers should return false (the default).
@@ -243,10 +239,6 @@ class Session::Application : public MemoryRetainer {
243239
return {StreamPriority::DEFAULT, StreamPriorityFlags::NON_INCREMENTAL};
244240
}
245241

246-
// The StreamData struct is used by the application to pass pending stream
247-
// data to the session for transmission.
248-
struct StreamData;
249-
250242
virtual int GetStreamData(StreamData* data) = 0;
251243
virtual bool StreamCommit(StreamData* data, size_t datalen) = 0;
252244

@@ -262,57 +254,9 @@ class Session::Application : public MemoryRetainer {
262254
}
263255

264256
private:
265-
Packet::Ptr CreateStreamDataPacket();
266-
267-
// Tries to pack a pending datagram into the current packet buffer.
268-
// If < 0 is returned, either NGTCP2_ERR_WRITE_MORE or a fatal error is
269-
// returned; the caller must check. If > 0 is returned, the packet is done
270-
// and the value is the size of the finalized packet. If 0 is returned,
271-
// the datagram is either congestion limited or was abandoned
272-
ssize_t TryWritePendingDatagram(PathStorage* path,
273-
uint8_t* dest,
274-
size_t destlen,
275-
uint64_t ts);
276-
277-
// Write the given stream_data into the buffer. The PacketInfo out-param
278-
// is populated by ngtcp2 with per-packet metadata (e.g., ECN codepoint)
279-
// that should be applied when sending the packet.
280-
ssize_t WriteVStream(PathStorage* path,
281-
PacketInfo* pi,
282-
uint8_t* buf,
283-
ssize_t* ndatalen,
284-
size_t max_packet_size,
285-
const StreamData& stream_data,
286-
uint64_t ts);
287-
288257
Session* session_ = nullptr;
289258
};
290259

291-
struct Session::Application::StreamData final {
292-
// The actual number of vectors in the struct, up to kMaxVectorCount.
293-
size_t count = 0;
294-
// The stream identifier. If this is a negative value then no stream is
295-
// identified.
296-
stream_id id = -1;
297-
int fin = 0;
298-
ngtcp2_vec data[kMaxVectorCount]{};
299-
BaseObjectPtr<Stream> stream;
300-
301-
static_assert(sizeof(ngtcp2_vec) == sizeof(nghttp3_vec) &&
302-
alignof(ngtcp2_vec) == alignof(nghttp3_vec) &&
303-
offsetof(ngtcp2_vec, base) == offsetof(nghttp3_vec, base) &&
304-
offsetof(ngtcp2_vec, len) == offsetof(nghttp3_vec, len),
305-
"ngtcp2_vec and nghttp3_vec must have identical layout");
306-
inline operator nghttp3_vec*() {
307-
return reinterpret_cast<nghttp3_vec*>(data);
308-
}
309-
310-
inline operator const ngtcp2_vec*() const { return data; }
311-
inline operator ngtcp2_vec*() { return data; }
312-
313-
std::string ToString() const;
314-
};
315-
316260
// Create a DefaultApplication for the given session.
317261
std::unique_ptr<Session::Application> CreateDefaultApplication(
318262
Session* session, const Session::Application_Options& options);

src/quic/http3.cc

Lines changed: 35 additions & 44 deletions
Original file line numberDiff line numberDiff line change
@@ -209,24 +209,25 @@ class Http3ApplicationImpl final : public Session::Application {
209209
started_ = true;
210210
Debug(&session(), "Starting HTTP/3 application.");
211211

212-
auto params = ngtcp2_conn_get_remote_transport_params(session());
213-
if (params == nullptr) [[unlikely]] {
212+
const auto params = session().remote_transport_params();
213+
if (!params) [[unlikely]] {
214214
// The params are not available yet. Cannot start.
215215
Debug(&session(),
216216
"Cannot start HTTP/3 application yet. No remote transport params");
217217
return false;
218218
}
219219

220-
if (params->initial_max_streams_uni < 3) {
220+
if (params.initial_max_streams_uni() < 3) {
221221
// HTTP3 requires 3 unidirectional control streams to be opened in each
222222
// direction in additional to the bidirectional streams that are used to
223223
// actually carry request and response payload back and forth.
224224
// See:
225225
// https://nghttp2.org/nghttp3/programmers-guide.html#binding-control-streams
226226
Debug(&session(),
227227
"Cannot start HTTP/3 application. Initial max "
228-
"unidirectional streams [%zu] is too low. Must be at least 3",
229-
params->initial_max_streams_uni);
228+
"unidirectional streams [%" PRIu64
229+
"] is too low. Must be at least 3",
230+
params.initial_max_streams_uni());
230231
return false;
231232
}
232233

@@ -235,17 +236,14 @@ class Http3ApplicationImpl final : public Session::Application {
235236
// of requests that the client can actually created.
236237
if (session().is_server()) {
237238
nghttp3_conn_set_max_client_streams_bidi(
238-
*this, params->initial_max_streams_bidi);
239+
*this, params.initial_max_streams_bidi());
239240
}
240241

241242
Debug(&session(), "Creating and binding HTTP/3 control streams");
242243
bool ret =
243-
ngtcp2_conn_open_uni_stream(session(), &control_stream_id_, nullptr) ==
244-
0 &&
245-
ngtcp2_conn_open_uni_stream(
246-
session(), &qpack_enc_stream_id_, nullptr) == 0 &&
247-
ngtcp2_conn_open_uni_stream(
248-
session(), &qpack_dec_stream_id_, nullptr) == 0 &&
244+
session().OpenUni(&control_stream_id_) &&
245+
session().OpenUni(&qpack_enc_stream_id_) &&
246+
session().OpenUni(&qpack_dec_stream_id_) &&
249247
nghttp3_conn_bind_control_stream(*this, control_stream_id_) == 0 &&
250248
nghttp3_conn_bind_qpack_streams(
251249
*this, qpack_enc_stream_id_, qpack_dec_stream_id_) == 0;
@@ -306,8 +304,7 @@ class Http3ApplicationImpl final : public Session::Application {
306304
Debug(&session(),
307305
"Extending stream and connection offset by %zd bytes",
308306
nread);
309-
session().ExtendStreamOffset(id, nread);
310-
session().ExtendOffset(nread);
307+
session().Consume(id, nread);
311308
}
312309

313310
// If this data arrived as 0-RTT, mark the stream. We set it after
@@ -365,24 +362,11 @@ class Http3ApplicationImpl final : public Session::Application {
365362
case EndpointLabel::LOCAL:
366363
return;
367364
case EndpointLabel::REMOTE: {
368-
switch (direction) {
369-
case Direction::BIDIRECTIONAL: {
370-
Debug(&session(),
371-
"HTTP/3 application extending max bidi streams by %" PRIu64,
372-
max_streams);
373-
ngtcp2_conn_extend_max_streams_bidi(
374-
session(), static_cast<size_t>(max_streams));
375-
break;
376-
}
377-
case Direction::UNIDIRECTIONAL: {
378-
Debug(&session(),
379-
"HTTP/3 application extending max uni streams by %" PRIu64,
380-
max_streams);
381-
ngtcp2_conn_extend_max_streams_uni(
382-
session(), static_cast<size_t>(max_streams));
383-
break;
384-
}
385-
}
365+
Debug(&session(),
366+
"HTTP/3 application extending max %s streams by %" PRIu64,
367+
direction == Direction::BIDIRECTIONAL ? "bidi" : "uni",
368+
max_streams);
369+
session().ExtendMaxStreams(direction, max_streams);
386370
}
387371
}
388372
}
@@ -530,8 +514,7 @@ class Http3ApplicationImpl final : public Session::Application {
530514
return;
531515
}
532516

533-
session().SetLastError(
534-
QuicError::ForApplication(nghttp3_err_infer_quic_app_error_code(rv)));
517+
session().SetApplicationError(nghttp3_err_infer_quic_app_error_code(rv));
535518
session().Close();
536519
}
537520

@@ -548,8 +531,7 @@ class Http3ApplicationImpl final : public Session::Application {
548531
return;
549532
}
550533

551-
session().SetLastError(
552-
QuicError::ForApplication(nghttp3_err_infer_quic_app_error_code(rv)));
534+
session().SetApplicationError(nghttp3_err_infer_quic_app_error_code(rv));
553535
session().Close();
554536
}
555537

@@ -688,12 +670,22 @@ class Http3ApplicationImpl final : public Session::Application {
688670
}
689671

690672
int GetStreamData(StreamData* data) override {
673+
static_assert(
674+
sizeof(ngtcp2_vec) == sizeof(nghttp3_vec) &&
675+
alignof(ngtcp2_vec) == alignof(nghttp3_vec) &&
676+
offsetof(ngtcp2_vec, base) == offsetof(nghttp3_vec, base) &&
677+
offsetof(ngtcp2_vec, len) == offsetof(nghttp3_vec, len),
678+
"ngtcp2_vec and nghttp3_vec must have identical layout");
691679
data->count = kMaxVectorCount;
692680
ssize_t ret = 0;
693681
Debug(&session(), "HTTP/3 application getting stream data");
694682
if (conn_ && session().max_data_left()) {
695-
ret = nghttp3_conn_writev_stream(
696-
*this, &data->id, &data->fin, *data, data->count);
683+
ret =
684+
nghttp3_conn_writev_stream(*this,
685+
&data->id,
686+
&data->fin,
687+
reinterpret_cast<nghttp3_vec*>(data->data),
688+
data->count);
697689
// A negative return value indicates an error.
698690
if (ret < 0) {
699691
return static_cast<int>(ret);
@@ -720,8 +712,7 @@ class Http3ApplicationImpl final : public Session::Application {
720712
// nghttp3 tracks its own offset via add_write_offset.
721713
int err = nghttp3_conn_add_write_offset(*this, data->id, datalen);
722714
if (err != 0) {
723-
session().SetLastError(QuicError::ForApplication(
724-
nghttp3_err_infer_quic_app_error_code(err)));
715+
session().SetApplicationError(nghttp3_err_infer_quic_app_error_code(err));
725716
return false;
726717
}
727718
// Raw application bytes are committed to the stream's outbound
@@ -1211,10 +1202,10 @@ class Http3ApplicationImpl final : public Session::Application {
12111202
void* conn_user_data,
12121203
void* stream_user_data) {
12131204
NGHTTP3_CALLBACK_SCOPE(app);
1214-
auto& session = app.session();
1215-
Debug(&session, "HTTP/3 application deferred consume %zu bytes", consumed);
1216-
session.ExtendStreamOffset(id, consumed);
1217-
session.ExtendOffset(consumed);
1205+
Debug(&app.session(),
1206+
"HTTP/3 application deferred consume %zu bytes",
1207+
consumed);
1208+
app.session().Consume(id, consumed);
12181209
return NGTCP2_SUCCESS;
12191210
}
12201211

0 commit comments

Comments
 (0)