Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore(volo-http): wrap hyper::body::Incoming by Body #504

Merged
merged 2 commits into from
Sep 26, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
102 changes: 72 additions & 30 deletions volo-http/src/body.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,23 +12,36 @@ use std::{

use bytes::Bytes;
use faststr::FastStr;
use futures_util::{ready, Stream};
use futures_util::stream::Stream;
use http_body::{Frame, SizeHint};
use http_body_util::{combinators::BoxBody, BodyExt, Full, StreamBody};
pub use hyper::body::Incoming;
use motore::BoxError;
use hyper::body::Incoming;
use pin_project::pin_project;
#[cfg(feature = "json")]
use serde::de::DeserializeOwned;

use crate::error::BoxError;

// The `futures_util::stream::BoxStream` does not have `Sync`
type BoxStream<'a, T> = Pin<Box<dyn Stream<Item = T> + Send + Sync + 'a>>;

/// An implementation for [`http_body::Body`].
#[pin_project]
pub struct Body {
#[pin]
repr: BodyRepr,
}

#[pin_project(project = BodyProj)]
pub enum Body {
enum BodyRepr {
/// Complete [`Bytes`], with a certain size and content
Full(#[pin] Full<Bytes>),
/// Wrapper of [`hyper::body::Incoming`], it usually appers in request of server or response of
/// client.
///
/// Althrough [`hyper::body::Incoming`] implements [`http_body::Body`], the type is so commonly
/// used, we wrap it here as [`Body::Hyper`] to avoid cost of [`Box`] with dynamic dispatch.
Hyper(#[pin] Incoming),
/// Boxed stream with `Item = Result<Frame<Bytes>, BoxError>`
Stream(#[pin] StreamBody<BoxStream<'static, Result<Frame<Bytes>, BoxError>>>),
/// Boxed [`http_body::Body`]
Expand All @@ -44,15 +57,29 @@ impl Default for Body {
impl Body {
/// Create an empty body.
pub fn empty() -> Self {
Self::Full(Full::new(Bytes::new()))
Self {
repr: BodyRepr::Full(Full::new(Bytes::new())),
}
}

/// Create a body by [`hyper::body::Incoming`].
///
/// Compared to [`Body::from_body`], this function avoids overhead of allocating by [`Box`]
/// and dynamic dispatch by [`dyn http_body::Body`][http_body::Body].
pub fn from_incoming(incoming: Incoming) -> Self {
Self {
repr: BodyRepr::Hyper(incoming),
}
}

/// Create a body by a [`Stream`] with `Item = Result<Frame<Bytes>, BoxError>`.
pub fn from_stream<S>(stream: S) -> Self
where
S: Stream<Item = Result<Frame<Bytes>, BoxError>> + Send + Sync + 'static,
{
Self::Stream(StreamBody::new(Box::pin(stream)))
Self {
repr: BodyRepr::Stream(StreamBody::new(Box::pin(stream))),
}
}

/// Create a body by another [`http_body::Body`] instance.
Expand All @@ -61,7 +88,9 @@ impl Body {
B: http_body::Body<Data = Bytes> + Send + Sync + 'static,
B::Error: Into<BoxError>,
{
Self::Body(BoxBody::new(body.map_err(Into::into)))
Self {
repr: BodyRepr::Body(BoxBody::new(body.map_err(Into::into))),
}
}
}

Expand All @@ -73,39 +102,42 @@ impl http_body::Body for Body {
self: Pin<&mut Self>,
cx: &mut Context<'_>,
) -> Poll<Option<Result<Frame<Self::Data>, Self::Error>>> {
match self.project() {
BodyProj::Full(full) => {
// Convert `Infallible` to `BoxError`
Poll::Ready(ready!(full.poll_frame(cx)).map(|res| Ok(res?)))
match self.project().repr.project() {
BodyProj::Full(full) => http_body::Body::poll_frame(full, cx).map_err(BoxError::from),
BodyProj::Hyper(incoming) => {
http_body::Body::poll_frame(incoming, cx).map_err(BoxError::from)
}
BodyProj::Stream(stream) => stream.poll_frame(cx),
BodyProj::Body(body) => body.poll_frame(cx),
BodyProj::Stream(stream) => http_body::Body::poll_frame(stream, cx),
BodyProj::Body(body) => http_body::Body::poll_frame(body, cx),
}
}

fn is_end_stream(&self) -> bool {
match self {
Self::Full(full) => full.is_end_stream(),
Self::Stream(stream) => stream.is_end_stream(),
Self::Body(body) => body.is_end_stream(),
match &self.repr {
BodyRepr::Full(full) => http_body::Body::is_end_stream(full),
BodyRepr::Hyper(incoming) => http_body::Body::is_end_stream(incoming),
BodyRepr::Stream(stream) => http_body::Body::is_end_stream(stream),
BodyRepr::Body(body) => http_body::Body::is_end_stream(body),
}
}

fn size_hint(&self) -> SizeHint {
match self {
Self::Full(full) => full.size_hint(),
Self::Stream(stream) => http_body::Body::size_hint(stream),
Self::Body(body) => body.size_hint(),
match &self.repr {
BodyRepr::Full(full) => http_body::Body::size_hint(full),
BodyRepr::Hyper(incoming) => http_body::Body::size_hint(incoming),
BodyRepr::Stream(stream) => http_body::Body::size_hint(stream),
BodyRepr::Body(body) => http_body::Body::size_hint(body),
}
}
}

impl fmt::Debug for Body {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
match self {
Self::Full(_) => f.write_str("Body::Full"),
Self::Stream(_) => f.write_str("Body::Stream"),
Self::Body(_) => f.write_str("Body::Body"),
match &self.repr {
BodyRepr::Full(_) => f.write_str("Body::Full"),
BodyRepr::Hyper(_) => f.write_str("Body::Hyper"),
BodyRepr::Stream(_) => f.write_str("Body::Stream"),
BodyRepr::Body(_) => f.write_str("Body::Body"),
}
}
}
Expand Down Expand Up @@ -256,30 +288,40 @@ impl From<()> for Body {

impl From<&'static str> for Body {
fn from(value: &'static str) -> Self {
Self::Full(Full::new(Bytes::from_static(value.as_bytes())))
Self {
repr: BodyRepr::Full(Full::new(Bytes::from_static(value.as_bytes()))),
}
}
}

impl From<Vec<u8>> for Body {
fn from(value: Vec<u8>) -> Self {
Self::Full(Full::new(Bytes::from(value)))
Self {
repr: BodyRepr::Full(Full::new(Bytes::from(value))),
}
}
}

impl From<Bytes> for Body {
fn from(value: Bytes) -> Self {
Self::Full(Full::new(value))
Self {
repr: BodyRepr::Full(Full::new(value)),
}
}
}

impl From<FastStr> for Body {
fn from(value: FastStr) -> Self {
Self::Full(Full::new(value.into_bytes()))
Self {
repr: BodyRepr::Full(Full::new(value.into_bytes())),
}
}
}

impl From<String> for Body {
fn from(value: String) -> Self {
Self::Full(Full::new(Bytes::from(value)))
Self {
repr: BodyRepr::Full(Full::new(Bytes::from(value))),
}
}
}
7 changes: 3 additions & 4 deletions volo-http/src/client/transport.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
use std::error::Error;

use http_body::Body;
use hyper::client::conn::http1;
use hyper_util::rt::TokioIo;
use motore::{make::MakeConnection, service::Service};
Expand Down Expand Up @@ -116,7 +115,7 @@ impl ClientTransport {
req: ClientRequest<B>,
) -> Result<ClientResponse, ClientError>
where
B: Body + Send + 'static,
B: http_body::Body + Send + 'static,
B::Data: Send,
B::Error: Into<Box<dyn Error + Send + Sync>> + 'static,
{
Expand All @@ -132,13 +131,13 @@ impl ClientTransport {
tracing::error!("[Volo-HTTP] failed to send request, error: {err}");
request_error(err)
})?;
Ok(resp)
Ok(resp.map(crate::body::Body::from_incoming))
}
}

impl<B> Service<ClientContext, ClientRequest<B>> for ClientTransport
where
B: Body + Send + 'static,
B: http_body::Body + Send + 'static,
B::Data: Send,
B::Error: Into<Box<dyn Error + Send + Sync>> + 'static,
{
Expand Down
2 changes: 1 addition & 1 deletion volo-http/src/request.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ pub type ClientRequest<B = crate::body::Body> = Request<B>;
///
/// [`Incoming`]: hyper::body::Incoming
#[cfg(feature = "server")]
pub type ServerRequest<B = hyper::body::Incoming> = Request<B>;
pub type ServerRequest<B = crate::body::Body> = Request<B>;

/// HTTP header [`X-Forwarded-For`][mdn].
///
Expand Down
20 changes: 10 additions & 10 deletions volo-http/src/response.rs
Original file line number Diff line number Diff line change
@@ -1,15 +1,15 @@
//! Response types for client and server.

use hyper::body::Incoming;

use crate::body::Body;

/// [`Response`][Response] with [`Body`] as default body
/// [`Response`] with [`Body`] as default body
///
/// [Response]: http::Response
pub type ServerResponse<B = Body> = http::Response<B>;
/// [`Response`]: http::response::Response
/// [`Body`]: crate::body::Body
#[cfg(feature = "server")]
pub type ServerResponse<B = crate::body::Body> = http::response::Response<B>;

/// [`Response`][Response] with [`Incoming`] as default body
/// [`Response`] with [`Body`] as default body
///
/// [Response]: http::Response
pub type ClientResponse<B = Incoming> = http::Response<B>;
/// [`Response`]: http::response::Response
/// [`Body`]: crate::body::Body
#[cfg(feature = "client")]
pub type ClientResponse<B = crate::body::Body> = http::response::Response<B>;
5 changes: 2 additions & 3 deletions volo-http/src/server/middleware.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,15 +2,14 @@

use std::{convert::Infallible, marker::PhantomData, sync::Arc};

use hyper::body::Incoming;
use motore::{layer::Layer, service::Service};

use super::{
handler::{MiddlewareHandlerFromFn, MiddlewareHandlerMapResponse},
route::Route,
IntoResponse,
};
use crate::{context::ServerContext, request::ServerRequest, response::ServerResponse};
use crate::{body::Body, context::ServerContext, request::ServerRequest, response::ServerResponse};

/// A [`Layer`] from an async function
///
Expand Down Expand Up @@ -239,7 +238,7 @@ where
/// response.
///
/// See [`from_fn`] for more details.
pub struct Next<B = Incoming, E = Infallible> {
pub struct Next<B = Body, E = Infallible> {
service: Route<B, E>,
}

Expand Down
12 changes: 9 additions & 3 deletions volo-http/src/server/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -475,7 +475,9 @@ struct HyperService<S> {
config: Config,
}

impl<S, E> hyper::service::Service<ServerRequest> for HyperService<S>
type HyperRequest = http::request::Request<hyper::body::Incoming>;

impl<S, E> hyper::service::Service<HyperRequest> for HyperService<S>
where
S: Service<ServerContext, ServerRequest, Error = E> + Clone + Send + Sync + 'static,
S::Response: IntoResponse,
Expand All @@ -485,13 +487,17 @@ where
type Error = Infallible;
type Future = BoxFuture<'static, Result<Self::Response, Self::Error>>;

fn call(&self, req: ServerRequest) -> Self::Future {
fn call(&self, req: HyperRequest) -> Self::Future {
let service = self.clone();
Box::pin(
METAINFO.scope(RefCell::new(MetaInfo::default()), async move {
let mut cx = ServerContext::new(service.peer);
cx.rpc_info_mut().set_config(service.config);
Ok(service.inner.call(&mut cx, req).await.into_response())
Ok(service
.inner
.call(&mut cx, req.map(Body::from_incoming))
.await
.into_response())
}),
)
}
Expand Down
6 changes: 3 additions & 3 deletions volo-http/src/server/route/method_router.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,12 @@
use std::convert::Infallible;

use http::{method::Method, status::StatusCode};
use hyper::body::Incoming;
use motore::{layer::Layer, service::Service, ServiceExt};
use paste::paste;

use super::{Fallback, Route};
use crate::{
body::Body,
context::ServerContext,
request::ServerRequest,
response::ServerResponse,
Expand Down Expand Up @@ -65,7 +65,7 @@ use crate::{
/// let app: Router = Router::new().route("/", get(index));
/// let app: Router = Router::new().route("/", get(index).post(index).head(index));
/// ```
pub struct MethodRouter<B = Incoming, E = Infallible> {
pub struct MethodRouter<B = Body, E = Infallible> {
options: MethodEndpoint<B, E>,
get: MethodEndpoint<B, E>,
post: MethodEndpoint<B, E>,
Expand Down Expand Up @@ -344,7 +344,7 @@ where
}

#[derive(Default)]
enum MethodEndpoint<B = Incoming, E = Infallible> {
enum MethodEndpoint<B = Body, E = Infallible> {
#[default]
None,
Route(Route<B, E>),
Expand Down
7 changes: 3 additions & 4 deletions volo-http/src/server/route/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,11 +11,10 @@
use std::{convert::Infallible, future::Future, marker::PhantomData};

use http::status::StatusCode;
use hyper::body::Incoming;
use motore::{layer::Layer, service::Service, ServiceExt};

use super::{handler::Handler, IntoResponse};
use crate::{context::ServerContext, request::ServerRequest, response::ServerResponse};
use crate::{body::Body, context::ServerContext, request::ServerRequest, response::ServerResponse};

pub mod method_router;
pub mod router;
Expand All @@ -24,7 +23,7 @@ mod utils;
pub use self::{method_router::*, router::Router};

/// The route service used for [`Router`].
pub struct Route<B = Incoming, E = Infallible> {
pub struct Route<B = Body, E = Infallible> {
inner: motore::service::BoxService<ServerContext, ServerRequest<B>, ServerResponse, E>,
}

Expand Down Expand Up @@ -57,7 +56,7 @@ impl<B, E> Service<ServerContext, ServerRequest<B>> for Route<B, E> {
}
}

enum Fallback<B = Incoming, E = Infallible> {
enum Fallback<B = Body, E = Infallible> {
Route(Route<B, E>),
}

Expand Down
Loading
Loading