diff --git a/linkerd/app/inbound/src/http.rs b/linkerd/app/inbound/src/http.rs index 395bfbea00..3de8c136d2 100644 --- a/linkerd/app/inbound/src/http.rs +++ b/linkerd/app/inbound/src/http.rs @@ -74,7 +74,14 @@ pub mod fuzz { .header(header_name, header_value) .body(Body::default()) { - let rsp = http_util::http_request(&mut client, req).await; + let rsp = client + .ready() + .await + .expect("HTTP client poll_ready failed") + .call(req) + .await + .expect("HTTP client request failed"); + tracing::info!(?rsp); tracing::info!(?rsp); if let Ok(rsp) = rsp { let body = http_util::body_to_string(rsp.into_body()).await; diff --git a/linkerd/app/inbound/src/http/tests.rs b/linkerd/app/inbound/src/http/tests.rs index fa19104c98..77cc109ff6 100644 --- a/linkerd/app/inbound/src/http/tests.rs +++ b/linkerd/app/inbound/src/http/tests.rs @@ -21,6 +21,7 @@ use linkerd_app_test::connect::ConnectFuture; use linkerd_tracing::test::trace_init; use std::{net::SocketAddr, sync::Arc}; use tokio::time; +use tower::ServiceExt; use tracing::Instrument; fn build_server( @@ -65,19 +66,22 @@ async fn unmeshed_http1_hello_world() { let cfg = default_config(); let (rt, _shutdown) = runtime(); let server = build_server(cfg, rt, profiles, connect).new_service(Target::UNMESHED_HTTP1); - let (mut client, bg) = http_util::connect_and_accept(&mut client, server).await; + let (client, bg) = http_util::connect_and_accept(&mut client, server).await; let req = Request::builder() .method(http::Method::GET) .uri("http://foo.svc.cluster.local:5550") .body(Body::default()) .unwrap(); - let rsp = http_util::http_request(&mut client, req).await.unwrap(); + let rsp = client + .oneshot(req) + .await + .expect("HTTP client request failed"); + tracing::info!(?rsp); assert_eq!(rsp.status(), http::StatusCode::OK); let body = http_util::body_to_string(rsp.into_body()).await.unwrap(); assert_eq!(body, "Hello world!"); - drop(client); bg.await.expect("background task failed"); } @@ -104,7 +108,7 @@ async fn downgrade_origin_form() { let cfg = default_config(); let (rt, _shutdown) = runtime(); let server = build_server(cfg, rt, profiles, connect).new_service(Target::UNMESHED_H2); - let (mut client, bg) = http_util::connect_and_accept(&mut client, server).await; + let (client, bg) = http_util::connect_and_accept(&mut client, server).await; let req = Request::builder() .method(http::Method::GET) @@ -113,12 +117,15 @@ async fn downgrade_origin_form() { .header("l5d-orig-proto", "HTTP/1.1") .body(Body::default()) .unwrap(); - let rsp = http_util::http_request(&mut client, req).await.unwrap(); + let rsp = client + .oneshot(req) + .await + .expect("HTTP client request failed"); + tracing::info!(?rsp); assert_eq!(rsp.status(), http::StatusCode::OK); let body = http_util::body_to_string(rsp.into_body()).await.unwrap(); assert_eq!(body, "Hello world!"); - drop(client); bg.await.expect("background task failed"); } @@ -144,7 +151,7 @@ async fn downgrade_absolute_form() { let cfg = default_config(); let (rt, _shutdown) = runtime(); let server = build_server(cfg, rt, profiles, connect).new_service(Target::UNMESHED_H2); - let (mut client, bg) = http_util::connect_and_accept(&mut client, server).await; + let (client, bg) = http_util::connect_and_accept(&mut client, server).await; let req = Request::builder() .method(http::Method::GET) @@ -153,12 +160,15 @@ async fn downgrade_absolute_form() { .header("l5d-orig-proto", "HTTP/1.1; absolute-form") .body(Body::default()) .unwrap(); - let rsp = http_util::http_request(&mut client, req).await.unwrap(); + let rsp = client + .oneshot(req) + .await + .expect("HTTP client request failed"); + tracing::info!(?rsp); assert_eq!(rsp.status(), http::StatusCode::OK); let body = http_util::body_to_string(rsp.into_body()).await.unwrap(); assert_eq!(body, "Hello world!"); - drop(client); bg.await.expect("background task failed"); } @@ -180,7 +190,7 @@ async fn http1_bad_gateway_meshed_response_error_header() { let cfg = default_config(); let (rt, _shutdown) = runtime(); let server = build_server(cfg, rt, profiles, connect).new_service(Target::meshed_http1()); - let (mut client, bg) = http_util::connect_and_accept(&mut client, server).await; + let (client, bg) = http_util::connect_and_accept(&mut client, server).await; // Send a request and assert that it is a BAD_GATEWAY with the expected // header message. @@ -189,15 +199,18 @@ async fn http1_bad_gateway_meshed_response_error_header() { .uri("http://foo.svc.cluster.local:5550") .body(Body::default()) .unwrap(); - let response = http_util::http_request(&mut client, req).await.unwrap(); - assert_eq!(response.status(), http::StatusCode::BAD_GATEWAY); + let rsp = client + .oneshot(req) + .await + .expect("HTTP client request failed"); + tracing::info!(?rsp); + assert_eq!(rsp.status(), http::StatusCode::BAD_GATEWAY); // NOTE: this does not include a stack error context for that endpoint // because we don't build a real HTTP endpoint stack, which adds error // context to this error, and the client rescue layer is below where the // logical error context is added. - check_error_header(response.headers(), "server is not listening"); + check_error_header(rsp.headers(), "server is not listening"); - drop(client); bg.await.expect("background task failed"); } @@ -219,7 +232,7 @@ async fn http1_bad_gateway_unmeshed_response() { let cfg = default_config(); let (rt, _shutdown) = runtime(); let server = build_server(cfg, rt, profiles, connect).new_service(Target::UNMESHED_HTTP1); - let (mut client, bg) = http_util::connect_and_accept(&mut client, server).await; + let (client, bg) = http_util::connect_and_accept(&mut client, server).await; // Send a request and assert that it is a BAD_GATEWAY with the expected // header message. @@ -228,14 +241,17 @@ async fn http1_bad_gateway_unmeshed_response() { .uri("http://foo.svc.cluster.local:5550") .body(Body::default()) .unwrap(); - let response = http_util::http_request(&mut client, req).await.unwrap(); - assert_eq!(response.status(), http::StatusCode::BAD_GATEWAY); + let rsp = client + .oneshot(req) + .await + .expect("HTTP client request failed"); + tracing::info!(?rsp); + assert_eq!(rsp.status(), http::StatusCode::BAD_GATEWAY); assert!( - response.headers().get(L5D_PROXY_ERROR).is_none(), + rsp.headers().get(L5D_PROXY_ERROR).is_none(), "response must not contain L5D_PROXY_ERROR header" ); - drop(client); bg.await.expect("background task failed"); } @@ -261,7 +277,7 @@ async fn http1_connect_timeout_meshed_response_error_header() { let cfg = default_config(); let (rt, _shutdown) = runtime(); let server = build_server(cfg, rt, profiles, connect).new_service(Target::meshed_http1()); - let (mut client, bg) = http_util::connect_and_accept(&mut client, server).await; + let (client, bg) = http_util::connect_and_accept(&mut client, server).await; // Send a request and assert that it is a GATEWAY_TIMEOUT with the // expected header message. @@ -270,16 +286,19 @@ async fn http1_connect_timeout_meshed_response_error_header() { .uri("http://foo.svc.cluster.local:5550") .body(Body::default()) .unwrap(); - let response = http_util::http_request(&mut client, req).await.unwrap(); - assert_eq!(response.status(), http::StatusCode::GATEWAY_TIMEOUT); + let rsp = client + .oneshot(req) + .await + .expect("HTTP client request failed"); + tracing::info!(?rsp); + assert_eq!(rsp.status(), http::StatusCode::GATEWAY_TIMEOUT); // NOTE: this does not include a stack error context for that endpoint // because we don't build a real HTTP endpoint stack, which adds error // context to this error, and the client rescue layer is below where the // logical error context is added. - check_error_header(response.headers(), "connect timed out after 1s"); + check_error_header(rsp.headers(), "connect timed out after 1s"); - drop(client); bg.await.expect("background task failed"); } @@ -305,7 +324,7 @@ async fn http1_connect_timeout_unmeshed_response_error_header() { let cfg = default_config(); let (rt, _shutdown) = runtime(); let server = build_server(cfg, rt, profiles, connect).new_service(Target::UNMESHED_HTTP1); - let (mut client, bg) = http_util::connect_and_accept(&mut client, server).await; + let (client, bg) = http_util::connect_and_accept(&mut client, server).await; // Send a request and assert that it is a GATEWAY_TIMEOUT with the // expected header message. @@ -314,14 +333,17 @@ async fn http1_connect_timeout_unmeshed_response_error_header() { .uri("http://foo.svc.cluster.local:5550") .body(Body::default()) .unwrap(); - let response = http_util::http_request(&mut client, req).await.unwrap(); - assert_eq!(response.status(), http::StatusCode::GATEWAY_TIMEOUT); + let rsp = client + .oneshot(req) + .await + .expect("HTTP client request failed"); + tracing::info!(?rsp); + assert_eq!(rsp.status(), http::StatusCode::GATEWAY_TIMEOUT); assert!( - response.headers().get(L5D_PROXY_ERROR).is_none(), + rsp.headers().get(L5D_PROXY_ERROR).is_none(), "response must not contain L5D_PROXY_ERROR header" ); - drop(client); bg.await.expect("background task failed"); } @@ -343,7 +365,7 @@ async fn h2_response_meshed_error_header() { let cfg = default_config(); let (rt, _shutdown) = runtime(); let server = build_server(cfg, rt, profiles, connect).new_service(Target::meshed_h2()); - let (mut client, bg) = http_util::connect_and_accept(&mut client, server).await; + let (client, bg) = http_util::connect_and_accept(&mut client, server).await; // Send a request and assert that it is SERVICE_UNAVAILABLE with the // expected header message. @@ -352,15 +374,18 @@ async fn h2_response_meshed_error_header() { .uri("http://foo.svc.cluster.local:5550") .body(Body::default()) .unwrap(); - let response = http_util::http_request(&mut client, req).await.unwrap(); - assert_eq!(response.status(), http::StatusCode::GATEWAY_TIMEOUT); + let rsp = client + .oneshot(req) + .await + .expect("HTTP client request failed"); + tracing::info!(?rsp); + assert_eq!(rsp.status(), http::StatusCode::GATEWAY_TIMEOUT); - check_error_header(response.headers(), "service in fail-fast"); + check_error_header(rsp.headers(), "service in fail-fast"); // Drop the client and discard the result of awaiting the proxy background // task. The result is discarded because it hits an error that is related // to the mock implementation and has no significance to the test. - drop(client); let _ = bg.await; } @@ -382,7 +407,7 @@ async fn h2_response_unmeshed_error_header() { let cfg = default_config(); let (rt, _shutdown) = runtime(); let server = build_server(cfg, rt, profiles, connect).new_service(Target::UNMESHED_H2); - let (mut client, bg) = http_util::connect_and_accept(&mut client, server).await; + let (client, bg) = http_util::connect_and_accept(&mut client, server).await; // Send a request and assert that it is SERVICE_UNAVAILABLE with the // expected header message. @@ -391,17 +416,20 @@ async fn h2_response_unmeshed_error_header() { .uri("http://foo.svc.cluster.local:5550") .body(Body::default()) .unwrap(); - let response = http_util::http_request(&mut client, req).await.unwrap(); - assert_eq!(response.status(), http::StatusCode::GATEWAY_TIMEOUT); + let rsp = client + .oneshot(req) + .await + .expect("HTTP client request failed"); + tracing::info!(?rsp); + assert_eq!(rsp.status(), http::StatusCode::GATEWAY_TIMEOUT); assert!( - response.headers().get(L5D_PROXY_ERROR).is_none(), + rsp.headers().get(L5D_PROXY_ERROR).is_none(), "response must not contain L5D_PROXY_ERROR header" ); // Drop the client and discard the result of awaiting the proxy background // task. The result is discarded because it hits an error that is related // to the mock implementation and has no significance to the test. - drop(client); let _ = bg.await; } @@ -423,7 +451,7 @@ async fn grpc_meshed_response_error_header() { let cfg = default_config(); let (rt, _shutdown) = runtime(); let server = build_server(cfg, rt, profiles, connect).new_service(Target::meshed_h2()); - let (mut client, bg) = http_util::connect_and_accept(&mut client, server).await; + let (client, bg) = http_util::connect_and_accept(&mut client, server).await; // Send a request and assert that it is OK with the expected header // message. @@ -433,15 +461,18 @@ async fn grpc_meshed_response_error_header() { .header(http::header::CONTENT_TYPE, "application/grpc") .body(Body::default()) .unwrap(); - let response = http_util::http_request(&mut client, req).await.unwrap(); - assert_eq!(response.status(), http::StatusCode::OK); + let rsp = client + .oneshot(req) + .await + .expect("HTTP client request failed"); + tracing::info!(?rsp); + assert_eq!(rsp.status(), http::StatusCode::OK); - check_error_header(response.headers(), "service in fail-fast"); + check_error_header(rsp.headers(), "service in fail-fast"); // Drop the client and discard the result of awaiting the proxy background // task. The result is discarded because it hits an error that is related // to the mock implementation and has no significance to the test. - drop(client); let _ = bg.await; } @@ -463,7 +494,7 @@ async fn grpc_unmeshed_response_error_header() { let cfg = default_config(); let (rt, _shutdown) = runtime(); let server = build_server(cfg, rt, profiles, connect).new_service(Target::UNMESHED_H2); - let (mut client, bg) = http_util::connect_and_accept(&mut client, server).await; + let (client, bg) = http_util::connect_and_accept(&mut client, server).await; // Send a request and assert that it is OK with the expected header // message. @@ -473,17 +504,20 @@ async fn grpc_unmeshed_response_error_header() { .header(http::header::CONTENT_TYPE, "application/grpc") .body(Body::default()) .unwrap(); - let response = http_util::http_request(&mut client, req).await.unwrap(); - assert_eq!(response.status(), http::StatusCode::OK); + let rsp = client + .oneshot(req) + .await + .expect("HTTP client request failed"); + tracing::info!(?rsp); + assert_eq!(rsp.status(), http::StatusCode::OK); assert!( - response.headers().get(L5D_PROXY_ERROR).is_none(), + rsp.headers().get(L5D_PROXY_ERROR).is_none(), "response must not contain L5D_PROXY_ERROR header" ); // Drop the client and discard the result of awaiting the proxy background // task. The result is discarded because it hits an error that is related // to the mock implementation and has no significance to the test. - drop(client); let _ = bg.await; } @@ -518,7 +552,7 @@ async fn grpc_response_class() { .http_endpoint .into_report(time::Duration::from_secs(3600)); let server = build_server(cfg, rt, profiles, connect).new_service(Target::meshed_h2()); - let (mut client, bg) = http_util::connect_and_accept(&mut client, server).await; + let (client, bg) = http_util::connect_and_accept(&mut client, server).await; // Send a request and assert that it is OK with the expected header // message. @@ -529,11 +563,15 @@ async fn grpc_response_class() { .body(Body::default()) .unwrap(); - let mut response = http_util::http_request(&mut client, req).await.unwrap(); - assert_eq!(response.status(), http::StatusCode::OK); + let mut rsp = client + .oneshot(req) + .await + .expect("HTTP client request failed"); + tracing::info!(?rsp); + assert_eq!(rsp.status(), http::StatusCode::OK); - response.body_mut().data().await; - let trls = response.body_mut().trailers().await.unwrap().unwrap(); + rsp.body_mut().data().await; + let trls = rsp.body_mut().trailers().await.unwrap().unwrap(); assert_eq!(trls.get("grpc-status").unwrap().to_str().unwrap(), "2"); let response_total = metrics @@ -564,7 +602,7 @@ async fn grpc_response_class() { .expect("response_total not found"); assert_eq!(response_total, 1.0); - drop((client, bg)); + drop(bg); } #[tracing::instrument] diff --git a/linkerd/app/test/src/connect.rs b/linkerd/app/test/src/connect.rs index 4ac7a91916..dcc789bafc 100644 --- a/linkerd/app/test/src/connect.rs +++ b/linkerd/app/test/src/connect.rs @@ -1,3 +1,4 @@ +use crate::io; use linkerd_app_core::{ svc::{Param, Service}, transport::{ClientAddr, Local, Remote, ServerAddr}, @@ -14,11 +15,6 @@ use std::{ }; use tracing::instrument::{Instrument, Instrumented}; -mod io { - pub use linkerd_app_core::io::*; - pub use tokio_test::io::*; -} - type ConnectFn = Box ConnectFuture + Send>; pub type ConnectFuture = diff --git a/linkerd/app/test/src/http_util.rs b/linkerd/app/test/src/http_util.rs index 9b68f1a783..1519a0bdfc 100644 --- a/linkerd/app/test/src/http_util.rs +++ b/linkerd/app/test/src/http_util.rs @@ -3,7 +3,7 @@ use crate::{ io, ContextError, }; use futures::FutureExt; -use hyper::{body::HttpBody, Body, Request, Response}; +use hyper::{body::HttpBody, Body}; use std::future::Future; use tokio::task::JoinHandle; use tower::{util::ServiceExt, Service}; @@ -14,7 +14,7 @@ use hyper::client::conn::{Builder as ClientBuilder, SendRequest}; type BoxServer = svc::BoxTcp; -pub async fn run_proxy(mut server: BoxServer) -> (io::DuplexStream, JoinHandle>) { +async fn run_proxy(mut server: BoxServer) -> (io::DuplexStream, JoinHandle>) { let (client_io, server_io) = io::duplex(4096); let f = server .ready() @@ -34,7 +34,7 @@ pub async fn run_proxy(mut server: BoxServer) -> (io::DuplexStream, JoinHandle (SendRequest, JoinHandle>) { @@ -73,37 +73,20 @@ pub async fn connect_and_accept( (client, bg) } -#[tracing::instrument(skip(client))] -#[allow(deprecated)] // linkerd/linkerd2#8733 -pub async fn http_request( - client: &mut SendRequest, - request: Request, -) -> Result, Error> { - let rsp = client - .ready() - .await - .map_err(ContextError::ctx("HTTP client poll_ready failed"))? - .call(request) - .await - .map_err(ContextError::ctx("HTTP client request failed"))?; - - tracing::info!(?rsp); - - Ok(rsp) -} - +/// Collects a request or response body, returning it as a [`String`]. pub async fn body_to_string(body: T) -> Result where T: HttpBody, T::Error: Into, { - let body = body + let bytes = body .collect() .await .map(http_body::Collected::to_bytes) - .map_err(ContextError::ctx("HTTP response body stream failed"))?; - let body = std::str::from_utf8(&body[..]) - .map_err(ContextError::ctx("converting body to string failed"))? - .to_owned(); - Ok(body) + .map_err(ContextError::ctx("HTTP response body stream failed"))? + .to_vec(); + + String::from_utf8(bytes) + .map_err(ContextError::ctx("converting body to string failed")) + .map_err(Into::into) } diff --git a/linkerd/app/test/src/lib.rs b/linkerd/app/test/src/lib.rs index e49cb35b6c..1e6ec51e8b 100644 --- a/linkerd/app/test/src/lib.rs +++ b/linkerd/app/test/src/lib.rs @@ -14,6 +14,10 @@ pub use tokio::io::{AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt}; pub use tokio::sync::oneshot; pub use tower::Service; pub use tracing::*; + +/// I/O facilities for tests. +/// +/// Provides [`AsyncRead`] and [`AsyncWrite`] types via [`tokio_test`], and via [`linkerd_io`]. pub mod io { pub use linkerd_app_core::io::*; pub use tokio_test::io::*;