@@ -7,26 +7,32 @@ use std::pin::Pin;
7
7
use std:: marker:: Unpin ;
8
8
use pin_utils:: { unsafe_pinned, unsafe_unpinned} ;
9
9
10
+ struct Block {
11
+ offset : usize ,
12
+ bytes : Box < dyn AsRef < [ u8 ] > > ,
13
+ }
14
+
10
15
/// Sink for the [`into_sink`](super::AsyncWriteExt::into_sink) method.
11
16
#[ must_use = "sinks do nothing unless polled" ]
17
+ #[ derive( Debug ) ]
12
18
pub struct IntoSink < W > {
13
19
writer : W ,
14
- /// An outstanding block for us to push into the underlying writer, along with an index of how
15
- /// far into the block we have written already.
16
- buffer : Option < ( usize , Box < dyn AsRef < [ u8 ] > > ) > ,
20
+ /// An outstanding block for us to push into the underlying writer, along with an offset of how
21
+ /// far into this block we have written already.
22
+ buffer : Option < Block > ,
17
23
}
18
24
19
25
impl < W : Unpin > Unpin for IntoSink < W > { }
20
26
21
27
impl < W : AsyncWrite > IntoSink < W > {
22
28
unsafe_pinned ! ( writer: W ) ;
23
- unsafe_unpinned ! ( buffer: Option <( usize , Box <dyn AsRef < [ u8 ] >> ) >) ;
29
+ unsafe_unpinned ! ( buffer: Option <Block >) ;
24
30
25
31
pub ( super ) fn new ( writer : W ) -> Self {
26
32
IntoSink { writer, buffer : None }
27
33
}
28
34
29
- fn project < ' a > ( self : Pin < & ' a mut Self > ) -> ( Pin < & ' a mut W > , & ' a mut Option < ( usize , Box < dyn AsRef < [ u8 ] > > ) > ) {
35
+ fn project < ' a > ( self : Pin < & ' a mut Self > ) -> ( Pin < & ' a mut W > , & ' a mut Option < Block > ) {
30
36
unsafe {
31
37
let this = self . get_unchecked_mut ( ) ;
32
38
( Pin :: new_unchecked ( & mut this. writer ) , & mut this. buffer )
@@ -41,17 +47,17 @@ impl<W: AsyncWrite> IntoSink<W> {
41
47
) -> Poll < Result < ( ) , io:: Error > >
42
48
{
43
49
let ( mut writer, buffer) = self . project ( ) ;
44
- if let Some ( ( index , block ) ) = buffer {
50
+ if let Some ( buffer ) = buffer {
45
51
loop {
46
- let bytes = ( * * block ) . as_ref ( ) ;
47
- let written = ready ! ( writer. as_mut( ) . poll_write( cx, & bytes[ * index ..] ) ) ?;
48
- * index += written;
49
- if * index == bytes. len ( ) {
52
+ let bytes = ( * buffer . bytes ) . as_ref ( ) ;
53
+ let written = ready ! ( writer. as_mut( ) . poll_write( cx, & bytes[ buffer . offset ..] ) ) ?;
54
+ buffer . offset += written;
55
+ if buffer . offset == bytes. len ( ) {
50
56
break ;
51
57
}
52
58
}
53
- * buffer = None ;
54
59
}
60
+ * buffer = None ;
55
61
Poll :: Ready ( Ok ( ( ) ) )
56
62
}
57
63
@@ -75,7 +81,7 @@ impl<W: AsyncWrite, Item: AsRef<[u8]> + 'static> Sink<Item> for IntoSink<W> {
75
81
) -> Result < ( ) , Self :: SinkError >
76
82
{
77
83
debug_assert ! ( self . as_mut( ) . buffer( ) . is_none( ) ) ;
78
- * self . as_mut ( ) . buffer ( ) = Some ( ( 0 , Box :: new ( item) ) ) ;
84
+ * self . as_mut ( ) . buffer ( ) = Some ( Block { offset : 0 , bytes : Box :: new ( item) } ) ;
79
85
Ok ( ( ) )
80
86
}
81
87
@@ -100,16 +106,9 @@ impl<W: AsyncWrite, Item: AsRef<[u8]> + 'static> Sink<Item> for IntoSink<W> {
100
106
}
101
107
}
102
108
103
- impl < W : fmt:: Debug > fmt:: Debug for IntoSink < W > {
104
-
109
+ impl fmt:: Debug for Block {
105
110
fn fmt ( & self , f : & mut fmt:: Formatter < ' _ > ) -> fmt:: Result {
106
- let buffer = self . buffer . as_ref ( ) . map ( |( size, block) | {
107
- ( size, format ! ( "[... {} bytes ...]" , ( * * block) . as_ref( ) . len( ) ) )
108
- } ) ;
109
- f. debug_struct ( "IntoSink" )
110
- . field ( "writer" , & self . writer )
111
- . field ( "buffer" , & buffer)
112
- . finish ( ) ?;
111
+ write ! ( f, "[... {}/{} bytes ...]" , self . offset, ( * self . bytes) . as_ref( ) . len( ) ) ?;
113
112
Ok ( ( ) )
114
113
}
115
114
}
0 commit comments