Skip to content

Commit 7dbb5c5

Browse files
authored
streams: notify recv task upon reset (#791)
Before this change, the transition to the reset state wouldn't notify tasks that were waiting for a response. The motivating case for this patch involved a large header being sent by the server. This case was mostly tested by an existing test, but because that test did not spawn separate tasks and kept polling the futures through its use of `conn.drive`, the missing notify was masked. Informs hyperium/hyper#3724.
1 parent cf95990 commit 7dbb5c5

File tree

2 files changed

+21
-9
lines changed

2 files changed

+21
-9
lines changed

src/proto/streams/send.rs

+3
Original file line numberDiff line numberDiff line change
@@ -207,6 +207,9 @@ impl Send {
207207

208208
// Transition the state to reset no matter what.
209209
stream.state.set_reset(stream_id, reason, initiator);
210+
// Notify the recv task if it's waiting, because it'll
211+
// want to hear about the reset.
212+
stream.notify_recv();
210213

211214
// If closed AND the send queue is flushed, then the stream cannot be
212215
// reset explicitly, either. Implicit resets can still be queued.

tests/h2-tests/tests/client_request.rs

+18-9
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,10 @@
1-
use futures::future::{join, ready, select, Either};
1+
use futures::future::{join, join_all, ready, select, Either};
22
use futures::stream::FuturesUnordered;
33
use futures::StreamExt;
44
use h2_support::prelude::*;
5-
use std::io;
65
use std::pin::Pin;
76
use std::task::Context;
7+
use std::{io, panic};
88

99
#[tokio::test]
1010
async fn handshake() {
@@ -849,7 +849,7 @@ async fn recv_too_big_headers() {
849849
};
850850

851851
let client = async move {
852-
let (mut client, mut conn) = client::Builder::new()
852+
let (mut client, conn) = client::Builder::new()
853853
.max_header_list_size(10)
854854
.handshake::<_, Bytes>(io)
855855
.await
@@ -861,25 +861,34 @@ async fn recv_too_big_headers() {
861861
.unwrap();
862862

863863
let req1 = client.send_request(request, true);
864-
let req1 = async move {
864+
// Spawn tasks to ensure that the error wakes up tasks that are blocked
865+
// waiting for a response.
866+
let req1 = tokio::spawn(async move {
865867
let err = req1.expect("send_request").0.await.expect_err("response1");
866868
assert_eq!(err.reason(), Some(Reason::REFUSED_STREAM));
867-
};
869+
});
868870

869871
let request = Request::builder()
870872
.uri("https://http2.akamai.com/")
871873
.body(())
872874
.unwrap();
873875

874876
let req2 = client.send_request(request, true);
875-
let req2 = async move {
877+
let req2 = tokio::spawn(async move {
876878
let err = req2.expect("send_request").0.await.expect_err("response2");
877879
assert_eq!(err.reason(), Some(Reason::REFUSED_STREAM));
878-
};
880+
});
879881

880-
conn.drive(join(req1, req2)).await;
881-
conn.await.expect("client");
882+
let conn = tokio::spawn(async move {
883+
conn.await.expect("client");
884+
});
885+
for err in join_all([req1, req2, conn]).await {
886+
if let Some(err) = err.err().and_then(|err| err.try_into_panic().ok()) {
887+
std::panic::resume_unwind(err);
888+
}
889+
}
882890
};
891+
883892
join(srv, client).await;
884893
}
885894

0 commit comments

Comments
 (0)