Skip to content

Commit

Permalink
Add test for empty body streamed request in agent (#2603)
Browse files Browse the repository at this point in the history
* Add test for empty body streamed request in agent

* Add changelog file
  • Loading branch information
gememma authored Jul 15, 2024
1 parent 4fe9012 commit 652f092
Show file tree
Hide file tree
Showing 4 changed files with 74 additions and 9 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions changelog.d/2593.internal.md
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Add test to ensure empty streamed request doesn't hang if empty
18 changes: 13 additions & 5 deletions mirrord/agent/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -20,13 +20,20 @@ workspace = true

[dependencies]
containerd-client = "0.5"
tokio = { workspace = true, features = ["rt", "net", "macros", "fs", "process", "signal"] }
tokio = { workspace = true, features = [
"rt",
"net",
"macros",
"fs",
"process",
"signal",
] }
serde.workspace = true
serde_json.workspace = true
pnet = "0.35"
nix = { workspace = true, features = ["mount", "sched", "user"] }
nix = { workspace = true, features = ["mount", "sched", "user"] }
clap = { workspace = true, features = ["env"] }
mirrord-protocol = { path = "../protocol"}
mirrord-protocol = { path = "../protocol" }
actix-codec.workspace = true
futures.workspace = true
tracing.workspace = true
Expand Down Expand Up @@ -66,11 +73,12 @@ x509-parser = "0.16"
rustls.workspace = true

[target.'cfg(target_os = "linux")'.dependencies]
iptables = {git = "https://github.com/metalbear-co/rust-iptables.git", rev = "e66c7332e361df3c61a194f08eefe3f40763d624"}
rawsocket = {git = "https://github.com/metalbear-co/rawsocket.git"}
iptables = { git = "https://github.com/metalbear-co/rust-iptables.git", rev = "e66c7332e361df3c61a194f08eefe3f40763d624" }
rawsocket = { git = "https://github.com/metalbear-co/rawsocket.git" }


[dev-dependencies]
rstest = "0.21"
mockall = "0.12" # 0.11.3 is broken
test_bin = "0.4"
rcgen = "0.13"
63 changes: 59 additions & 4 deletions mirrord/agent/src/steal/connection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -687,6 +687,7 @@ mod test {
};
use hyper_util::rt::TokioIo;
use mirrord_protocol::tcp::{ChunkedRequest, DaemonTcp, InternalHttpBodyFrame};
use rstest::rstest;
use tokio::{
net::{TcpListener, TcpStream},
sync::{
Expand Down Expand Up @@ -794,7 +795,7 @@ mod test {
};
assert_eq!(
x.internal_request.body,
vec![InternalHttpBodyFrame::Data(b"string".to_vec().into())]
vec![InternalHttpBodyFrame::Data(b"string".to_vec())]
);
let x = client_rx.recv().now_or_never();
assert!(x.is_none());
Expand All @@ -810,13 +811,67 @@ mod test {
};
assert_eq!(
x.frames,
vec![InternalHttpBodyFrame::Data(
b"another_string".to_vec().into()
)]
vec![InternalHttpBodyFrame::Data(b"another_string".to_vec())]
);
let x = client_rx.recv().now_or_never();
assert!(x.is_none());

let _ = response_tx.send(Response::new(Empty::default()));
}

#[rstest]
#[tokio::test]
#[timeout(std::time::Duration::from_secs(5))]
async fn test_empty_streaming_request() {
let (addr, mut request_rx) = prepare_dummy_service().await;
let conn = TcpStream::connect(addr).await.unwrap();
let (mut sender, conn) = hyper::client::conn::http1::handshake(TokioIo::new(conn))
.await
.unwrap();
tokio::spawn(conn);

tokio::spawn(
sender.send_request(
Request::builder()
.method(Method::POST)
.uri("/")
.version(Version::HTTP_11)
.body(http_body_util::Empty::<Bytes>::new())
.unwrap(),
),
);

let (client_tx, mut client_rx) = mpsc::channel::<DaemonTcp>(4);
let client = Client {
tx: client_tx,
protocol_version: "1.7.0".parse().unwrap(),
subscribed_connections: Default::default(),
};

let (request, response_tx) = request_rx.recv().await.unwrap();
client.send_request_async(MatchedHttpRequest {
connection_id: 0,
port: 80,
request_id: 0,
request,
});

// Verify that ChunkedRequest::Start request is as expected
let msg = client_rx.recv().await.unwrap();
let DaemonTcp::HttpRequestChunked(ChunkedRequest::Start(_)) = msg else {
panic!("unexpected type received: {msg:?}")
};

// Verify that empty ChunkedRequest::Body request is as expected
let msg = client_rx.recv().await.unwrap();
let DaemonTcp::HttpRequestChunked(ChunkedRequest::Body(x)) = msg else {
panic!("unexpected type received: {msg:?}")
};
assert_eq!(x.frames, vec![]);
assert!(x.is_last);
let x = client_rx.recv().now_or_never();
assert!(x.is_none());

let _ = response_tx.send(Response::new(Empty::default()));
}
}

0 comments on commit 652f092

Please sign in to comment.