Skip to content

Commit fbffdf6

Browse files
committed
Use Sink and Stream instead of manual buffering. Moar formatting
Signed-off-by: itowlson <[email protected]>
1 parent 4ee320f commit fbffdf6

File tree

4 files changed

+23
-33
lines changed

4 files changed

+23
-33
lines changed

crates/blobstore-fs/src/lib.rs

+8-14
Original file line numberDiff line numberDiff line change
@@ -206,23 +206,17 @@ impl spin_factor_blobstore::Container for FileSystemContainer {
206206

207207
impl FileSystemContainer {
208208
async fn write_data_core(
209-
mut data: tokio::io::ReadHalf<tokio::io::SimplexStream>,
210-
mut file: tokio::fs::File,
209+
data: tokio::io::ReadHalf<tokio::io::SimplexStream>,
210+
file: tokio::fs::File,
211211
) -> anyhow::Result<()> {
212-
use tokio::io::AsyncReadExt;
213-
use tokio::io::AsyncWriteExt;
212+
use futures::SinkExt;
213+
use tokio_util::codec::{BytesCodec, FramedWrite};
214214

215-
const BUF_SIZE: usize = 8192;
215+
// Ceremonies to turn `file` and `data` into Sink and Stream
216+
let mut file_sink = FramedWrite::new(file, BytesCodec::new());
217+
let mut data_stm = tokio_util::io::ReaderStream::new(data);
216218

217-
loop {
218-
let mut buf = vec![0; BUF_SIZE];
219-
let count = data.read(&mut buf).await?;
220-
if count == 0 {
221-
_ = file.flush().await;
222-
break;
223-
}
224-
file.write_all(&buf[0..count]).await?;
225-
}
219+
file_sink.send_all(&mut data_stm).await?;
226220

227221
Ok(())
228222
}

crates/blobstore-s3/src/store.rs

+3-1
Original file line numberDiff line numberDiff line change
@@ -292,11 +292,13 @@ impl S3Container {
292292
) -> anyhow::Result<()> {
293293
use object_store::ObjectStore;
294294

295+
const BUF_SIZE: usize = 5 * 1024 * 1024;
296+
295297
let mupload = store.put_multipart(&path).await?;
296298
let mut writer = object_store::WriteMultipart::new(mupload);
297299
loop {
298300
use tokio::io::AsyncReadExt;
299-
let mut buf = vec![0; 5 * 1024 * 1024];
301+
let mut buf = vec![0; BUF_SIZE];
300302
let read_amount = data.read(&mut buf).await?;
301303
if read_amount == 0 {
302304
break;

crates/factor-blobstore/src/host/object_names.rs

+1-4
Original file line numberDiff line numberDiff line change
@@ -28,10 +28,7 @@ impl HostStreamObjectNames for BlobStoreDispatch<'_> {
2828
object_names.skip(num).await.map_err(|e| e.to_string())
2929
}
3030

31-
async fn drop(
32-
&mut self,
33-
rep: Resource<StreamObjectNames>,
34-
) -> anyhow::Result<()> {
31+
async fn drop(&mut self, rep: Resource<StreamObjectNames>) -> anyhow::Result<()> {
3532
self.object_names.write().await.remove(rep.rep());
3633
Ok(())
3734
}

crates/factor-blobstore/src/lib.rs

+11-14
Original file line numberDiff line numberDiff line change
@@ -1,22 +1,22 @@
11
//! Example usage:
2-
//!
2+
//!
33
//! --------------------
4-
//!
4+
//!
55
//! spin.toml:
6-
//!
6+
//!
77
//! [component.foo]
88
//! blob_containers = ["default"]
99
//!
1010
//! --------------------
11-
//!
11+
//!
1212
//! runtime-config.toml
13-
//!
13+
//!
1414
//! [blob_store.default]
1515
//! type = "file_system" | "s3" | "azure_blob"
1616
//! # further config settings per type
17-
//!
17+
//!
1818
//! --------------------
19-
//!
19+
//!
2020
//! TODO: the naming here is not very consistent and we should make a more conscious
2121
//! decision about whether these things are "blob stores" or "containers" or what
2222
@@ -35,10 +35,7 @@ use spin_factors::{ConfigureAppContext, Factor, InitContext, PrepareContext, Run
3535
use spin_locked_app::MetadataKey;
3636
use spin_resource_table::Table;
3737

38-
pub use host::{
39-
BlobStoreDispatch, Container, ContainerManager, Error, IncomingData,
40-
ObjectNames,
41-
};
38+
pub use host::{BlobStoreDispatch, Container, ContainerManager, Error, IncomingData, ObjectNames};
4239
pub use runtime_config::RuntimeConfig;
4340
pub use spin_world::wasi::blobstore::types::{ContainerMetadata, ObjectMetadata};
4441
pub use stream::AsyncWriteStream;
@@ -170,15 +167,15 @@ pub struct InstanceBuilder {
170167
/// There are multiple WASI interfaces in play here. The factor adds each of them
171168
/// to the linker, passing a closure that derives the interface implementation
172169
/// from the InstanceBuilder.
173-
///
170+
///
174171
/// For the different interfaces to agree on their resource tables, each closure
175172
/// needs to derive the same resource table from the InstanceBuilder.
176173
/// The only* way that works is for the InstanceBuilder to set up all
177174
/// the resource tables, and Arc-RwLock them so that each clone gets
178175
/// the same one.
179-
///
176+
///
180177
/// * TODO: for 'only', read 'or maybe we can do some shenanigans with borrowing
181-
/// from the InstanceBuilder/instance state'
178+
/// from the InstanceBuilder/instance state'
182179
containers: Arc<RwLock<Table<Arc<dyn Container>>>>,
183180
incoming_values: Arc<RwLock<Table<Box<dyn IncomingData>>>>,
184181
outgoing_values: Arc<RwLock<Table<host::OutgoingValue>>>,

0 commit comments

Comments
 (0)