Skip to content

Commit bf909cd

Browse files
committed
Add per bucket cache for ObjectStore
1 parent 5c64663 commit bf909cd

File tree

4 files changed

+152
-29
lines changed

4 files changed

+152
-29
lines changed

Cargo.toml

+1
Original file line numberDiff line numberDiff line change
@@ -59,6 +59,7 @@ log = "0.4.22"
5959
mime = "0.3.17"
6060
mockito = "1.5"
6161
object_store = "0.11.0"
62+
once_cell = "1.20.2"
6263
openssl = { version = "0.10.68", features = ["vendored"] }
6364
openssl-src = "=300.4.1" # joinked from https://github.com/iopsystems/rpc-perf/commit/705b290d2105af6f33150da04b217422c6d68701#diff-2e9d962a08321605940b5a657135052fbcef87b5e360662bb527c96d9a615542R41 to cross-compile Python
6465
parquet = { version = "52.2", default-features = false }

crates/core/Cargo.toml

+2-1
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,7 @@ geoparquet-compression = [
3131
"parquet/lz4",
3232
"parquet/zstd",
3333
]
34-
object-store = ["dep:object_store", "dep:tokio"]
34+
object-store = ["dep:object_store", "dep:tokio", "dep:once_cell"]
3535
object-store-aws = ["object-store", "object_store/aws"]
3636
object-store-azure = ["object-store", "object_store/azure"]
3737
object-store-gcp = ["object-store", "object_store/gcp"]
@@ -62,6 +62,7 @@ geojson.workspace = true
6262
jsonschema = { workspace = true, optional = true }
6363
log.workspace = true
6464
object_store = { workspace = true, optional = true }
65+
once_cell = { workspace = true, optional = true }
6566
parquet = { workspace = true, optional = true }
6667
reqwest = { workspace = true, features = ["json", "blocking"], optional = true }
6768
serde = { workspace = true, features = ["derive"] }

crates/core/src/format.rs

