Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor(app/test): minor tweaks to linkerd-app-test #3428

Merged
merged 7 commits into from
Dec 6, 2024
9 changes: 8 additions & 1 deletion linkerd/app/inbound/src/http.rs
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,14 @@ pub mod fuzz {
.header(header_name, header_value)
.body(Body::default())
{
let rsp = http_util::http_request(&mut client, req).await;
let rsp = client
.ready()
.await
.expect("HTTP client poll_ready failed")
.call(req)
.await
cratelyn marked this conversation as resolved.
Show resolved Hide resolved
.expect("HTTP client request failed");
tracing::info!(?rsp);
tracing::info!(?rsp);
if let Ok(rsp) = rsp {
let body = http_util::body_to_string(rsp.into_body()).await;
Expand Down
149 changes: 117 additions & 32 deletions linkerd/app/inbound/src/http/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ use linkerd_app_test::connect::ConnectFuture;
use linkerd_tracing::test::trace_init;
use std::{net::SocketAddr, sync::Arc};
use tokio::time;
use tower::{Service, ServiceExt};
use tracing::Instrument;

fn build_server<I>(
Expand Down Expand Up @@ -72,7 +73,14 @@ async fn unmeshed_http1_hello_world() {
.uri("http://foo.svc.cluster.local:5550")
.body(Body::default())
.unwrap();
let rsp = http_util::http_request(&mut client, req).await.unwrap();
let rsp = client
.ready()
.await
.expect("HTTP client poll_ready failed")
.call(req)
.await
.expect("HTTP client request failed");
tracing::info!(?rsp);
assert_eq!(rsp.status(), http::StatusCode::OK);
let body = http_util::body_to_string(rsp.into_body()).await.unwrap();
assert_eq!(body, "Hello world!");
Expand Down Expand Up @@ -113,7 +121,14 @@ async fn downgrade_origin_form() {
.header("l5d-orig-proto", "HTTP/1.1")
.body(Body::default())
.unwrap();
let rsp = http_util::http_request(&mut client, req).await.unwrap();
let rsp = client
.ready()
.await
.expect("HTTP client poll_ready failed")
.call(req)
.await
.expect("HTTP client request failed");
tracing::info!(?rsp);
assert_eq!(rsp.status(), http::StatusCode::OK);
let body = http_util::body_to_string(rsp.into_body()).await.unwrap();
assert_eq!(body, "Hello world!");
Expand Down Expand Up @@ -153,7 +168,14 @@ async fn downgrade_absolute_form() {
.header("l5d-orig-proto", "HTTP/1.1; absolute-form")
.body(Body::default())
.unwrap();
let rsp = http_util::http_request(&mut client, req).await.unwrap();
let rsp = client
.ready()
.await
.expect("HTTP client poll_ready failed")
.call(req)
.await
.expect("HTTP client request failed");
tracing::info!(?rsp);
assert_eq!(rsp.status(), http::StatusCode::OK);
let body = http_util::body_to_string(rsp.into_body()).await.unwrap();
assert_eq!(body, "Hello world!");
Expand Down Expand Up @@ -189,13 +211,20 @@ async fn http1_bad_gateway_meshed_response_error_header() {
.uri("http://foo.svc.cluster.local:5550")
.body(Body::default())
.unwrap();
let response = http_util::http_request(&mut client, req).await.unwrap();
assert_eq!(response.status(), http::StatusCode::BAD_GATEWAY);
let rsp = client
.ready()
.await
.expect("HTTP client poll_ready failed")
.call(req)
.await
.expect("HTTP client request failed");
tracing::info!(?rsp);
assert_eq!(rsp.status(), http::StatusCode::BAD_GATEWAY);
// NOTE: this does not include a stack error context for that endpoint
// because we don't build a real HTTP endpoint stack, which adds error
// context to this error, and the client rescue layer is below where the
// logical error context is added.
check_error_header(response.headers(), "server is not listening");
check_error_header(rsp.headers(), "server is not listening");

drop(client);
bg.await.expect("background task failed");
Expand Down Expand Up @@ -228,10 +257,17 @@ async fn http1_bad_gateway_unmeshed_response() {
.uri("http://foo.svc.cluster.local:5550")
.body(Body::default())
.unwrap();
let response = http_util::http_request(&mut client, req).await.unwrap();
assert_eq!(response.status(), http::StatusCode::BAD_GATEWAY);
let rsp = client
.ready()
.await
.expect("HTTP client poll_ready failed")
.call(req)
.await
.expect("HTTP client request failed");
tracing::info!(?rsp);
assert_eq!(rsp.status(), http::StatusCode::BAD_GATEWAY);
assert!(
response.headers().get(L5D_PROXY_ERROR).is_none(),
rsp.headers().get(L5D_PROXY_ERROR).is_none(),
"response must not contain L5D_PROXY_ERROR header"
);

Expand Down Expand Up @@ -270,14 +306,21 @@ async fn http1_connect_timeout_meshed_response_error_header() {
.uri("http://foo.svc.cluster.local:5550")
.body(Body::default())
.unwrap();
let response = http_util::http_request(&mut client, req).await.unwrap();
assert_eq!(response.status(), http::StatusCode::GATEWAY_TIMEOUT);
let rsp = client
.ready()
.await
.expect("HTTP client poll_ready failed")
.call(req)
.await
.expect("HTTP client request failed");
tracing::info!(?rsp);
assert_eq!(rsp.status(), http::StatusCode::GATEWAY_TIMEOUT);

// NOTE: this does not include a stack error context for that endpoint
// because we don't build a real HTTP endpoint stack, which adds error
// context to this error, and the client rescue layer is below where the
// logical error context is added.
check_error_header(response.headers(), "connect timed out after 1s");
check_error_header(rsp.headers(), "connect timed out after 1s");

drop(client);
bg.await.expect("background task failed");
Expand Down Expand Up @@ -314,10 +357,17 @@ async fn http1_connect_timeout_unmeshed_response_error_header() {
.uri("http://foo.svc.cluster.local:5550")
.body(Body::default())
.unwrap();
let response = http_util::http_request(&mut client, req).await.unwrap();
assert_eq!(response.status(), http::StatusCode::GATEWAY_TIMEOUT);
let rsp = client
.ready()
.await
.expect("HTTP client poll_ready failed")
.call(req)
.await
.expect("HTTP client request failed");
tracing::info!(?rsp);
assert_eq!(rsp.status(), http::StatusCode::GATEWAY_TIMEOUT);
assert!(
response.headers().get(L5D_PROXY_ERROR).is_none(),
rsp.headers().get(L5D_PROXY_ERROR).is_none(),
"response must not contain L5D_PROXY_ERROR header"
);

Expand Down Expand Up @@ -352,10 +402,17 @@ async fn h2_response_meshed_error_header() {
.uri("http://foo.svc.cluster.local:5550")
.body(Body::default())
.unwrap();
let response = http_util::http_request(&mut client, req).await.unwrap();
assert_eq!(response.status(), http::StatusCode::GATEWAY_TIMEOUT);

check_error_header(response.headers(), "service in fail-fast");
let rsp = client
.ready()
.await
.expect("HTTP client poll_ready failed")
.call(req)
.await
.expect("HTTP client request failed");
tracing::info!(?rsp);
assert_eq!(rsp.status(), http::StatusCode::GATEWAY_TIMEOUT);

check_error_header(rsp.headers(), "service in fail-fast");

// Drop the client and discard the result of awaiting the proxy background
// task. The result is discarded because it hits an error that is related
Expand Down Expand Up @@ -391,10 +448,17 @@ async fn h2_response_unmeshed_error_header() {
.uri("http://foo.svc.cluster.local:5550")
.body(Body::default())
.unwrap();
let response = http_util::http_request(&mut client, req).await.unwrap();
assert_eq!(response.status(), http::StatusCode::GATEWAY_TIMEOUT);
let rsp = client
.ready()
.await
.expect("HTTP client poll_ready failed")
.call(req)
.await
.expect("HTTP client request failed");
tracing::info!(?rsp);
assert_eq!(rsp.status(), http::StatusCode::GATEWAY_TIMEOUT);
assert!(
response.headers().get(L5D_PROXY_ERROR).is_none(),
rsp.headers().get(L5D_PROXY_ERROR).is_none(),
"response must not contain L5D_PROXY_ERROR header"
);

Expand Down Expand Up @@ -433,10 +497,17 @@ async fn grpc_meshed_response_error_header() {
.header(http::header::CONTENT_TYPE, "application/grpc")
.body(Body::default())
.unwrap();
let response = http_util::http_request(&mut client, req).await.unwrap();
assert_eq!(response.status(), http::StatusCode::OK);
let rsp = client
.ready()
.await
.expect("HTTP client poll_ready failed")
.call(req)
.await
.expect("HTTP client request failed");
tracing::info!(?rsp);
assert_eq!(rsp.status(), http::StatusCode::OK);

check_error_header(response.headers(), "service in fail-fast");
check_error_header(rsp.headers(), "service in fail-fast");

// Drop the client and discard the result of awaiting the proxy background
// task. The result is discarded because it hits an error that is related
Expand Down Expand Up @@ -473,10 +544,17 @@ async fn grpc_unmeshed_response_error_header() {
.header(http::header::CONTENT_TYPE, "application/grpc")
.body(Body::default())
.unwrap();
let response = http_util::http_request(&mut client, req).await.unwrap();
assert_eq!(response.status(), http::StatusCode::OK);
let rsp = client
.ready()
.await
.expect("HTTP client poll_ready failed")
.call(req)
.await
.expect("HTTP client request failed");
tracing::info!(?rsp);
assert_eq!(rsp.status(), http::StatusCode::OK);
assert!(
response.headers().get(L5D_PROXY_ERROR).is_none(),
rsp.headers().get(L5D_PROXY_ERROR).is_none(),
"response must not contain L5D_PROXY_ERROR header"
);

Expand Down Expand Up @@ -529,11 +607,18 @@ async fn grpc_response_class() {
.body(Body::default())
.unwrap();

let mut response = http_util::http_request(&mut client, req).await.unwrap();
assert_eq!(response.status(), http::StatusCode::OK);
let mut rsp = client
.ready()
.await
.expect("HTTP client poll_ready failed")
.call(req)
.await
.expect("HTTP client request failed");
tracing::info!(?rsp);
assert_eq!(rsp.status(), http::StatusCode::OK);

response.body_mut().data().await;
let trls = response.body_mut().trailers().await.unwrap().unwrap();
rsp.body_mut().data().await;
let trls = rsp.body_mut().trailers().await.unwrap().unwrap();
assert_eq!(trls.get("grpc-status").unwrap().to_str().unwrap(), "2");

let response_total = metrics
Expand Down
6 changes: 1 addition & 5 deletions linkerd/app/test/src/connect.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
use crate::io;
use linkerd_app_core::{
svc::{Param, Service},
transport::{ClientAddr, Local, Remote, ServerAddr},
Expand All @@ -14,11 +15,6 @@ use std::{
};
use tracing::instrument::{Instrument, Instrumented};

mod io {
pub use linkerd_app_core::io::*;
pub use tokio_test::io::*;
}

type ConnectFn<T> = Box<dyn FnMut(T) -> ConnectFuture + Send>;

pub type ConnectFuture =
Expand Down
39 changes: 11 additions & 28 deletions linkerd/app/test/src/http_util.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ use crate::{
io, ContextError,
};
use futures::FutureExt;
use hyper::{body::HttpBody, Body, Request, Response};
use hyper::{body::HttpBody, Body};
use std::future::Future;
use tokio::task::JoinHandle;
use tower::{util::ServiceExt, Service};
Expand All @@ -14,7 +14,7 @@ use hyper::client::conn::{Builder as ClientBuilder, SendRequest};

type BoxServer = svc::BoxTcp<io::DuplexStream>;

pub async fn run_proxy(mut server: BoxServer) -> (io::DuplexStream, JoinHandle<Result<(), Error>>) {
async fn run_proxy(mut server: BoxServer) -> (io::DuplexStream, JoinHandle<Result<(), Error>>) {
let (client_io, server_io) = io::duplex(4096);
let f = server
.ready()
Expand All @@ -34,7 +34,7 @@ pub async fn run_proxy(mut server: BoxServer) -> (io::DuplexStream, JoinHandle<R
}

#[allow(deprecated)] // linkerd/linkerd2#8733
pub async fn connect_client(
async fn connect_client(
client_settings: &mut ClientBuilder,
io: io::DuplexStream,
) -> (SendRequest<Body>, JoinHandle<Result<(), Error>>) {
Expand Down Expand Up @@ -73,37 +73,20 @@ pub async fn connect_and_accept(
(client, bg)
}

#[tracing::instrument(skip(client))]
#[allow(deprecated)] // linkerd/linkerd2#8733
pub async fn http_request(
client: &mut SendRequest<Body>,
request: Request<Body>,
) -> Result<Response<Body>, Error> {
let rsp = client
.ready()
.await
.map_err(ContextError::ctx("HTTP client poll_ready failed"))?
.call(request)
.await
.map_err(ContextError::ctx("HTTP client request failed"))?;

tracing::info!(?rsp);

Ok(rsp)
}

/// Collects a request or response body, returning it as a [`String`].
pub async fn body_to_string<T>(body: T) -> Result<String, Error>
where
T: HttpBody,
T::Error: Into<Error>,
{
let body = body
let bytes = body
.collect()
.await
.map(http_body::Collected::to_bytes)
.map_err(ContextError::ctx("HTTP response body stream failed"))?;
let body = std::str::from_utf8(&body[..])
.map_err(ContextError::ctx("converting body to string failed"))?
.to_owned();
Ok(body)
.map_err(ContextError::ctx("HTTP response body stream failed"))?
.to_vec();

String::from_utf8(bytes)
.map_err(ContextError::ctx("converting body to string failed"))
.map_err(Into::into)
}
4 changes: 4 additions & 0 deletions linkerd/app/test/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,10 @@ pub use tokio::io::{AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt};
pub use tokio::sync::oneshot;
pub use tower::Service;
pub use tracing::*;

/// I/O facilities for tests.
///
/// Provides [`AsyncRead`] and [`AsyncWrite`] types via [`tokio_test`], and via [`linkerd_io`].
pub mod io {
pub use linkerd_app_core::io::*;
pub use tokio_test::io::*;
Expand Down
Loading