Skip to content

Commit 4cb2a53

Browse files
committed
streams: PushPromise tasks are notified properly
The push task is a separate task from the recv task, so its state needs to be tracked separately for waking. I don't know how to be systematic about ensuring that notify_push is called in all the right places, but this is an initial attempt. In order to test this works, we manually utilize FuturesUnordered which does fine-grained task wake tracking. The added test failed before making the other changes.
1 parent 7dbb5c5 commit 4cb2a53

File tree

6 files changed

+40
-15
lines changed

6 files changed

+40
-15
lines changed

src/proto/streams/prioritize.rs

+1-4
Original file line numberDiff line numberDiff line change
@@ -839,10 +839,7 @@ impl Prioritize {
839839
}),
840840
None => {
841841
if let Some(reason) = stream.state.get_scheduled_reset() {
842-
let stream_id = stream.id;
843-
stream
844-
.state
845-
.set_reset(stream_id, reason, Initiator::Library);
842+
stream.set_reset(reason, Initiator::Library);
846843

847844
let frame = frame::Reset::new(stream.id, reason);
848845
Frame::Reset(frame)

src/proto/streams/recv.rs

+5-1
Original file line numberDiff line numberDiff line change
@@ -296,7 +296,7 @@ impl Recv {
296296
let is_open = stream.state.ensure_recv_open()?;
297297

298298
if is_open {
299-
stream.recv_task = Some(cx.waker().clone());
299+
stream.push_task = Some(cx.waker().clone());
300300
Poll::Pending
301301
} else {
302302
Poll::Ready(None)
@@ -760,6 +760,7 @@ impl Recv {
760760
.pending_recv
761761
.push_back(&mut self.buffer, Event::Headers(Server(req)));
762762
stream.notify_recv();
763+
stream.notify_push();
763764
Ok(())
764765
}
765766

@@ -814,6 +815,7 @@ impl Recv {
814815

815816
stream.notify_send();
816817
stream.notify_recv();
818+
stream.notify_push();
817819

818820
Ok(())
819821
}
@@ -826,6 +828,7 @@ impl Recv {
826828
// If a receiver is waiting, notify it
827829
stream.notify_send();
828830
stream.notify_recv();
831+
stream.notify_push();
829832
}
830833

831834
pub fn go_away(&mut self, last_processed_id: StreamId) {
@@ -837,6 +840,7 @@ impl Recv {
837840
stream.state.recv_eof();
838841
stream.notify_send();
839842
stream.notify_recv();
843+
stream.notify_push();
840844
}
841845

842846
pub(super) fn clear_recv_buffer(&mut self, stream: &mut Stream) {

src/proto/streams/send.rs

+1-4
Original file line numberDiff line numberDiff line change
@@ -206,10 +206,7 @@ impl Send {
206206
}
207207

208208
// Transition the state to reset no matter what.
209-
stream.state.set_reset(stream_id, reason, initiator);
210-
// Notify the recv task if it's waiting, because it'll
211-
// want to hear about the reset.
212-
stream.notify_recv();
209+
stream.set_reset(reason, initiator);
213210

214211
// If closed AND the send queue is flushed, then the stream cannot be
215212
// reset explicitly, either. Implicit resets can still be queued.

src/proto/streams/stream.rs

+20
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,5 @@
1+
use crate::Reason;
2+
13
use super::*;
24

35
use std::task::{Context, Waker};
@@ -104,6 +106,9 @@ pub(super) struct Stream {
104106
/// Task tracking receiving frames
105107
pub recv_task: Option<Waker>,
106108

109+
/// Task tracking pushed promises.
110+
pub push_task: Option<Waker>,
111+
107112
/// The stream's pending push promises
108113
pub pending_push_promises: store::Queue<NextAccept>,
109114

@@ -186,6 +191,7 @@ impl Stream {
186191
pending_recv: buffer::Deque::new(),
187192
is_recv: true,
188193
recv_task: None,
194+
push_task: None,
189195
pending_push_promises: store::Queue::new(),
190196
content_length: ContentLength::Omitted,
191197
}
@@ -369,6 +375,20 @@ impl Stream {
369375
task.wake();
370376
}
371377
}
378+
379+
pub(super) fn notify_push(&mut self) {
380+
if let Some(task) = self.push_task.take() {
381+
task.wake();
382+
}
383+
}
384+
385+
/// Set the stream's state to `Closed` with the given reason and initiator.
386+
/// Notify the send and receive tasks, if they exist.
387+
pub(super) fn set_reset(&mut self, reason: Reason, initiator: Initiator) {
388+
self.state.set_reset(self.id, reason, initiator);
389+
self.notify_push();
390+
self.notify_recv();
391+
}
372392
}
373393

374394
impl store::Next for NextAccept {

src/proto/streams/streams.rs

+1-1
Original file line numberDiff line numberDiff line change
@@ -825,7 +825,7 @@ impl Inner {
825825

826826
let parent = &mut self.store.resolve(parent_key);
827827
parent.pending_push_promises = ppp;
828-
parent.notify_recv();
828+
parent.notify_push();
829829
};
830830

831831
Ok(())

tests/h2-tests/tests/push_promise.rs

+12-5
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
1-
use futures::future::join;
2-
use futures::{StreamExt, TryStreamExt};
1+
use std::iter::FromIterator;
2+
3+
use futures::{future::join, FutureExt as _, StreamExt, TryStreamExt};
34
use h2_support::prelude::*;
45

56
#[tokio::test]
@@ -51,9 +52,15 @@ async fn recv_push_works() {
5152
let ps: Vec<_> = p.collect().await;
5253
assert_eq!(1, ps.len())
5354
};
54-
55-
h2.drive(join(check_resp_status, check_pushed_response))
56-
.await;
55+
// Use a FuturesUnordered to poll both tasks but only poll them
56+
// if they have been notified.
57+
let tasks = futures::stream::FuturesUnordered::from_iter([
58+
check_resp_status.boxed(),
59+
check_pushed_response.boxed(),
60+
])
61+
.collect::<()>();
62+
63+
h2.drive(tasks).await;
5764
};
5865

5966
join(mock, h2).await;

0 commit comments

Comments
 (0)