Skip to content

Commit cd6983f

Browse files
Nemo157cramertj
authored andcommitted
Make IntoSink defined for a single item type
1 parent c7e8d89 commit cd6983f

File tree

2 files changed

+13
-20
lines changed

2 files changed

+13
-20
lines changed

futures-util/src/io/into_sink.rs

+12-19
Original file line numberDiff line numberDiff line change
@@ -1,38 +1,38 @@
11
use futures_core::task::{Context, Poll};
22
use futures_io::AsyncWrite;
33
use futures_sink::Sink;
4-
use std::fmt;
54
use std::io;
65
use std::pin::Pin;
76
use std::marker::Unpin;
87
use pin_utils::{unsafe_pinned, unsafe_unpinned};
98

10-
struct Block {
9+
#[derive(Debug)]
10+
struct Block<Item> {
1111
offset: usize,
12-
bytes: Box<dyn AsRef<[u8]>>,
12+
bytes: Item,
1313
}
1414

1515
/// Sink for the [`into_sink`](super::AsyncWriteExt::into_sink) method.
1616
#[must_use = "sinks do nothing unless polled"]
1717
#[derive(Debug)]
18-
pub struct IntoSink<W> {
18+
pub struct IntoSink<W, Item> {
1919
writer: W,
2020
/// An outstanding block for us to push into the underlying writer, along with an offset of how
2121
/// far into this block we have written already.
22-
buffer: Option<Block>,
22+
buffer: Option<Block<Item>>,
2323
}
2424

25-
impl<W: Unpin> Unpin for IntoSink<W> {}
25+
impl<W: Unpin, Item> Unpin for IntoSink<W, Item> {}
2626

27-
impl<W: AsyncWrite> IntoSink<W> {
27+
impl<W: AsyncWrite, Item: AsRef<[u8]>> IntoSink<W, Item> {
2828
unsafe_pinned!(writer: W);
29-
unsafe_unpinned!(buffer: Option<Block>);
29+
unsafe_unpinned!(buffer: Option<Block<Item>>);
3030

3131
pub(super) fn new(writer: W) -> Self {
3232
IntoSink { writer, buffer: None }
3333
}
3434

35-
fn project<'a>(self: Pin<&'a mut Self>) -> (Pin<&'a mut W>, &'a mut Option<Block>) {
35+
fn project<'a>(self: Pin<&'a mut Self>) -> (Pin<&'a mut W>, &'a mut Option<Block<Item>>) {
3636
unsafe {
3737
let this = self.get_unchecked_mut();
3838
(Pin::new_unchecked(&mut this.writer), &mut this.buffer)
@@ -49,7 +49,7 @@ impl<W: AsyncWrite> IntoSink<W> {
4949
let (mut writer, buffer) = self.project();
5050
if let Some(buffer) = buffer {
5151
loop {
52-
let bytes = (*buffer.bytes).as_ref();
52+
let bytes = buffer.bytes.as_ref();
5353
let written = ready!(writer.as_mut().poll_write(cx, &bytes[buffer.offset..]))?;
5454
buffer.offset += written;
5555
if buffer.offset == bytes.len() {
@@ -63,7 +63,7 @@ impl<W: AsyncWrite> IntoSink<W> {
6363

6464
}
6565

66-
impl<W: AsyncWrite, Item: AsRef<[u8]> + 'static> Sink<Item> for IntoSink<W> {
66+
impl<W: AsyncWrite, Item: AsRef<[u8]>> Sink<Item> for IntoSink<W, Item> {
6767
type SinkError = io::Error;
6868

6969
fn poll_ready(
@@ -81,7 +81,7 @@ impl<W: AsyncWrite, Item: AsRef<[u8]> + 'static> Sink<Item> for IntoSink<W> {
8181
) -> Result<(), Self::SinkError>
8282
{
8383
debug_assert!(self.as_mut().buffer().is_none());
84-
*self.as_mut().buffer() = Some(Block { offset: 0, bytes: Box::new(item)} );
84+
*self.as_mut().buffer() = Some(Block { offset: 0, bytes: item });
8585
Ok(())
8686
}
8787

@@ -105,10 +105,3 @@ impl<W: AsyncWrite, Item: AsRef<[u8]> + 'static> Sink<Item> for IntoSink<W> {
105105
Poll::Ready(Ok(()))
106106
}
107107
}
108-
109-
impl fmt::Debug for Block {
110-
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
111-
write!(f, "[... {}/{} bytes ...]", self.offset, (*self.bytes).as_ref().len())?;
112-
Ok(())
113-
}
114-
}

futures-util/src/io/mod.rs

+1-1
Original file line numberDiff line numberDiff line change
@@ -414,7 +414,7 @@ pub trait AsyncWriteExt: AsyncWrite {
414414
/// # })?;
415415
/// # Ok::<(), Box<dyn std::error::Error>>(())
416416
/// ```
417-
fn into_sink(self) -> IntoSink<Self>
417+
fn into_sink<Item: AsRef<[u8]>>(self) -> IntoSink<Self, Item>
418418
where Self: Sized,
419419
{
420420
IntoSink::new(self)

0 commit comments

Comments
 (0)