Skip to content

Commit

Permalink
h3i: implement expected frames
Browse files Browse the repository at this point in the history
Expected frames allow the user to specify a list of frames that the h3i
client expects to receive over a given connection.

If h3i sees all of the exected frames over the course of the connection,
it will pre-emptively close the connection with a CONNECTION_CLOSE
frame. If h3i does _not_ see all of the expected frames, the resulting
ConnectionSummary will contain a list of the missing target frames for
future inspection.

This gives users a way to close tests out without waiting for the idle
timeout, or adding Wait/ConnectionClose actions to the end of each test.
This should vastly speed up test suites that have a large number of h3i
tests.
  • Loading branch information
evanrittenhouse committed Dec 18, 2024
1 parent 85791d9 commit c188b1a
Show file tree
Hide file tree
Showing 7 changed files with 234 additions and 14 deletions.
2 changes: 1 addition & 1 deletion h3i/examples/content_length_mismatch.rs
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ fn main() {
];

let summary =
sync_client::connect(config, &actions).expect("connection failed");
sync_client::connect(config, &actions, None).expect("connection failed");

println!(
"=== received connection summary! ===\n\n{}",
Expand Down
143 changes: 136 additions & 7 deletions h3i/src/client/connection_summary.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ use std::collections::HashMap;
use std::iter::FromIterator;

use crate::frame::EnrichedHeaders;
use crate::frame::ExpectedFrame;
use crate::frame::H3iFrame;

/// Maximum length of any serialized element's unstructured data such as reason
Expand All @@ -57,6 +58,8 @@ pub struct ConnectionSummary {
pub path_stats: Vec<PathStats>,
/// Details about why the connection closed.
pub conn_close_details: ConnectionCloseDetails,
/// [`ExpectedFrame`]s that were not received.
pub missing_frames: Option<Vec<ExpectedFrame>>,
}

impl Serialize for ConnectionSummary {
Expand All @@ -74,22 +77,29 @@ impl Serialize for ConnectionSummary {
self.path_stats.iter().map(SerializablePathStats).collect();
state.serialize_field("path_stats", &p)?;
state.serialize_field("error", &self.conn_close_details)?;
state.serialize_field("missed_expected_frames", &self.missing_frames)?;
state.end()
}
}

/// A read-only aggregation of frames received over a connection, mapped to the
/// stream ID over which they were received.
#[derive(Clone, Debug, Default, Serialize)]
pub struct StreamMap(HashMap<u64, Vec<H3iFrame>>);
pub struct StreamMap {
map: HashMap<u64, Vec<H3iFrame>>,
expected_frames: Option<ExpectedFrames>,
}

impl<T> From<T> for StreamMap
where
T: IntoIterator<Item = (u64, Vec<H3iFrame>)>,
{
fn from(value: T) -> Self {
let map = HashMap::from_iter(value);
Self(map)
Self {
map,
expected_frames: None,
}
}
}

Expand All @@ -113,7 +123,7 @@ impl StreamMap {
/// assert_eq!(stream_map.all_frames(), vec![headers]);
/// ```
pub fn all_frames(&self) -> Vec<H3iFrame> {
self.0
self.map
.values()
.flatten()
.map(Clone::clone)
Expand All @@ -140,7 +150,7 @@ impl StreamMap {
/// assert_eq!(stream_map.stream(0), vec![headers]);
/// ```
pub fn stream(&self, stream_id: u64) -> Vec<H3iFrame> {
self.0.get(&stream_id).cloned().unwrap_or_default()
self.map.get(&stream_id).cloned().unwrap_or_default()
}

/// Check if a provided [`H3iFrame`] was received, regardless of what stream
Expand Down Expand Up @@ -189,7 +199,7 @@ impl StreamMap {
pub fn received_frame_on_stream(
&self, stream: u64, frame: &H3iFrame,
) -> bool {
self.0.get(&stream).map(|v| v.contains(frame)).is_some()
self.map.get(&stream).map(|v| v.contains(frame)).is_some()
}

/// Check if the stream map is empty, e.g., no frames were received.
Expand All @@ -213,7 +223,7 @@ impl StreamMap {
/// assert!(!stream_map.is_empty());
/// ```
pub fn is_empty(&self) -> bool {
self.0.is_empty()
self.map.is_empty()
}

/// See all HEADERS received on a given stream.
Expand Down Expand Up @@ -246,8 +256,57 @@ impl StreamMap {
.collect()
}

pub(crate) fn new(expected: Option<Vec<ExpectedFrame>>) -> Self {
Self {
expected_frames: expected.map(|e| ExpectedFrames::new(e)),
..Default::default()
}
}

pub(crate) fn insert(&mut self, stream_id: u64, frame: H3iFrame) {
self.0.entry(stream_id).or_default().push(frame);
if let Some(expected) = self.expected_frames.as_mut() {
expected.receive_frame(stream_id, &frame);
}

self.map.entry(stream_id).or_default().push(frame);
}

pub(crate) fn saw_all_expected_frames(&self) -> bool {
self.expected_frames
.as_ref()
.is_some_and(|e| e.saw_all_frames())
}

pub(crate) fn missing_frames(&self) -> Option<Vec<ExpectedFrame>> {
self.expected_frames.as_ref().map(|e| e.missing_frames())
}
}

#[derive(Serialize, Clone, Debug)]
struct ExpectedFrames {
missing: Vec<ExpectedFrame>,
}

impl ExpectedFrames {
fn new(frames: Vec<ExpectedFrame>) -> Self {
Self { missing: frames }
}

fn receive_frame(&mut self, stream_id: u64, frame: &H3iFrame) {
for (i, ef) in self.missing.iter_mut().enumerate() {
if ef.is_equivalent(frame) && ef.stream_id() == stream_id {
self.missing.remove(i);
break;
}
}
}

fn saw_all_frames(&self) -> bool {
self.missing.is_empty()
}

fn missing_frames(&self) -> Vec<ExpectedFrame> {
self.missing.clone()
}
}

Expand Down Expand Up @@ -422,3 +481,73 @@ impl Serialize for SerializableConnectionError<'_> {
state.end()
}
}

#[cfg(test)]
mod tests {
use super::*;
use quiche::h3::Header;

fn h3i_frame() -> H3iFrame {
vec![Header::new(b"hello", b"world")].into()
}

#[test]
fn expected_frame() {
let frame = h3i_frame();
let mut expected =
ExpectedFrames::new(vec![ExpectedFrame::new(0, frame.clone())]);

expected.receive_frame(0, &frame);

assert!(expected.saw_all_frames());
}

#[test]
fn expected_frame_missing() {
let frame = h3i_frame();
let expected_frames = vec![
ExpectedFrame::new(0, frame.clone()),
ExpectedFrame::new(4, frame.clone()),
ExpectedFrame::new(8, vec![Header::new(b"go", b"jets")].into()),
];
let mut expected = ExpectedFrames::new(expected_frames.clone());

expected.receive_frame(0, &frame);

assert!(!expected.saw_all_frames());
assert_eq!(expected.missing_frames(), expected_frames[1..].to_vec());
}

fn stream_map_data() -> Vec<H3iFrame> {
let headers =
H3iFrame::Headers(EnrichedHeaders::from(vec![Header::new(
b"hello", b"world",
)]));
let data = H3iFrame::QuicheH3(quiche::h3::frame::Frame::Data {
payload: b"hello world".to_vec(),
});

vec![headers, data]
}

#[test]
fn test_stream_map_expected_frames_with_none() {
let stream_map: StreamMap = vec![(0, stream_map_data())].into();
assert!(!stream_map.saw_all_expected_frames());
}

#[test]
fn test_stream_map_expected_frames() {
let data = stream_map_data();
let mut stream_map = StreamMap::new(Some(vec![
ExpectedFrame::new(0, data[0].clone()),
ExpectedFrame::new(0, data[1].clone()),
]));

stream_map.insert(0, data[0].clone());
assert!(!stream_map.saw_all_expected_frames());
assert_eq!(stream_map.missing_frames().unwrap(), vec![
ExpectedFrame::new(0, data[1].clone())
]);
}
}
25 changes: 22 additions & 3 deletions h3i/src/client/sync_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ use std::slice::Iter;
use std::time::Duration;
use std::time::Instant;

use crate::frame::ExpectedFrame;
use crate::frame::H3iFrame;
use crate::quiche;

Expand All @@ -57,6 +58,15 @@ struct SyncClient {
stream_parsers: StreamParserMap,
}

impl SyncClient {
fn new(expected_frames: Option<Vec<ExpectedFrame>>) -> Self {
Self {
streams: StreamMap::new(expected_frames),
..Default::default()
}
}
}

impl Client for SyncClient {
fn stream_parsers_mut(&mut self) -> &mut StreamParserMap {
&mut self.stream_parsers
Expand All @@ -74,7 +84,7 @@ impl Client for SyncClient {
///
/// Returns a [ConnectionSummary] on success, [ClientError] on failure.
pub fn connect(
args: Config, actions: &[Action],
args: Config, actions: &[Action], expected_frames: Option<Vec<ExpectedFrame>>,
) -> std::result::Result<ConnectionSummary, ClientError> {
let mut buf = [0; 65535];
let mut out = [0; MAX_DATAGRAM_SIZE];
Expand Down Expand Up @@ -142,8 +152,7 @@ pub fn connect(
let mut wait_duration = None;
let mut wait_instant = None;

let mut client = SyncClient::default();

let mut client = SyncClient::new(expected_frames);
let mut waiting_for = WaitingFor::default();

loop {
Expand Down Expand Up @@ -277,6 +286,14 @@ pub fn connect(
wait_cleared = true;
}

if client.streams.saw_all_expected_frames() {
let _ = conn.close(
true,
quiche::h3::WireErrorCode::NoError as u64,
b"saw all expected frames",
);
}

if wait_cleared {
check_duration_and_do_actions(
&mut wait_duration,
Expand Down Expand Up @@ -370,11 +387,13 @@ pub fn connect(
}
}

let missing_frames = client.streams.missing_frames();
Ok(ConnectionSummary {
stream_map: client.streams,
stats: Some(conn.stats()),
path_stats: conn.path_stats().collect(),
conn_close_details: ConnectionCloseDetails::new(&conn),
missing_frames,
})
}

Expand Down
1 change: 1 addition & 0 deletions h3i/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
use std::io;

/// Server details and QUIC connection properties.
#[derive(Clone)]
pub struct Config {
/// A string representing the host and port to connect to using the format
/// `<host>:<port>`.
Expand Down
72 changes: 71 additions & 1 deletion h3i/src/frame.rs
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ pub type BoxError = Box<dyn Error + Send + Sync + 'static>;

/// An internal representation of a QUIC or HTTP/3 frame. This type exists so
/// that we can extend types defined in Quiche.
#[derive(Debug, Clone, Eq, PartialEq)]
#[derive(Debug, Eq, PartialEq, Clone)]
pub enum H3iFrame {
/// A wrapper around a quiche HTTP/3 frame.
QuicheH3(QFrame),
Expand All @@ -69,6 +69,49 @@ impl H3iFrame {
None
}
}

/// Check if this [`H3iFrame`] is equivalent to another. For
/// QuicheH3/ResetStream variants, equivalence is the same as equality.
/// For Headers variants, this [`H3iFrame`] is equivalent to another if
/// the other frame contains all [`Header`]s in _this_ frame.
///
/// # Example
///
/// ```
/// use h3i::frame::H3iFrame;
/// use quiche::h3::Header;
///
/// let this: H3iFrame = vec![Header::new(b"hello", b"world")].into();
/// let other: H3iFrame = vec![
/// Header::new(b"hello", b"world"),
/// Header::new(b"go", b"jets")
/// ].into();
///
/// assert!(this.is_equivalent(&other));
/// // `this` does not contain the `go: jets` header, so `other` is not equivalent to `this`.
/// assert!(!other.is_equivalent(&this));
/// ```
pub fn is_equivalent(&self, other: &H3iFrame) -> bool {
match self {
Self::Headers(me) => {
let H3iFrame::Headers(other) = other else {
return false;
};

// We expect pretty small expected frame vectors, so complexity
// here isn't too bad
me.headers().iter().all(|m| other.headers().contains(m))
},
Self::QuicheH3(me) => match other {
H3iFrame::QuicheH3(other) => me == other,
_ => false,
},
Self::ResetStream(me) => match other {
H3iFrame::ResetStream(rs) => me == rs,
_ => false,
},
}
}
}

impl Serialize for H3iFrame {
Expand Down Expand Up @@ -432,3 +475,30 @@ impl Serialize for SerializableQFrame<'_> {
}
}
}

/// A combination of stream ID and [`H3iFrame`] which is used to instruct h3i to
/// watch for specific frames. If h3i receives all the frames it expects, it
/// will send an application CONNECTION_CLOSE frame with an error code of 0x100.
/// This bypasses the idle timeout and vastly quickens test suites which depend
/// heavily on h3i.
#[derive(Debug, Eq, PartialEq, Serialize, Clone)]
pub struct ExpectedFrame {
stream_id: u64,
frame: H3iFrame,
}

impl ExpectedFrame {
pub fn new(stream_id: u64, frame: H3iFrame) -> Self {
Self { stream_id, frame }
}

pub(crate) fn stream_id(&self) -> u64 {
self.stream_id
}

pub(crate) fn is_equivalent(&self, other: &H3iFrame) -> bool {
// TODO(evanrittenhouse): allow users to specify custom equivalence
// functions
self.frame.is_equivalent(other)
}
}
Loading

0 comments on commit c188b1a

Please sign in to comment.