Skip to content

Commit 59c9364

Browse files
authored
io: pass through IO traits for StreamReader and SinkWriter (tokio-rs#5941)
1 parent 3b79be6 commit 59c9364

File tree

3 files changed

+47
-1
lines changed

3 files changed

+47
-1
lines changed

tokio-util/src/io/copy_to_bytes.rs

+8
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
use bytes::Bytes;
2+
use futures_core::stream::Stream;
23
use futures_sink::Sink;
34
use pin_project_lite::pin_project;
45
use std::pin::Pin;
@@ -66,3 +67,10 @@ where
6667
self.project().inner.poll_close(cx)
6768
}
6869
}
70+
71+
impl<S: Stream> Stream for CopyToBytes<S> {
72+
type Item = S::Item;
73+
fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
74+
self.project().inner.poll_next(cx)
75+
}
76+
}

tokio-util/src/io/sink_writer.rs

+19-1
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,12 @@
11
use futures_core::ready;
22
use futures_sink::Sink;
33

4+
use futures_core::stream::Stream;
45
use pin_project_lite::pin_project;
56
use std::io;
67
use std::pin::Pin;
78
use std::task::{Context, Poll};
8-
use tokio::io::AsyncWrite;
9+
use tokio::io::{AsyncRead, AsyncWrite};
910

1011
pin_project! {
1112
/// Convert a [`Sink`] of byte chunks into an [`AsyncWrite`].
@@ -115,3 +116,20 @@ where
115116
self.project().inner.poll_close(cx).map_err(Into::into)
116117
}
117118
}
119+
120+
impl<S: Stream> Stream for SinkWriter<S> {
121+
type Item = S::Item;
122+
fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
123+
self.project().inner.poll_next(cx)
124+
}
125+
}
126+
127+
impl<S: AsyncRead> AsyncRead for SinkWriter<S> {
128+
fn poll_read(
129+
self: Pin<&mut Self>,
130+
cx: &mut Context<'_>,
131+
buf: &mut tokio::io::ReadBuf<'_>,
132+
) -> Poll<io::Result<()>> {
133+
self.project().inner.poll_read(cx, buf)
134+
}
135+
}

tokio-util/src/io/stream_reader.rs

+20
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
use bytes::Buf;
22
use futures_core::stream::Stream;
3+
use futures_sink::Sink;
34
use std::io;
45
use std::pin::Pin;
56
use std::task::{Context, Poll};
@@ -324,3 +325,22 @@ impl<S, B> StreamReader<S, B> {
324325
}
325326
}
326327
}
328+
329+
impl<S: Sink<T, Error = E>, E, T> Sink<T> for StreamReader<S, E> {
330+
type Error = E;
331+
fn poll_ready(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
332+
self.project().inner.poll_ready(cx)
333+
}
334+
335+
fn start_send(self: Pin<&mut Self>, item: T) -> Result<(), Self::Error> {
336+
self.project().inner.start_send(item)
337+
}
338+
339+
fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
340+
self.project().inner.poll_flush(cx)
341+
}
342+
343+
fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
344+
self.project().inner.poll_close(cx)
345+
}
346+
}

0 commit comments

Comments
 (0)