Commit 429ea75

itowlson committed: Fixing features, more renaming and reorganising
Signed-off-by: itowlson <[email protected]>
1 parent a276903 commit 429ea75

File tree: 9 files changed (+885 -363 lines)

crates/blobstore-azure/Cargo.toml (+1 -1)

```diff
@@ -19,7 +19,7 @@ spin-core = { path = "../core" }
 spin-factor-blobstore = { path = "../factor-blobstore" }
 tokio = { workspace = true }
 tokio-stream = "0.1.16"
-tokio-util = "0.7.12"
+tokio-util = { version = "0.7.12", features = ["compat"] }
 uuid = { version = "1.0", features = ["v4"] }
 wasmtime-wasi = { workspace = true }
```
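Why `compat`? The S3 store's removed `consume_async_impl` (later in this commit's `store.rs` diff) wraps a `futures`-flavoured `AsyncRead` with `FuturesAsyncReadCompatExt::compat()` so it can feed tokio-based consumers such as `wasmtime_wasi::pipe::AsyncReadStream`; the Azure store presumably needs the same bridge, hence the feature here. A minimal, self-contained sketch of that bridge, using a hypothetical in-memory reader rather than the actual blob body (assumes `futures`, `tokio` with `rt`/`macros`, and `tokio-util` with `compat`):

```rust
use tokio_util::compat::FuturesAsyncReadCompatExt; // requires the "compat" feature

#[tokio::main]
async fn main() -> std::io::Result<()> {
    // A futures-flavoured AsyncRead standing in for an object body.
    let futures_reader = futures::io::Cursor::new(b"hello blobstore".to_vec());

    // `.compat()` wraps it so it implements tokio's AsyncRead, which is what
    // tokio-centric consumers (e.g. wasmtime_wasi::pipe::AsyncReadStream) expect.
    let mut tokio_reader = futures_reader.compat();

    let mut buf = Vec::new();
    tokio::io::AsyncReadExt::read_to_end(&mut tokio_reader, &mut buf).await?;
    assert_eq!(buf, b"hello blobstore");
    Ok(())
}
```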

crates/blobstore-fs/Cargo.toml (+1 -1)

```diff
@@ -16,7 +16,7 @@ spin-core = { path = "../core" }
 spin-factor-blobstore = { path = "../factor-blobstore" }
 tokio = { workspace = true }
 tokio-stream = "0.1.16"
-tokio-util = "0.7.12"
+tokio-util = { version = "0.7.12", features = ["codec", "compat"] }
 walkdir = "2.5"
 wasmtime-wasi = { workspace = true }
```
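`blobstore-fs` additionally enables `codec`. Its source is not part of this excerpt, so this is an assumption: the `codec` feature is the usual way to turn a tokio `AsyncRead` (for a filesystem store, a `tokio::fs::File`) into a stream of byte chunks via `FramedRead` and `BytesCodec`. A minimal sketch under that assumption (needs tokio's `fs`, `rt` and `macros` features plus the already-listed `tokio-stream`):

```rust
use tokio_stream::StreamExt;
use tokio_util::codec::{BytesCodec, FramedRead}; // requires the "codec" feature

#[tokio::main]
async fn main() -> std::io::Result<()> {
    // Any tokio AsyncRead works; a file is the natural case for blobstore-fs.
    let file = tokio::fs::File::open("Cargo.toml").await?;

    // FramedRead + BytesCodec yields a Stream<Item = io::Result<BytesMut>>,
    // i.e. the blob body as a sequence of byte chunks.
    let mut chunks = FramedRead::new(file, BytesCodec::new());

    let mut total = 0usize;
    while let Some(chunk) = chunks.next().await {
        total += chunk?.len();
    }
    println!("read {total} bytes");
    Ok(())
}
```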

crates/blobstore-s3/Cargo.toml (+1 -1)

```diff
@@ -25,7 +25,7 @@ spin-core = { path = "../core" }
 spin-factor-blobstore = { path = "../factor-blobstore" }
 tokio = { workspace = true }
 tokio-stream = "0.1.16"
-tokio-util = "0.7.12"
+tokio-util = { version = "0.7.12", features = ["compat"] }
 uuid = { version = "1.0", features = ["v4"] }
 wasmtime-wasi = { workspace = true }
```

crates/blobstore-s3/src/lib.rs (+10 -11)

```diff
@@ -2,7 +2,7 @@ mod store;
 
 use serde::Deserialize;
 use spin_factor_blobstore::runtime_config::spin::MakeBlobStore;
-use store::BlobStoreS3;
+use store::S3ContainerManager;
 
 /// A blob store that uses a S3-compatible service as the backend.
 /// This currently supports only AWS S3
@@ -40,29 +40,28 @@ impl MakeBlobStore for S3BlobStore {
 
     type RuntimeConfig = S3BlobStoreRuntimeConfig;
 
-    type ContainerManager = BlobStoreS3;
+    type ContainerManager = S3ContainerManager;
 
     fn make_store(
         &self,
         runtime_config: Self::RuntimeConfig,
     ) -> anyhow::Result<Self::ContainerManager> {
         let auth = match (&runtime_config.access_key, &runtime_config.secret_key) {
             (Some(access_key), Some(secret_key)) => {
-                store::BlobStoreS3AuthOptions::RuntimeConfigValues(
-                    store::BlobStoreS3RuntimeConfigOptions::new(
-                        access_key.clone(),
-                        secret_key.clone(),
-                        runtime_config.token.clone(),
-                    ),
-                )
+                store::S3AuthOptions::AccessKey(store::S3KeyAuth::new(
+                    access_key.clone(),
+                    secret_key.clone(),
+                    runtime_config.token.clone(),
+                ))
             }
-            (None, None) => store::BlobStoreS3AuthOptions::Environmental,
+            (None, None) => store::S3AuthOptions::Environmental,
             _ => anyhow::bail!(
                 "either both of access_key and secret_key must be provided, or neither"
             ),
         };
 
-        let blob_store = BlobStoreS3::new(runtime_config.region, auth, runtime_config.bucket)?;
+        let blob_store =
+            S3ContainerManager::new(runtime_config.region, auth, runtime_config.bucket)?;
         Ok(blob_store)
     }
 }
```
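`S3AuthOptions` and `S3KeyAuth` are not defined in this file. Judging by the code removed from `store.rs` below (the old `BlobStoreS3AuthOptions` and `BlobStoreS3RuntimeConfigOptions`), they presumably move into the new `store/auth.rs` module largely unchanged apart from the rename. A sketch of the likely contents; the actual `auth.rs` is among the nine changed files but is not shown in this excerpt:

```rust
/// Likely shape of store/auth.rs after this commit (not shown in the diff).
#[derive(Clone, Debug)]
pub enum S3AuthOptions {
    /// Access key, secret key and optional token supplied via runtime config.
    AccessKey(S3KeyAuth),
    /// Fall back to the ambient AWS environment (e.g. environment variables).
    Environmental,
}

#[derive(Clone, Debug)]
pub struct S3KeyAuth {
    // pub(crate) so the parent `store` module can read the fields directly,
    // as the new code in store.rs does (`config.access_key`, etc.).
    pub(crate) access_key: String,
    pub(crate) secret_key: String,
    pub(crate) token: Option<String>,
}

impl S3KeyAuth {
    pub fn new(access_key: String, secret_key: String, token: Option<String>) -> Self {
        Self {
            access_key,
            secret_key,
            token,
        }
    }
}

impl aws_credential_types::provider::ProvideCredentials for S3KeyAuth {
    fn provide_credentials<'a>(
        &'a self,
    ) -> aws_credential_types::provider::future::ProvideCredentials<'a>
    where
        Self: 'a,
    {
        aws_credential_types::provider::future::ProvideCredentials::ready(Ok(
            aws_credential_types::Credentials::new(
                self.access_key.clone(),
                self.secret_key.clone(),
                self.token.clone(),
                None, // no expiry
                "spin_custom_aws_provider",
            ),
        ))
    }
}
```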

crates/blobstore-s3/src/store.rs (+27 -229)

```diff
@@ -1,16 +1,18 @@
-use aws_sdk_s3::config::http::HttpResponse as AwsHttpResponse;
-use aws_sdk_s3::error::SdkError;
-use aws_sdk_s3::operation::get_object;
-use aws_sdk_s3::operation::list_objects_v2;
-use aws_smithy_async::future::pagination_stream::PaginationStream;
 use std::sync::Arc;
-use tokio::sync::Mutex;
 
 use anyhow::Result;
 use spin_core::async_trait;
 use spin_factor_blobstore::{Container, ContainerManager, Error};
 
-pub struct BlobStoreS3 {
+mod auth;
+mod incoming_data;
+mod object_names;
+
+pub use auth::{S3AuthOptions, S3KeyAuth};
+use incoming_data::S3IncomingData;
+use object_names::S3ObjectNames;
+
+pub struct S3ContainerManager {
     builder: object_store::aws::AmazonS3Builder,
     client: async_once_cell::Lazy<
         aws_sdk_s3::Client,
```
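The rename from `BlobStoreS3` to `S3ContainerManager` comes with a module split: the types removed further down are pulled out of `store.rs` into submodules. The new files are among the nine changed in this commit but are not shown in this excerpt, so the layout below is inferred from the `mod` declarations above:

```text
crates/blobstore-s3/src/store.rs                 S3ContainerManager, S3Container
crates/blobstore-s3/src/store/auth.rs            S3AuthOptions, S3KeyAuth
crates/blobstore-s3/src/store/incoming_data.rs   S3IncomingData
crates/blobstore-s3/src/store/object_names.rs    S3ObjectNames (previously S3BlobsList)
```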
```diff
@@ -19,85 +21,32 @@ pub struct BlobStoreS3 {
     bucket: Option<String>,
 }
 
-/// AWS S3 runtime config literal options for authentication
-#[derive(Clone, Debug)]
-pub struct BlobStoreS3RuntimeConfigOptions {
-    /// The access key for the AWS S3 account role.
-    access_key: String,
-    /// The secret key for authorization on the AWS S3 account.
-    secret_key: String,
-    /// The token for authorization on the AWS S3 account.
-    token: Option<String>,
-}
-
-impl BlobStoreS3RuntimeConfigOptions {
-    pub fn new(access_key: String, secret_key: String, token: Option<String>) -> Self {
-        Self {
-            access_key,
-            secret_key,
-            token,
-        }
-    }
-}
-
-impl aws_credential_types::provider::ProvideCredentials for BlobStoreS3RuntimeConfigOptions {
-    fn provide_credentials<'a>(
-        &'a self,
-    ) -> aws_credential_types::provider::future::ProvideCredentials<'a>
-    where
-        Self: 'a,
-    {
-        aws_credential_types::provider::future::ProvideCredentials::ready(Ok(
-            aws_credential_types::Credentials::new(
-                self.access_key.clone(),
-                self.secret_key.clone(),
-                self.token.clone(),
-                None, // Optional expiration time
-                "spin_custom_aws_provider",
-            ),
-        ))
-    }
-}
-
-/// AWS S3 authentication options
-#[derive(Clone, Debug)]
-pub enum BlobStoreS3AuthOptions {
-    /// Runtime Config values indicates the account and key have been specified directly
-    RuntimeConfigValues(BlobStoreS3RuntimeConfigOptions),
-    /// Use environment variables
-    Environmental,
-}
-
-impl BlobStoreS3 {
+impl S3ContainerManager {
     pub fn new(
         region: String,
-        auth_options: BlobStoreS3AuthOptions,
+        auth_options: S3AuthOptions,
         bucket: Option<String>,
     ) -> Result<Self> {
         let builder = match &auth_options {
-            BlobStoreS3AuthOptions::RuntimeConfigValues(config) => {
-                object_store::aws::AmazonS3Builder::new()
-                    .with_region(&region)
-                    .with_access_key_id(&config.access_key)
-                    .with_secret_access_key(&config.secret_key)
-                    .with_token(config.token.clone().unwrap_or_default())
-            }
-            BlobStoreS3AuthOptions::Environmental => object_store::aws::AmazonS3Builder::from_env(),
+            S3AuthOptions::AccessKey(config) => object_store::aws::AmazonS3Builder::new()
+                .with_region(&region)
+                .with_access_key_id(&config.access_key)
+                .with_secret_access_key(&config.secret_key)
+                .with_token(config.token.clone().unwrap_or_default()),
+            S3AuthOptions::Environmental => object_store::aws::AmazonS3Builder::from_env(),
         };
 
         let region_clone = region.clone();
         let client_fut = Box::pin(async move {
             let sdk_config = match auth_options {
-                BlobStoreS3AuthOptions::RuntimeConfigValues(config) => {
-                    aws_config::SdkConfig::builder()
-                        .credentials_provider(aws_sdk_s3::config::SharedCredentialsProvider::new(
-                            config,
-                        ))
-                        .region(aws_config::Region::new(region_clone))
-                        .behavior_version(aws_config::BehaviorVersion::latest())
-                        .build()
-                }
-                BlobStoreS3AuthOptions::Environmental => {
+                S3AuthOptions::AccessKey(config) => aws_config::SdkConfig::builder()
+                    .credentials_provider(aws_sdk_s3::config::SharedCredentialsProvider::new(
+                        config,
+                    ))
+                    .region(aws_config::Region::new(region_clone))
+                    .behavior_version(aws_config::BehaviorVersion::latest())
+                    .build(),
+                S3AuthOptions::Environmental => {
                     aws_config::load_defaults(aws_config::BehaviorVersion::latest()).await
                 }
             };
@@ -113,7 +62,7 @@ impl BlobStoreS3 {
 }
 
 #[async_trait]
-impl ContainerManager for BlobStoreS3 {
+impl ContainerManager for S3ContainerManager {
     async fn get(&self, name: &str) -> Result<Arc<dyn Container>, Error> {
         let name = self.bucket.clone().unwrap_or_else(|| name.to_owned());
 
@@ -280,7 +229,7 @@ impl Container for S3Container {
             .bucket(&self.name)
             .into_paginator()
             .send();
-        Ok(Box::new(S3BlobsList::new(stm)))
+        Ok(Box::new(S3ObjectNames::new(stm)))
     }
 }
 
@@ -311,154 +260,3 @@ impl S3Container {
         Ok(())
     }
 }
-
-struct S3IncomingData {
-    get_obj_resp: Option<get_object::GetObjectOutput>,
-}
-
-impl S3IncomingData {
-    fn new(get_obj_resp: get_object::GetObjectOutput) -> Self {
-        Self {
-            get_obj_resp: Some(get_obj_resp),
-        }
-    }
-
-    fn consume_async_impl(&mut self) -> wasmtime_wasi::pipe::AsyncReadStream {
-        use futures::TryStreamExt;
-        use tokio_util::compat::FuturesAsyncReadCompatExt;
-        let stm = self.consume_as_stream();
-        let ar = stm.into_async_read();
-        let arr = ar.compat();
-        wasmtime_wasi::pipe::AsyncReadStream::new(arr)
-    }
-
-    fn consume_as_stream(
-        &mut self,
-    ) -> impl futures::stream::Stream<Item = Result<Vec<u8>, std::io::Error>> {
-        use futures::StreamExt;
-        let rr = self
-            .get_obj_resp
-            .take()
-            .expect("get object resp was already consumed");
-        let ar = rr.body.into_async_read();
-        let s = tokio_util::io::ReaderStream::new(ar);
-        s.map(|by| by.map(|b| b.to_vec()))
-    }
-}
-
-struct S3BlobsList {
-    stm: Mutex<
-        PaginationStream<
-            Result<
-                list_objects_v2::ListObjectsV2Output,
-                SdkError<list_objects_v2::ListObjectsV2Error, AwsHttpResponse>,
-            >,
-        >,
-    >,
-    read_but_not_yet_returned: Vec<String>,
-    end_stm_after_read_but_not_yet_returned: bool,
-}
-
-impl S3BlobsList {
-    fn new(
-        stm: PaginationStream<
-            Result<
-                list_objects_v2::ListObjectsV2Output,
-                SdkError<list_objects_v2::ListObjectsV2Error, AwsHttpResponse>,
-            >,
-        >,
-    ) -> Self {
-        Self {
-            stm: Mutex::new(stm),
-            read_but_not_yet_returned: Default::default(),
-            end_stm_after_read_but_not_yet_returned: false,
-        }
-    }
-
-    async fn read_impl(&mut self, len: u64) -> anyhow::Result<(Vec<String>, bool)> {
-        let len: usize = len.try_into().unwrap();
-
-        // If we have names outstanding, send that first. (We are allowed to send less than len,
-        // and so sending all pending stuff before paging, rather than trying to manage a mix of
-        // pending stuff with newly retrieved chunks, simplifies the code.)
-        if !self.read_but_not_yet_returned.is_empty() {
-            if self.read_but_not_yet_returned.len() <= len {
-                // We are allowed to send all pending names
-                let to_return = self.read_but_not_yet_returned.drain(..).collect();
-                return Ok((to_return, self.end_stm_after_read_but_not_yet_returned));
-            } else {
-                // Send as much as we can. The rest remains in the pending buffer to send,
-                // so this does not represent end of stream.
-                let to_return = self.read_but_not_yet_returned.drain(0..len).collect();
-                return Ok((to_return, false));
-            }
-        }
-
-        // Get one chunk and send as much as we can of it. Aagin, we don't need to try to
-        // pack the full length here - we can send chunk by chunk.
-
-        let Some(chunk) = self.stm.get_mut().next().await else {
-            return Ok((vec![], false));
-        };
-        let chunk = chunk.unwrap();
-
-        let at_end = chunk.continuation_token().is_none();
-        let mut names: Vec<_> = chunk
-            .contents
-            .unwrap_or_default()
-            .into_iter()
-            .flat_map(|blob| blob.key)
-            .collect();
-
-        if names.len() <= len {
-            // We can send them all!
-            Ok((names, at_end))
-        } else {
-            // We have more names than we can send in this response. Send what we can and
-            // stash the rest.
-            let to_return: Vec<_> = names.drain(0..len).collect();
-            self.read_but_not_yet_returned = names;
-            self.end_stm_after_read_but_not_yet_returned = at_end;
-            Ok((to_return, false))
-        }
-    }
-}
-
-#[async_trait]
-impl spin_factor_blobstore::IncomingData for S3IncomingData {
-    async fn consume_sync(&mut self) -> anyhow::Result<Vec<u8>> {
-        let Some(goo) = self.get_obj_resp.take() else {
-            anyhow::bail!("oh no");
-        };
-
-        Ok(goo.body.collect().await?.to_vec())
-    }
-
-    fn consume_async(&mut self) -> wasmtime_wasi::pipe::AsyncReadStream {
-        self.consume_async_impl()
-    }
-
-    async fn size(&mut self) -> anyhow::Result<u64> {
-        use anyhow::Context;
-        let goo = self.get_obj_resp.as_ref().context("resp has been taken")?;
-        Ok(goo
-            .content_length()
-            .context("content-length not returned")?
-            .try_into()?)
-    }
-}
-
-#[async_trait]
-impl spin_factor_blobstore::ObjectNames for S3BlobsList {
-    async fn read(&mut self, len: u64) -> anyhow::Result<(Vec<String>, bool)> {
-        self.read_impl(len).await // Separate function because rust-analyser gives better intellisense when async_trait isn't in the picture!
-    }
-
-    async fn skip(&mut self, num: u64) -> anyhow::Result<(u64, bool)> {
-        // TODO: there is a question (raised as an issue on the repo) about the required behaviour
-        // here. For now I assume that skipping fewer than `num` is allowed as long as we are
-        // honest about it. Because it is easier that is why.
-        let (skipped, at_end) = self.read_impl(num).await?;
-        Ok((skipped.len().try_into().unwrap(), at_end))
-    }
-}
```
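The listing logic removed above (presumably now `S3ObjectNames` in `store/object_names.rs`) follows one buffering rule: return at most `len` names per call, draining any surplus stashed from the previous page before pulling a new one, and only report end-of-stream once the stash is empty. A condensed, synchronous sketch of just that rule, with a hypothetical `next_page` closure standing in for the AWS paginator:

```rust
/// Condensed sketch of the read/skip buffering rule from the removed S3BlobsList
/// (renamed S3ObjectNames in this commit). `next_page` is a hypothetical stand-in
/// for the paginated ListObjectsV2 stream; it returns a page of names plus a
/// "this was the last page" flag.
struct NameReader<F: FnMut() -> Option<(Vec<String>, bool)>> {
    next_page: F,
    pending: Vec<String>,
    end_after_pending: bool,
}

impl<F: FnMut() -> Option<(Vec<String>, bool)>> NameReader<F> {
    /// Returns up to `len` names and whether the listing is finished.
    fn read(&mut self, len: usize) -> (Vec<String>, bool) {
        // Drain names left over from the previous page before paging again.
        if !self.pending.is_empty() {
            return if self.pending.len() <= len {
                (self.pending.drain(..).collect(), self.end_after_pending)
            } else {
                (self.pending.drain(..len).collect(), false)
            };
        }
        // Pull one page and return as much of it as allowed, stashing the rest.
        let Some((mut names, at_end)) = (self.next_page)() else {
            return (vec![], false);
        };
        if names.len() <= len {
            (names, at_end)
        } else {
            let to_return = names.drain(..len).collect();
            self.pending = names;
            self.end_after_pending = at_end;
            (to_return, false)
        }
    }

    /// `skip` is just `read` with the names discarded, as in the original.
    fn skip(&mut self, num: usize) -> (usize, bool) {
        let (skipped, at_end) = self.read(num);
        (skipped.len(), at_end)
    }
}

fn main() {
    // One page of three names; the page is marked as the final one.
    let mut pages: Vec<Option<(Vec<String>, bool)>> =
        vec![Some((vec!["a".into(), "b".into(), "c".into()], true))];
    let mut reader = NameReader {
        next_page: move || pages.pop().flatten(),
        pending: Vec::new(),
        end_after_pending: false,
    };
    // First call returns two names and stashes "c"; end is not yet reported.
    assert_eq!(reader.read(2), (vec!["a".to_string(), "b".to_string()], false));
    // Second call drains the stash and reports end-of-stream.
    assert_eq!(reader.read(2), (vec!["c".to_string()], true));
}
```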
