Skip to content

Commit 3c0c32e

Browse files
Nemo157cramertj
authored andcommitted
Add AsyncWriteExt::into_sink
1 parent 5c2f3d9 commit 3c0c32e

File tree

2 files changed

+152
-0
lines changed

2 files changed

+152
-0
lines changed

futures-util/src/io/into_sink.rs

+115
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,115 @@
1+
use futures_core::task::{Context, Poll};
2+
use futures_io::AsyncWrite;
3+
use futures_sink::Sink;
4+
use std::fmt;
5+
use std::io;
6+
use std::pin::Pin;
7+
use std::marker::Unpin;
8+
use pin_utils::{unsafe_pinned, unsafe_unpinned};
9+
10+
/// Sink for the [`into_sink`](super::AsyncWriteExt::into_sink) method.
11+
#[must_use = "sinks do nothing unless polled"]
12+
pub struct IntoSink<W> {
13+
writer: W,
14+
/// An outstanding block for us to push into the underlying writer, along with an index of how
15+
/// far into the block we have written already.
16+
buffer: Option<(usize, Box<dyn AsRef<[u8]>>)>,
17+
}
18+
19+
impl<W: Unpin> Unpin for IntoSink<W> {}
20+
21+
impl<W: AsyncWrite> IntoSink<W> {
22+
unsafe_pinned!(writer: W);
23+
unsafe_unpinned!(buffer: Option<(usize, Box<dyn AsRef<[u8]>>)>);
24+
25+
pub(super) fn new(writer: W) -> Self {
26+
IntoSink { writer, buffer: None }
27+
}
28+
29+
fn project<'a>(self: Pin<&'a mut Self>) -> (Pin<&'a mut W>, &'a mut Option<(usize, Box<dyn AsRef<[u8]>>)>) {
30+
unsafe {
31+
let this = self.get_unchecked_mut();
32+
(Pin::new_unchecked(&mut this.writer), &mut this.buffer)
33+
}
34+
}
35+
36+
/// If we have an outstanding block in `buffer` attempt to push it into the writer, does _not_
37+
/// flush the writer after it succeeds in pushing the block into it.
38+
fn poll_flush_buffer(
39+
self: Pin<&mut Self>,
40+
cx: &mut Context<'_>,
41+
) -> Poll<Result<(), io::Error>>
42+
{
43+
let (mut writer, buffer) = self.project();
44+
if let Some((index, block)) = buffer {
45+
loop {
46+
let bytes = (**block).as_ref();
47+
let written = ready!(writer.as_mut().poll_write(cx, &bytes[*index..]))?;
48+
*index += written;
49+
if *index == bytes.len() {
50+
break;
51+
}
52+
}
53+
*buffer = None;
54+
}
55+
Poll::Ready(Ok(()))
56+
}
57+
58+
}
59+
60+
impl<W: AsyncWrite, Item: AsRef<[u8]> + 'static> Sink<Item> for IntoSink<W> {
61+
type SinkError = io::Error;
62+
63+
fn poll_ready(
64+
mut self: Pin<&mut Self>,
65+
cx: &mut Context<'_>,
66+
) -> Poll<Result<(), Self::SinkError>>
67+
{
68+
ready!(self.as_mut().poll_flush_buffer(cx))?;
69+
Poll::Ready(Ok(()))
70+
}
71+
72+
fn start_send(
73+
mut self: Pin<&mut Self>,
74+
item: Item,
75+
) -> Result<(), Self::SinkError>
76+
{
77+
debug_assert!(self.as_mut().buffer().is_none());
78+
*self.as_mut().buffer() = Some((0, Box::new(item)));
79+
Ok(())
80+
}
81+
82+
fn poll_flush(
83+
mut self: Pin<&mut Self>,
84+
cx: &mut Context<'_>,
85+
) -> Poll<Result<(), Self::SinkError>>
86+
{
87+
ready!(self.as_mut().poll_flush_buffer(cx))?;
88+
ready!(self.as_mut().writer().poll_flush(cx))?;
89+
Poll::Ready(Ok(()))
90+
}
91+
92+
fn poll_close(
93+
mut self: Pin<&mut Self>,
94+
cx: &mut Context<'_>,
95+
) -> Poll<Result<(), Self::SinkError>>
96+
{
97+
ready!(self.as_mut().poll_flush_buffer(cx))?;
98+
ready!(self.as_mut().writer().poll_close(cx))?;
99+
Poll::Ready(Ok(()))
100+
}
101+
}
102+
103+
impl<W: fmt::Debug> fmt::Debug for IntoSink<W> {
104+
105+
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
106+
let buffer = self.buffer.as_ref().map(|(size, block)| {
107+
(size, format!("[... {} bytes ...]", (**block).as_ref().len()))
108+
});
109+
f.debug_struct("IntoSink")
110+
.field("writer", &self.writer)
111+
.field("buffer", &buffer)
112+
.finish()?;
113+
Ok(())
114+
}
115+
}

futures-util/src/io/mod.rs

+37
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,9 @@ pub use self::copy_into::CopyInto;
3434
mod flush;
3535
pub use self::flush::Flush;
3636

37+
mod into_sink;
38+
pub use self::into_sink::IntoSink;
39+
3740
mod lines;
3841
pub use self::lines::Lines;
3942

@@ -382,6 +385,40 @@ pub trait AsyncWriteExt: AsyncWrite {
382385
{
383386
Compat::new(self)
384387
}
388+
389+
390+
/// Allow using an [`AsyncWrite`] as a [`Sink`](futures_sink::Sink)`<Item: AsRef<[u8]>>`.
391+
///
392+
/// This adapter produces a sink that will write each value passed to it
393+
/// into the underlying writer.
394+
///
395+
/// Note that this function consumes the given writer, returning a wrapped
396+
/// version.
397+
///
398+
/// # Examples
399+
///
400+
/// ```
401+
/// #![feature(async_await)]
402+
/// # futures::executor::block_on(async {
403+
/// use futures::io::AsyncWriteExt;
404+
/// use futures::stream::{self, StreamExt};
405+
///
406+
/// let stream = stream::iter(vec![Ok([1, 2, 3]), Ok([4, 5, 6])]);
407+
///
408+
/// let mut writer = vec![];
409+
///
410+
/// stream.forward((&mut writer).into_sink()).await?;
411+
///
412+
/// assert_eq!(writer, vec![1, 2, 3, 4, 5, 6]);
413+
/// # Ok::<(), Box<dyn std::error::Error>>(())
414+
/// # })?;
415+
/// # Ok::<(), Box<dyn std::error::Error>>(())
416+
/// ```
417+
fn into_sink(self) -> IntoSink<Self>
418+
where Self: Sized,
419+
{
420+
IntoSink::new(self)
421+
}
385422
}
386423

387424
impl<W: AsyncWrite + ?Sized> AsyncWriteExt for W {}

0 commit comments

Comments
 (0)