Skip to content

Commit 036c841

Browse files
wip
1 parent f05ec8c commit 036c841

File tree

5 files changed

+96
-14
lines changed

5 files changed

+96
-14
lines changed

Cargo.lock

+63
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

+1
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@ rust-version = "1.63"
1414
[dependencies]
1515
abao = { version = "0.2.0", features = ["group_size_16k", "tokio_io"], default-features = false }
1616
anyhow = { version = "1", features = ["backtrace"] }
17+
async-compression = { version = "0.3.15", features = ["tokio", "zstd"] }
1718
base64 = "0.21.0"
1819
blake3 = "1.3.3"
1920
bytes = "1"

src/get.rs

+20-12
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@ use anyhow::{anyhow, bail, Context, Result};
1919
use bytes::BytesMut;
2020
use futures::Future;
2121
use postcard::experimental::max_size::MaxSize;
22-
use tokio::io::{AsyncRead, AsyncReadExt, ReadBuf};
22+
use tokio::io::{AsyncRead, AsyncReadExt, BufReader, ReadBuf};
2323
use tracing::{debug, error};
2424

2525
pub use crate::util::Hash;
@@ -103,18 +103,21 @@ impl Stats {
103103
/// We guarantee that the data is correct by incrementally verifying a hash
104104
#[repr(transparent)]
105105
#[derive(Debug)]
106-
pub struct DataStream(AsyncSliceDecoder<quinn::RecvStream>);
106+
pub struct DataStream(AsyncSliceDecoder<RecvStream>);
107+
108+
type RecvStream =
109+
async_compression::tokio::bufread::ZstdDecoder<tokio::io::BufReader<quinn::RecvStream>>;
107110

108111
impl DataStream {
109-
fn new(inner: quinn::RecvStream, hash: Hash) -> Self {
112+
fn new(inner: RecvStream, hash: Hash) -> Self {
110113
DataStream(AsyncSliceDecoder::new(inner, &hash.into(), 0, u64::MAX))
111114
}
112115

113116
async fn read_size(&mut self) -> io::Result<u64> {
114117
self.0.read_size().await
115118
}
116119

117-
fn into_inner(self) -> quinn::RecvStream {
120+
fn into_inner(self) -> RecvStream {
118121
self.0.into_inner()
119122
}
120123
}
@@ -149,7 +152,7 @@ where
149152
let now = Instant::now();
150153
let connection = setup(opts).await?;
151154

152-
let (mut writer, mut reader) = connection.open_bi().await?;
155+
let (mut writer, reader) = connection.open_bi().await?;
153156

154157
on_connected().await?;
155158

@@ -181,6 +184,7 @@ where
181184
{
182185
debug!("reading response");
183186
let mut in_buffer = BytesMut::with_capacity(1024);
187+
let mut reader = BufReader::new(reader);
184188

185189
// track total amount of blob data transferred
186190
let mut data_len = 0;
@@ -218,7 +222,7 @@ where
218222
if blob_reader.read_exact(&mut [0u8; 1]).await.is_ok() {
219223
bail!("`on_blob` callback did not fully read the blob content")
220224
}
221-
reader = blob_reader.into_inner();
225+
reader = blob_reader.into_inner().into_inner();
222226
}
223227
}
224228

@@ -236,11 +240,12 @@ where
236240
}
237241

238242
// Shut down the stream
239-
if let Some(chunk) = reader.read_chunk(8, false).await? {
240-
reader.stop(0u8.into()).ok();
241-
error!("Received unexpected data from the provider: {chunk:?}");
243+
if let Ok(bytes) = reader.read_u8().await {
244+
reader.into_inner().stop(0u8.into()).ok();
245+
error!("Received unexpected data from the provider: {bytes:?}");
246+
} else {
247+
drop(reader);
242248
}
243-
drop(reader);
244249

245250
let elapsed = now.elapsed();
246251

@@ -261,7 +266,7 @@ where
261266
/// The `AsyncReader` can be used to read the content.
262267
async fn handle_blob_response(
263268
hash: Hash,
264-
mut reader: quinn::RecvStream,
269+
mut reader: BufReader<quinn::RecvStream>,
265270
buffer: &mut BytesMut,
266271
) -> Result<DataStream> {
267272
match read_lp(&mut reader, buffer).await? {
@@ -277,7 +282,10 @@ async fn handle_blob_response(
277282
// next blob in collection will be sent over
278283
Res::Found => {
279284
assert!(buffer.is_empty());
280-
let decoder = DataStream::new(reader, hash);
285+
// Decompress data
286+
let decompress_reader =
287+
async_compression::tokio::bufread::ZstdDecoder::new(reader);
288+
let decoder = DataStream::new(decompress_reader, hash);
281289
Ok(decoder)
282290
}
283291
}

src/lib.rs

+1
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,7 @@ mod tests {
4141

4242
#[tokio::test]
4343
async fn basics() -> Result<()> {
44+
setup_logging();
4445
transfer_data(vec![("hello_world", "hello world!".as_bytes().to_vec())]).await
4546
}
4647

src/provider/mod.rs

+11-2
Original file line numberDiff line numberDiff line change
@@ -837,18 +837,26 @@ async fn send_blob<W: AsyncWrite + Unpin + Send + 'static>(
837837
// need to thread the writer though the spawn_blocking, since
838838
// taking a reference does not work. spawn_blocking requires
839839
// 'static lifetime.
840+
840841
writer = tokio::task::spawn_blocking(move || {
842+
// Compress data
843+
let mut compressed_writer =
844+
async_compression::tokio::write::ZstdEncoder::with_quality(
845+
writer,
846+
async_compression::Level::Fastest,
847+
);
848+
841849
let file_reader = std::fs::File::open(&path)?;
842850
let outboard_reader = std::io::Cursor::new(outboard);
843-
let mut wrapper = SyncIoBridge::new(&mut writer);
851+
let mut wrapper = SyncIoBridge::new(&mut compressed_writer);
844852
let mut slice_extractor = abao::encode::SliceExtractor::new_outboard(
845853
file_reader,
846854
outboard_reader,
847855
0,
848856
size,
849857
);
850858
let _copied = std::io::copy(&mut slice_extractor, &mut wrapper)?;
851-
std::io::Result::Ok(writer)
859+
std::io::Result::Ok(compressed_writer.into_inner())
852860
})
853861
.await??;
854862

@@ -1069,6 +1077,7 @@ async fn write_response<W: AsyncWrite + Unpin>(
10691077
}
10701078
let used = postcard::to_slice(&response, buffer)?;
10711079

1080+
// Write lp
10721081
write_lp(&mut writer, used).await?;
10731082

10741083
debug!("written response of length {}", used.len());

0 commit comments

Comments
 (0)