Skip to content

Commit 2322d39

Browse files
authored
Add StreamBody (#237)
This adds `StreamBody` which converts a `Stream` of `Bytes` into a `http_body::Body`. --- As suggested by Kestrer on Discord it would make sense for axum to provide different kinds of body types other than `Empty`, `Full`, and `hyper::Body`. There is also some talk about [splitting up `hyper::Body`](hyperium/hyper#2345) so this can be seen as getting started on that effort. axum's body types could be moved to hyper or http-body if thats the direction we decide on. The types I'm thinking about adding are: - `StreamBody`- added in this PR - `AsyncReadBody` - similar to [http-body#41](https://github.com/hyperium/http-body/pull/41/files) - `ChannelBody` - similar to `hyper::Body::channel`
1 parent 5ae94b6 commit 2322d39

File tree

5 files changed

+110
-0
lines changed

5 files changed

+110
-0
lines changed

CHANGELOG.md

+1
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
4848
- Responses:
4949
- **added:** Add `Headers` for easily customizing headers on a response ([#193](https://github.com/tokio-rs/axum/pull/193))
5050
- **added:** Add `Redirect` response ([#192](https://github.com/tokio-rs/axum/pull/192))
51+
- **added:** Add `body::StreamBody` for easily responding with a stream of byte chunks ([#237](https://github.com/tokio-rs/axum/pull/237))
5152
- **changed:** Add associated `Body` and `BodyError` types to `IntoResponse`. This is
5253
required for returning responses with bodies other than `hyper::Body` from
5354
handlers. See the docs for advice on how to implement `IntoResponse` ([#86](https://github.com/tokio-rs/axum/pull/86))

src/body.rs

+4
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,10 @@
33
use crate::BoxError;
44
use crate::Error;
55

6+
mod stream_body;
7+
8+
pub use self::stream_body::StreamBody;
9+
610
#[doc(no_inline)]
711
pub use http_body::{Body as HttpBody, Empty, Full};
812

src/body/stream_body.rs

+103
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,103 @@
1+
use crate::{BoxError, Error};
2+
use bytes::Bytes;
3+
use futures_util::stream::{self, Stream, TryStreamExt};
4+
use http::HeaderMap;
5+
use http_body::Body;
6+
use std::convert::Infallible;
7+
use std::{
8+
fmt,
9+
pin::Pin,
10+
task::{Context, Poll},
11+
};
12+
use sync_wrapper::SyncWrapper;
13+
14+
/// An [`http_body::Body`] created from a [`Stream`].
15+
///
16+
/// # Example
17+
///
18+
/// ```
19+
/// use axum::{
20+
/// Router,
21+
/// handler::get,
22+
/// body::StreamBody,
23+
/// };
24+
/// use futures::stream;
25+
///
26+
/// async fn handler() -> StreamBody {
27+
/// let chunks: Vec<Result<_, std::io::Error>> = vec![
28+
/// Ok("Hello,"),
29+
/// Ok(" "),
30+
/// Ok("world!"),
31+
/// ];
32+
/// let stream = stream::iter(chunks);
33+
/// StreamBody::new(stream)
34+
/// }
35+
///
36+
/// let app = Router::new().route("/", get(handler));
37+
/// # async {
38+
/// # axum::Server::bind(&"".parse().unwrap()).serve(app.into_make_service()).await.unwrap();
39+
/// # };
40+
/// ```
41+
///
42+
/// [`Stream`]: futures_util::stream::Stream
43+
// this should probably be extracted to `http_body`, eventually...
44+
pub struct StreamBody {
45+
stream: SyncWrapper<Pin<Box<dyn Stream<Item = Result<Bytes, Error>> + Send>>>,
46+
}
47+
48+
impl StreamBody {
49+
/// Create a new `StreamBody` from a [`Stream`].
50+
///
51+
/// [`Stream`]: futures_util::stream::Stream
52+
pub fn new<S, T, E>(stream: S) -> Self
53+
where
54+
S: Stream<Item = Result<T, E>> + Send + 'static,
55+
T: Into<Bytes> + 'static,
56+
E: Into<BoxError> + 'static,
57+
{
58+
let stream = stream
59+
.map_ok(Into::into)
60+
.map_err(|err| Error::new(err.into()));
61+
Self {
62+
stream: SyncWrapper::new(Box::pin(stream)),
63+
}
64+
}
65+
}
66+
67+
impl Default for StreamBody {
68+
fn default() -> Self {
69+
Self::new(stream::empty::<Result<Bytes, Infallible>>())
70+
}
71+
}
72+
73+
impl fmt::Debug for StreamBody {
74+
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
75+
f.debug_tuple("StreamBody").finish()
76+
}
77+
}
78+
79+
impl Body for StreamBody {
80+
type Data = Bytes;
81+
type Error = Error;
82+
83+
fn poll_data(
84+
mut self: Pin<&mut Self>,
85+
cx: &mut Context<'_>,
86+
) -> Poll<Option<Result<Self::Data, Self::Error>>> {
87+
Pin::new(self.stream.get_mut()).poll_next(cx)
88+
}
89+
90+
fn poll_trailers(
91+
self: Pin<&mut Self>,
92+
_cx: &mut Context<'_>,
93+
) -> Poll<Result<Option<HeaderMap>, Self::Error>> {
94+
Poll::Ready(Ok(None))
95+
}
96+
}
97+
98+
#[test]
99+
fn stream_body_traits() {
100+
crate::tests::assert_send::<StreamBody>();
101+
crate::tests::assert_sync::<StreamBody>();
102+
crate::tests::assert_unpin::<StreamBody>();
103+
}

src/response/mod.rs

+1
Original file line numberDiff line numberDiff line change
@@ -198,6 +198,7 @@ macro_rules! impl_into_response_for_body {
198198
impl_into_response_for_body!(hyper::Body);
199199
impl_into_response_for_body!(Full<Bytes>);
200200
impl_into_response_for_body!(Empty<Bytes>);
201+
impl_into_response_for_body!(crate::body::StreamBody);
201202

202203
impl<E> IntoResponse for http_body::combinators::BoxBody<Bytes, E>
203204
where

src/tests/mod.rs

+1
Original file line numberDiff line numberDiff line change
@@ -701,3 +701,4 @@ where
701701

702702
pub(crate) fn assert_send<T: Send>() {}
703703
pub(crate) fn assert_sync<T: Sync>() {}
704+
pub(crate) fn assert_unpin<T: Unpin>() {}

0 commit comments

Comments
 (0)