From b56f5fa8d541fee7464ae4150132d76a31516d0b Mon Sep 17 00:00:00 2001 From: Evan Rittenhouse Date: Sun, 15 Dec 2024 12:07:09 -0600 Subject: [PATCH] h3i: implement expected frames Expected frames allow the user to specify a list of frames that the h3i client expects to receive over a given connection. If h3i sees all of the exected frames over the course of the connection, it will pre-emptively close the connection with a CONNECTION_CLOSE frame. If h3i does _not_ see all of the expected frames, the resulting ConnectionSummary will contain a list of the missing target frames for future inspection. This gives users a way to close tests out without waiting for the idle timeout, or adding Wait/ConnectionClose actions to the end of each test. This should vastly speed up test suites that have a large number of h3i tests. --- h3i/examples/content_length_mismatch.rs | 2 +- h3i/src/client/connection_summary.rs | 107 ++++++++++++++++++++++-- h3i/src/client/mod.rs | 15 ++-- h3i/src/client/sync_client.rs | 31 +++++-- h3i/src/frame.rs | 19 +++++ h3i/src/lib.rs | 2 +- h3i/src/main.rs | 8 +- 7 files changed, 161 insertions(+), 23 deletions(-) diff --git a/h3i/examples/content_length_mismatch.rs b/h3i/examples/content_length_mismatch.rs index cff1bfacb7..b3fd29bf1a 100644 --- a/h3i/examples/content_length_mismatch.rs +++ b/h3i/examples/content_length_mismatch.rs @@ -67,7 +67,7 @@ fn main() { ]; let summary = - sync_client::connect(config, &actions).expect("connection failed"); + sync_client::connect(config, &actions, None).expect("connection failed"); println!( "=== received connection summary! ===\n\n{}", diff --git a/h3i/src/client/connection_summary.rs b/h3i/src/client/connection_summary.rs index 9f72a2a73f..d557042110 100644 --- a/h3i/src/client/connection_summary.rs +++ b/h3i/src/client/connection_summary.rs @@ -39,6 +39,7 @@ use std::collections::HashMap; use std::iter::FromIterator; use crate::frame::EnrichedHeaders; +use crate::frame::ExpectedFrame; use crate::frame::H3iFrame; /// Maximum length of any serialized element's unstructured data such as reason @@ -74,6 +75,10 @@ impl Serialize for ConnectionSummary { self.path_stats.iter().map(SerializablePathStats).collect(); state.serialize_field("path_stats", &p)?; state.serialize_field("error", &self.conn_close_details)?; + state.serialize_field( + "missed_expected_frames", + &self.stream_map.missing_expected_frames(), + )?; state.end() } } @@ -81,7 +86,10 @@ impl Serialize for ConnectionSummary { /// A read-only aggregation of frames received over a connection, mapped to the /// stream ID over which they were received. #[derive(Clone, Debug, Default, Serialize)] -pub struct StreamMap(HashMap>); +pub struct StreamMap { + expected_frames: Option>, + map: HashMap>, +} impl From for StreamMap where @@ -113,7 +121,7 @@ impl StreamMap { /// assert_eq!(stream_map.all_frames(), vec![headers]); /// ``` pub fn all_frames(&self) -> Vec { - self.0 + self.map .values() .flatten() .map(Clone::clone) @@ -140,7 +148,7 @@ impl StreamMap { /// assert_eq!(stream_map.stream(0), vec![headers]); /// ``` pub fn stream(&self, stream_id: u64) -> Vec { - self.0.get(&stream_id).cloned().unwrap_or_default() + self.map.get(&stream_id).cloned().unwrap_or_default() } /// Check if a provided [`H3iFrame`] was received, regardless of what stream @@ -189,7 +197,7 @@ impl StreamMap { pub fn received_frame_on_stream( &self, stream: u64, frame: &H3iFrame, ) -> bool { - self.0.get(&stream).map(|v| v.contains(frame)).is_some() + self.map.get(&stream).map(|v| v.contains(frame)).is_some() } /// Check if the stream map is empty, e.g., no frames were received. @@ -213,7 +221,7 @@ impl StreamMap { /// assert!(!stream_map.is_empty()); /// ``` pub fn is_empty(&self) -> bool { - self.0.is_empty() + self.map.is_empty() } /// See all HEADERS received on a given stream. @@ -246,8 +254,49 @@ impl StreamMap { .collect() } + /// If the [`StreamMap`] has received all the [`ExectedFrame`]s it was + /// configured to receive. If no expected frames were specified, this + /// returns `false`. + pub fn seen_all_expected_frames(&self) -> bool { + self.expected_frames.as_ref().is_some_and(|inner| { + inner.iter().all(|tf| self.contains_expected_frame(tf)) + }) + } + + pub(crate) fn new( + expected_frames: Option>, + map: HashMap>, + ) -> Self { + Self { + expected_frames, + map, + } + } + + pub(crate) fn with_expected( + expected_frames: Option>, + ) -> Self { + Self::new(expected_frames, HashMap::default()) + } + pub(crate) fn insert(&mut self, stream_id: u64, frame: H3iFrame) { - self.0.entry(stream_id).or_default().push(frame); + self.map.entry(stream_id).or_default().push(frame); + } + + pub(crate) fn missing_expected_frames(&self) -> Option> { + self.expected_frames.as_ref().map(|inner| { + inner + .iter() + .filter(|tf| !self.contains_expected_frame(tf)) + .map(Clone::clone) + .collect::>() + }) + } + + fn contains_expected_frame(&self, ef: &ExpectedFrame) -> bool { + self.map + .get(ef.stream_id()) + .is_some_and(|frames| frames.contains(&ef.frame)) } } @@ -422,3 +471,49 @@ impl Serialize for SerializableConnectionError<'_> { state.end() } } + +#[cfg(test)] +mod tests { + use super::*; + + fn frames_to_expected_frames(frames: &[H3iFrame]) -> Vec { + frames + .into_iter() + .map(|f| ExpectedFrame::new(0, f.clone())) + .collect::>() + } + + #[test] + fn test_expected_frames() { + use quiche::h3::Header; + use std::collections::HashMap; + use std::iter::FromIterator; + + let h = Header::new(b"hello", b"world"); + let enriched = EnrichedHeaders::from(vec![h]); + let headers = H3iFrame::Headers(enriched.clone()); + let data = H3iFrame::QuicheH3(quiche::h3::frame::Frame::Data { + payload: b"hello world".to_vec(), + }); + + let frames = vec![headers.clone(), data.clone()]; + let map = HashMap::from_iter([(0, frames.clone())]); + let stream_map = + StreamMap::new(Some(frames_to_expected_frames(&frames)), map.clone()); + assert!(stream_map.seen_all_expected_frames()); + + let mut expected_frames = vec![ExpectedFrame::new( + 0, + H3iFrame::QuicheH3(quiche::h3::frame::Frame::Data { + payload: b"missing payload".to_vec(), + }), + )]; + expected_frames = expected_frames + .into_iter() + .chain(frames_to_expected_frames(&frames)) + .collect::>(); + + let stream_map = StreamMap::new(Some(expected_frames), map); + assert!(!stream_map.seen_all_expected_frames()); + } +} diff --git a/h3i/src/client/mod.rs b/h3i/src/client/mod.rs index b8195a9556..436b80f630 100644 --- a/h3i/src/client/mod.rs +++ b/h3i/src/client/mod.rs @@ -62,9 +62,7 @@ use quiche::h3::Error; use quiche::h3::NameValue; use quiche::Connection; use quiche::Result; -use quiche::{ - self, -}; +use quiche::{self}; const MAX_DATAGRAM_SIZE: usize = 1350; const QUIC_VERSION: u32 = 1; @@ -178,6 +176,9 @@ pub(crate) trait Client { /// Handles a response frame. This allows [`Client`]s to customize how they /// construct a [`StreamMap`] from a list of frames. fn handle_response_frame(&mut self, stream_id: u64, frame: H3iFrame); + + /// If the client has exhausted the list of [`ExpectedFrames`] + fn saw_all_expected_frames(&self) -> bool; } pub(crate) type StreamParserMap = HashMap; @@ -216,8 +217,8 @@ pub(crate) fn execute_action( // need to rewrite the event time ev.time = Instant::now() .duration_since(s.start_time()) - .as_secs_f32() * - 1000.0; + .as_secs_f32() + * 1000.0; s.add_event(ev).ok(); }, } @@ -271,8 +272,8 @@ pub(crate) fn execute_action( // need to rewrite the event time ev.time = Instant::now() .duration_since(s.start_time()) - .as_secs_f32() * - 1000.0; + .as_secs_f32() + * 1000.0; s.add_event(ev).ok(); }, } diff --git a/h3i/src/client/sync_client.rs b/h3i/src/client/sync_client.rs index a24b3cecfd..71e3c03da4 100644 --- a/h3i/src/client/sync_client.rs +++ b/h3i/src/client/sync_client.rs @@ -31,6 +31,7 @@ use std::slice::Iter; use std::time::Duration; use std::time::Instant; +use crate::frame::ExpectedFrame; use crate::frame::H3iFrame; use crate::quiche; @@ -57,6 +58,15 @@ struct SyncClient { stream_parsers: StreamParserMap, } +impl SyncClient { + fn new(expected_frames: Option>) -> Self { + Self { + stream_map: StreamMap::with_expected(expected_frames), + stream_parsers: StreamParserMap::default(), + } + } +} + impl Client for SyncClient { fn stream_parsers_mut(&mut self) -> &mut StreamParserMap { &mut self.stream_parsers @@ -65,6 +75,10 @@ impl Client for SyncClient { fn handle_response_frame(&mut self, stream_id: u64, frame: H3iFrame) { self.streams.insert(stream_id, frame); } + + fn saw_all_expected_frames(&self) -> bool { + self.stream_map.seen_all_expected_frames() + } } /// Connect to a server and execute provided actions. @@ -74,7 +88,7 @@ impl Client for SyncClient { /// /// Returns a [ConnectionSummary] on success, [ClientError] on failure. pub fn connect( - args: Config, actions: &[Action], + args: Config, actions: &[Action], expected_frames: Option>, ) -> std::result::Result { let mut buf = [0; 65535]; let mut out = [0; MAX_DATAGRAM_SIZE]; @@ -142,8 +156,7 @@ pub fn connect( let mut wait_duration = None; let mut wait_instant = None; - let mut client = SyncClient::default(); - + let mut client = SyncClient::new(expected_frames); let mut waiting_for = WaitingFor::default(); loop { @@ -248,8 +261,8 @@ pub fn connect( // Create a new application protocol session once the QUIC connection is // established. - if (conn.is_established() || conn.is_in_early_data()) && - !app_proto_selected + if (conn.is_established() || conn.is_in_early_data()) + && !app_proto_selected { app_proto_selected = true; } @@ -277,6 +290,14 @@ pub fn connect( wait_cleared = true; } + if client.saw_all_expected_frames() { + let _ = conn.close( + true, + quiche::h3::WireErrorCode::NoError as u64, + b"saw all expected frames", + ); + } + if wait_cleared { check_duration_and_do_actions( &mut wait_duration, diff --git a/h3i/src/frame.rs b/h3i/src/frame.rs index dd642e6518..9eea250e9b 100644 --- a/h3i/src/frame.rs +++ b/h3i/src/frame.rs @@ -58,6 +58,25 @@ pub enum H3iFrame { ResetStream(ResetStream), } +#[derive(Debug, Eq, PartialEq, Serialize, Clone)] +pub struct ExpectedFrame { + stream_id: u64, + // pub(crate) means we can move the frame without needing to implement + // Default for H3iFrame, since otherwise we'd need to std::mem::take() to + // transfer H3iFrame ownership to the StreamMap + pub(crate) frame: H3iFrame, +} + +impl ExpectedFrame { + pub fn new(stream_id: u64, frame: H3iFrame) -> Self { + Self { stream_id, frame } + } + + pub fn stream_id(&self) -> &u64 { + &self.stream_id + } +} + impl H3iFrame { /// Try to convert this `H3iFrame` to an [EnrichedHeaders]. /// diff --git a/h3i/src/lib.rs b/h3i/src/lib.rs index 10cfb8ef29..7d45882b82 100644 --- a/h3i/src/lib.rs +++ b/h3i/src/lib.rs @@ -112,7 +112,7 @@ //! ]; //! //! let summary = -//! sync_client::connect(config, &actions).expect("connection failed"); +//! sync_client::connect(config, &actions, vec![]).expect("connection failed"); //! //! println!( //! "=== received connection summary! ===\n\n{}", diff --git a/h3i/src/main.rs b/h3i/src/main.rs index b27075b51f..9a024164db 100644 --- a/h3i/src/main.rs +++ b/h3i/src/main.rs @@ -33,6 +33,7 @@ use std::time::Instant; use h3i::actions::h3::Action; use h3i::client::connection_summary::ConnectionSummary; use h3i::client::ClientError; +use h3i::frame::ExpectedFrame; use h3i::prompts::h3::Prompter; use h3i::recordreplay::qlog::QlogEvent; use h3i::recordreplay::qlog::*; @@ -298,7 +299,8 @@ fn config_from_clap() -> std::result::Result { fn sync_client( config: Config, actions: &[Action], ) -> Result { - h3i::client::sync_client::connect(config.library_config, actions) + // TODO: CLI doesn't support passing expected frames at the moment + h3i::client::sync_client::connect(config.library_config, actions, None) } fn read_qlog(filename: &str, host_override: Option<&str>) -> Vec { @@ -345,8 +347,8 @@ fn prompt_frames(config: &Config) -> Vec { // need to rewrite the event time ev.time = Instant::now() .duration_since(streamer.start_time()) - .as_secs_f32() * - 1000.0; + .as_secs_f32() + * 1000.0; streamer.add_event(ev).ok(); }, }