+1-3
Original file line numberDiff line numberDiff line change
@@ -157,9 +157,7 @@ impl Format {
157157
let href = href.into();
158158
match href.realize() {
159159
RealizedHref::Url(url) => {
160-
use object_store::ObjectStore;
161-
162-
let (object_store, path) = crate::parse_url_opts(&url, options)?;
160+
let (object_store, path) = crate::parse_url_opts(&url, options).await?;
163161
let get_result = object_store.get(&path).await?;
164162
let mut value: T = self.from_bytes(get_result.bytes().await?)?;
165163
*value.self_href_mut() = Some(Href::Url(url));

crates/core/src/object_store.rs

+148-25
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,45 @@
1-
use object_store::{
2-
local::LocalFileSystem, memory::InMemory, path::Path, DynObjectStore, ObjectStoreScheme,
3-
};
1+
use std::{collections::HashMap, sync::Arc};
2+
3+
use object_store::{local::LocalFileSystem, path::Path, DynObjectStore, ObjectStoreScheme};
4+
use once_cell::sync::Lazy;
5+
use tokio::sync::RwLock;
46
use url::Url;
57

6-
#[cfg(feature = "object-store")]
8+
static OBJECT_STORE_CACHE: Lazy<RwLock<HashMap<ObjectStoreIdentifier, Arc<DynObjectStore>>>> =
9+
Lazy::new(Default::default);
10+
11+
/// Parameter set used to identify and cache an object store.
12+
#[derive(PartialEq, Eq, Hash)]
13+
struct ObjectStoreIdentifier {
14+
/// A base url to the bucket.
15+
// should be enough to identify cloud provider and bucket
16+
base_url: Url,
17+
18+
/// Object Store options
19+
options: Vec<(String, String)>,
20+
}
21+
22+
impl ObjectStoreIdentifier {
23+
fn new<I, K, V>(base_url: Url, options: I) -> Self
24+
where
25+
I: IntoIterator<Item = (K, V)>,
26+
K: AsRef<str>,
27+
V: Into<String>,
28+
{
29+
Self {
30+
base_url,
31+
options: options
32+
.into_iter()
33+
.map(|(k, v)| (k.as_ref().into(), v.into()))
34+
.collect(),
35+
}
36+
}
37+
38+
fn get_options(&self) -> Vec<(String, String)> {
39+
self.options.to_owned()
40+
}
41+
}
42+
743
macro_rules! builder_env_opts {
844
($builder:ty, $url:expr, $options:expr) => {{
945
let builder = $options.into_iter().fold(
@@ -13,54 +49,45 @@ macro_rules! builder_env_opts {
1349
Err(_) => builder,
1450
},
1551
);
16-
Box::new(builder.build()?)
52+
Arc::new(builder.build()?)
1753
}};
1854
}
1955

20-
/// Modified version of object_store::parse_url_opts that also parses env
21-
///
22-
/// It does the same, except we start from env vars, then apply url and then overrides from options
23-
///
24-
/// This is POC. To improve on this idea, maybe it's good to "cache" a box with dynamic ObjectStore for each bucket we access, since ObjectStore have some logic inside tied to a bucket level, like connection pooling, credential caching
25-
pub fn parse_url_opts<I, K, V>(
56+
fn create_object_store<I, K, V>(
57+
scheme: ObjectStoreScheme,
2658
url: &Url,
2759
options: I,
28-
) -> Result<(Box<DynObjectStore>, Path), object_store::Error>
60+
) -> Result<Arc<DynObjectStore>, object_store::Error>
2961
where
3062
I: IntoIterator<Item = (K, V)>,
3163
K: AsRef<str>,
3264
V: Into<String>,
3365
{
34-
let _options = options;
35-
let (scheme, path) = ObjectStoreScheme::parse(url)?;
36-
let path = Path::parse(path)?;
37-
38-
let store: Box<DynObjectStore> = match scheme {
39-
ObjectStoreScheme::Local => Box::new(LocalFileSystem::new()),
40-
ObjectStoreScheme::Memory => Box::new(InMemory::new()),
66+
let store: Arc<DynObjectStore> = match scheme {
67+
ObjectStoreScheme::Local => Arc::new(LocalFileSystem::new()),
4168
#[cfg(feature = "object-store-aws")]
4269
ObjectStoreScheme::AmazonS3 => {
43-
builder_env_opts!(object_store::aws::AmazonS3Builder, url, _options)
70+
builder_env_opts!(object_store::aws::AmazonS3Builder, url, options)
4471
}
4572
#[cfg(feature = "object-store-gcp")]
4673
ObjectStoreScheme::GoogleCloudStorage => {
47-
builder_env_opts!(object_store::gcp::GoogleCloudStorageBuilder, url, _options)
74+
builder_env_opts!(object_store::gcp::GoogleCloudStorageBuilder, url, options)
4875
}
4976
#[cfg(feature = "object-store-azure")]
5077
ObjectStoreScheme::MicrosoftAzure => {
51-
builder_env_opts!(object_store::azure::MicrosoftAzureBuilder, url, _options)
78+
builder_env_opts!(object_store::azure::MicrosoftAzureBuilder, url, options)
5279
}
5380
#[cfg(feature = "object-store-http")]
5481
ObjectStoreScheme::Http => {
5582
let url = &url[..url::Position::BeforePath];
56-
let builder = _options.into_iter().fold(
83+
let builder = options.into_iter().fold(
5784
object_store::http::HttpBuilder::new().with_url(url.to_string()),
5885
|builder, (key, value)| match key.as_ref().parse() {
5986
Ok(k) => builder.with_config(k, value),
6087
Err(_) => builder,
6188
},
6289
);
63-
Box::new(builder.build()?)
90+
Arc::new(builder.build()?)
6491
}
6592
s => {
6693
return Err(object_store::Error::Generic {
@@ -69,5 +96,101 @@ where
6996
})
7097
}
7198
};
72-
Ok((store, path))
99+
Ok(store)
100+
}
101+
102+
/// Modified version of object_store::parse_url_opts that also reads configuration from environment variables
103+
///
104+
/// It does the same, except we start from env vars, then apply url and then overrides from options
105+
///
106+
/// This is a POC. Created stores are cached per base URL and option set, since an ObjectStore has some logic inside tied to the bucket level, like connection pooling and credential caching
107+
pub async fn parse_url_opts<I, K, V>(
108+
url: &Url,
109+
options: I,
110+
) -> Result<(Arc<DynObjectStore>, Path), crate::Error>
111+
where
112+
I: IntoIterator<Item = (K, V)>,
113+
K: AsRef<str>,
114+
V: Into<String>,
115+
{
116+
// TODO: Handle error properly
117+
let (scheme, path) = ObjectStoreScheme::parse(url).unwrap();
118+
119+
let path_string: String = path.clone().into();
120+
let path_str = path_string.as_str();
121+
// TODO: Handle error properly
122+
let base_url = url[..]
123+
.strip_suffix(path_str)
124+
.unwrap_or_default()
125+
.try_into()
126+
.unwrap();
127+
128+
let object_store_id = ObjectStoreIdentifier::new(base_url, options);
129+
let options = object_store_id.get_options();
130+
131+
{
132+
let cache = OBJECT_STORE_CACHE.read().await;
133+
if let Some(store) = cache.get(&object_store_id) {
134+
return Ok((store.clone(), path));
135+
}
136+
}
137+
138+
let store = create_object_store(scheme, url, options)?;
139+
{
140+
let mut cache = OBJECT_STORE_CACHE.write().await;
141+
142+
// TODO: Do we need this cache clean? What is a reasonable cache size here?
143+
if cache.len() >= 8 {
144+
cache.clear()
145+
}
146+
_ = cache.insert(object_store_id, store.clone());
147+
}
148+
149+
Ok((store.clone(), path))
150+
}
151+
152+
#[cfg(test)]
153+
mod tests {
154+
use url::Url;
155+
156+
use super::*;
157+
158+
#[tokio::test]
159+
async fn cache_works() {
160+
let url = Url::parse("s3://bucket/item").unwrap();
161+
let options: Vec<(String, String)> = Vec::new();
162+
163+
let (store1, _path) = parse_url_opts(&url, options.clone()).await.unwrap();
164+
165+
let url2 = Url::parse("s3://bucket/item2").unwrap();
166+
let (store2, _path) = parse_url_opts(&url2, options.clone()).await.unwrap();
167+
168+
assert!(Arc::ptr_eq(&store1, &store2));
169+
}
170+
#[tokio::test]
171+
async fn different_options() {
172+
let url = Url::parse("s3://bucket/item").unwrap();
173+
let options: Vec<(String, String)> = Vec::new();
174+
175+
let (store, _path) = parse_url_opts(&url, options).await.unwrap();
176+
177+
let url2 = Url::parse("s3://bucket/item2").unwrap();
178+
let options2: Vec<(String, String)> = vec![(String::from("some"), String::from("option"))];
179+
let (store2, _path) = parse_url_opts(&url2, options2).await.unwrap();
180+
181+
assert!(!Arc::ptr_eq(&store, &store2));
182+
}
183+
#[tokio::test]
184+
async fn different_urls() {
185+
let url = Url::parse("s3://bucket/item").unwrap();
186+
let options: Vec<(String, String)> = Vec::new();
187+
188+
let (store, _path) = parse_url_opts(&url, options.clone()).await.unwrap();
189+
190+
let url2 = Url::parse("s3://other-bucket/item").unwrap();
191+
// let options2: Vec<(String, String)> = vec![(String::from("some"), String::from("option"))];
192+
let (store2, _path) = parse_url_opts(&url2, options).await.unwrap();
193+
194+
assert!(!Arc::ptr_eq(&store, &store2));
195+
}
73196
}

0 commit comments

Comments
 (0)