From 0b413a6fdcfa4075cf434c30fb6492be18ff0fb0 Mon Sep 17 00:00:00 2001 From: katelyn martin Date: Sun, 1 Dec 2024 00:00:00 +0000 Subject: [PATCH] chore(http/upgrade): replace `hyper::Body` with `BoxBody` `hyper::Body` is removed in the 1.0 version. this commit removes it from our upgrade facilities, using a generic body parameter that defaults to BoxBody. see . Signed-off-by: katelyn martin --- Cargo.lock | 1 + linkerd/http/upgrade/Cargo.toml | 1 + linkerd/http/upgrade/src/glue.rs | 80 +++++++++++++++-------------- linkerd/http/upgrade/src/upgrade.rs | 10 ++-- 4 files changed, 49 insertions(+), 43 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 94270712c2..64e536d5ba 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1859,6 +1859,7 @@ dependencies = [ "hyper", "linkerd-duplex", "linkerd-error", + "linkerd-http-box", "linkerd-http-version", "linkerd-io", "linkerd-stack", diff --git a/linkerd/http/upgrade/Cargo.toml b/linkerd/http/upgrade/Cargo.toml index 4cc5e01ccb..1787fddb6e 100644 --- a/linkerd/http/upgrade/Cargo.toml +++ b/linkerd/http/upgrade/Cargo.toml @@ -24,6 +24,7 @@ try-lock = "0.2" linkerd-duplex = { path = "../../duplex" } linkerd-error = { path = "../../error" } +linkerd-http-box = { path = "../box" } linkerd-http-version = { path = "../version" } linkerd-io = { path = "../../io" } linkerd-stack = { path = "../../stack" } diff --git a/linkerd/http/upgrade/src/glue.rs b/linkerd/http/upgrade/src/glue.rs index 2e7ac2d7da..551fe93563 100644 --- a/linkerd/http/upgrade/src/glue.rs +++ b/linkerd/http/upgrade/src/glue.rs @@ -1,9 +1,9 @@ use crate::upgrade::Http11Upgrade; -use bytes::Bytes; -use futures::TryFuture; +use futures::{ready, TryFuture}; use http_body::Body; use hyper::client::connect as hyper_connect; use linkerd_error::{Error, Result}; +use linkerd_http_box::BoxBody; use linkerd_io::{self as io, AsyncRead, AsyncWrite}; use linkerd_stack::{MakeConnection, Service}; use pin_project::{pin_project, pinned_drop}; @@ -17,10 +17,11 @@ use tracing::debug; /// Provides optional HTTP/1.1 upgrade support on the body. #[pin_project(PinnedDrop)] #[derive(Debug)] -pub struct UpgradeBody { +pub struct UpgradeBody { /// In UpgradeBody::drop, if this was an HTTP upgrade, the body is taken /// to be inserted into the Http11Upgrade half. - body: hyper::Body, + #[pin] + body: B, pub(super) upgrade: Option<(Http11Upgrade, hyper::upgrade::OnUpgrade)>, } @@ -50,9 +51,13 @@ pub struct HyperConnectFuture { // === impl UpgradeBody === -impl Body for UpgradeBody { - type Data = Bytes; - type Error = hyper::Error; +impl Body for UpgradeBody +where + B: Body + Unpin, + B::Error: std::fmt::Display, +{ + type Data = B::Data; + type Error = B::Error; fn is_end_stream(&self) -> bool { self.body.is_end_stream() @@ -62,28 +67,34 @@ impl Body for UpgradeBody { self: Pin<&mut Self>, cx: &mut Context<'_>, ) -> Poll>> { - let body = self.project().body; - let poll = futures::ready!(Pin::new(body) // `hyper::Body` is Unpin - .poll_data(cx)); - Poll::Ready(poll.map(|x| { - x.map_err(|e| { - debug!("http body error: {}", e); - e - }) - })) + // Poll the next chunk from the body. + let this = self.project(); + let body = this.body; + let data = ready!(body.poll_data(cx)); + + // Log errors. + if let Some(Err(e)) = &data { + debug!("http body error: {}", e); + } + + Poll::Ready(data) } fn poll_trailers( self: Pin<&mut Self>, cx: &mut Context<'_>, ) -> Poll, Self::Error>> { - let body = self.project().body; - Pin::new(body) // `hyper::Body` is Unpin - .poll_trailers(cx) - .map_err(|e| { - debug!("http trailers error: {}", e); - e - }) + // Poll the trailers from the body. + let this = self.project(); + let body = this.body; + let trailers = ready!(body.poll_trailers(cx)); + + // Log errors. + if let Err(e) = &trailers { + debug!("http trailers error: {}", e); + } + + Poll::Ready(trailers) } #[inline] @@ -92,32 +103,23 @@ impl Body for UpgradeBody { } } -impl Default for UpgradeBody { +impl Default for UpgradeBody { fn default() -> Self { - hyper::Body::empty().into() - } -} - -impl From for UpgradeBody { - fn from(body: hyper::Body) -> Self { Self { - body, + body: B::default(), upgrade: None, } } } -impl UpgradeBody { - pub fn new( - body: hyper::Body, - upgrade: Option<(Http11Upgrade, hyper::upgrade::OnUpgrade)>, - ) -> Self { +impl UpgradeBody { + pub fn new(body: B, upgrade: Option<(Http11Upgrade, hyper::upgrade::OnUpgrade)>) -> Self { Self { body, upgrade } } } #[pinned_drop] -impl PinnedDrop for UpgradeBody { +impl PinnedDrop for UpgradeBody { fn drop(self: Pin<&mut Self>) { let this = self.project(); // If an HTTP/1 upgrade was wanted, send the upgrade future. @@ -164,6 +166,8 @@ where } } +// === impl HyperConnectFuture === + impl Future for HyperConnectFuture where F: TryFuture + 'static, @@ -181,7 +185,7 @@ where } } -// === impl Connected === +// === impl Connection === impl AsyncRead for Connection where diff --git a/linkerd/http/upgrade/src/upgrade.rs b/linkerd/http/upgrade/src/upgrade.rs index 1094584f83..3d6975472f 100644 --- a/linkerd/http/upgrade/src/upgrade.rs +++ b/linkerd/http/upgrade/src/upgrade.rs @@ -174,20 +174,20 @@ impl Service { type ResponseFuture = Either, E>>>; -impl tower::Service> for Service +impl tower::Service> for Service where - S: tower::Service, Response = http::Response>, - B: Default, + S: tower::Service>, Response = http::Response>, + RespB: Default, { type Response = S::Response; type Error = S::Error; - type Future = ResponseFuture; + type Future = ResponseFuture; fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll> { self.service.poll_ready(cx) } - fn call(&mut self, mut req: http::Request) -> Self::Future { + fn call(&mut self, mut req: http::Request) -> Self::Future { // Should this rejection happen later in the Service stack? // // Rejecting here means telemetry doesn't record anything about it...