Skip to content

Commit df9a262

Browse files
dswijseanmonstar
authored andcommitted
fix: stream flow control insufficient size before ack (#746)
1 parent 94e80b1 commit df9a262

File tree

4 files changed

+84
-8
lines changed

4 files changed

+84
-8
lines changed

src/proto/connection.rs

-4
Original file line numberDiff line numberDiff line change
@@ -106,10 +106,6 @@ where
106106
pub fn new(codec: Codec<T, Prioritized<B>>, config: Config) -> Connection<T, P, B> {
107107
fn streams_config(config: &Config) -> streams::Config {
108108
streams::Config {
109-
local_init_window_sz: config
110-
.settings
111-
.initial_window_size()
112-
.unwrap_or(DEFAULT_INITIAL_WINDOW_SIZE),
113109
initial_max_send_streams: config.initial_max_send_streams,
114110
local_max_buffer_size: config.max_send_buffer_size,
115111
local_next_stream_id: config.next_stream_id,

src/proto/streams/mod.rs

-3
Original file line numberDiff line numberDiff line change
@@ -33,9 +33,6 @@ use std::time::Duration;
3333

3434
#[derive(Debug)]
3535
pub struct Config {
36-
/// Initial window size of locally initiated streams
37-
pub local_init_window_sz: WindowSize,
38-
3936
/// Initial maximum number of locally initiated streams.
4037
/// After receiving a Settings frame from the remote peer,
4138
/// the connection will overwrite this value with the

src/proto/streams/recv.rs

+1-1
Original file line numberDiff line numberDiff line change
@@ -93,7 +93,7 @@ impl Recv {
9393
flow.assign_capacity(DEFAULT_INITIAL_WINDOW_SIZE).unwrap();
9494

9595
Recv {
96-
init_window_sz: config.local_init_window_sz,
96+
init_window_sz: DEFAULT_INITIAL_WINDOW_SIZE,
9797
flow,
9898
in_flight_data: 0 as WindowSize,
9999
next_stream_id: Ok(next_stream_id.into()),

tests/h2-tests/tests/server.rs

+83
Original file line numberDiff line numberDiff line change
@@ -1415,3 +1415,86 @@ async fn reject_informational_status_header_in_request() {
14151415

14161416
join(client, srv).await;
14171417
}
1418+
1419+
#[tokio::test]
1420+
async fn client_drop_connection_without_close_notify() {
1421+
h2_support::trace_init!();
1422+
1423+
let (io, mut client) = mock::new();
1424+
let client = async move {
1425+
let _recv_settings = client.assert_server_handshake().await;
1426+
client
1427+
.send_frame(frames::headers(1).request("GET", "https://example.com/"))
1428+
.await;
1429+
client.send_frame(frames::data(1, &b"hello"[..])).await;
1430+
client.recv_frame(frames::headers(1).response(200)).await;
1431+
1432+
client.close_without_notify(); // Client closed without notify causing UnexpectedEof
1433+
};
1434+
1435+
let mut builder = server::Builder::new();
1436+
builder.max_concurrent_streams(1);
1437+
1438+
let h2 = async move {
1439+
let mut srv = builder.handshake::<_, Bytes>(io).await.expect("handshake");
1440+
let (req, mut stream) = srv.next().await.unwrap().unwrap();
1441+
1442+
assert_eq!(req.method(), &http::Method::GET);
1443+
1444+
let rsp = http::Response::builder().status(200).body(()).unwrap();
1445+
stream.send_response(rsp, false).unwrap();
1446+
1447+
// Step the conn state forward and hitting the EOF
1448+
// But we have no outstanding request from client to be satisfied, so we should not return
1449+
// an error
1450+
let _ = poll_fn(|cx| srv.poll_closed(cx)).await.unwrap();
1451+
};
1452+
1453+
join(client, h2).await;
1454+
}
1455+
1456+
#[tokio::test]
1457+
async fn init_window_size_smaller_than_default_should_use_default_before_ack() {
1458+
h2_support::trace_init!();
1459+
1460+
let (io, mut client) = mock::new();
1461+
let client = async move {
1462+
// Client can send in some data before ACK;
1463+
// Server needs to make sure the Recv stream has default window size
1464+
// as per https://datatracker.ietf.org/doc/html/rfc9113#name-initial-flow-control-window
1465+
client.write_preface().await;
1466+
client
1467+
.send(frame::Settings::default().into())
1468+
.await
1469+
.unwrap();
1470+
client.next().await.expect("unexpected EOF").unwrap();
1471+
client
1472+
.send_frame(frames::headers(1).request("GET", "https://example.com/"))
1473+
.await;
1474+
client.send_frame(frames::data(1, &b"hello"[..])).await;
1475+
client.send(frame::Settings::ack().into()).await.unwrap();
1476+
client.next().await;
1477+
client
1478+
.recv_frame(frames::headers(1).response(200).eos())
1479+
.await;
1480+
};
1481+
1482+
let mut builder = server::Builder::new();
1483+
builder.max_concurrent_streams(1);
1484+
builder.initial_window_size(1);
1485+
let h2 = async move {
1486+
let mut srv = builder.handshake::<_, Bytes>(io).await.expect("handshake");
1487+
let (req, mut stream) = srv.next().await.unwrap().unwrap();
1488+
1489+
assert_eq!(req.method(), &http::Method::GET);
1490+
1491+
let rsp = http::Response::builder().status(200).body(()).unwrap();
1492+
stream.send_response(rsp, true).unwrap();
1493+
1494+
// Drive the state forward
1495+
let _ = poll_fn(|cx| srv.poll_closed(cx)).await.unwrap();
1496+
};
1497+
1498+
join(client, h2).await;
1499+
}
1500+

0 commit comments

Comments
 (0)