Skip to content

Commit

Permalink
fix(integration/object_store): object_store requires metadata in list (
Browse files Browse the repository at this point in the history
  • Loading branch information
Xuanwo authored Jan 3, 2025
1 parent 0146a12 commit f7f9990
Show file tree
Hide file tree
Showing 2 changed files with 51 additions and 34 deletions.
73 changes: 50 additions & 23 deletions integrations/object_store/src/store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -323,32 +323,52 @@ impl ObjectStore for OpendalStore {
let offset = offset.clone();

let fut = async move {
let fut = if self.inner.info().full_capability().list_with_start_after {
self.inner
.lister_with(&path)
.start_after(offset.as_ref())
.recursive(true)
.into_future()
.into_send()
.await
.map_err(|err| format_object_store_error(err, &path))?
.then(try_format_object_meta)
.into_send()
.boxed()
let list_with_start_after = self.inner.info().full_capability().list_with_start_after;
let mut fut = self.inner.lister_with(&path).recursive(true);

// Use native start_after support if possible.
if list_with_start_after {
fut = fut.start_after(offset.as_ref());
}

let lister = fut
.await
.map_err(|err| format_object_store_error(err, &path))?
.then(move |entry| {
let path = path.clone();

async move {
let entry = entry.map_err(|err| format_object_store_error(err, &path))?;
let (path, metadata) = entry.into_parts();

// If it's a dir or last_modified is present, we can use it directly.
if metadata.is_dir() || metadata.last_modified().is_some() {
let object_meta = format_object_meta(&path, &metadata);
return Ok(object_meta);
}

let metadata = self
.inner
.stat(&path)
.await
.map_err(|err| format_object_store_error(err, &path))?;
let object_meta = format_object_meta(&path, &metadata);
Ok::<_, object_store::Error>(object_meta)
}
})
.into_send()
.boxed();

let stream = if list_with_start_after {
lister
} else {
self.inner
.lister_with(&path)
.recursive(true)
.into_future()
.into_send()
.await
.map_err(|err| format_object_store_error(err, &path))?
.try_filter(move |entry| futures::future::ready(entry.path() > offset.as_ref()))
.then(try_format_object_meta)
lister
.try_filter(move |entry| futures::future::ready(entry.location > offset))
.into_send()
.boxed()
};
Ok::<_, object_store::Error>(fut)

Ok::<_, object_store::Error>(stream)
};

fut.into_stream().into_send().try_flatten().boxed()
Expand All @@ -374,8 +394,15 @@ impl ObjectStore for OpendalStore {

if meta.is_dir() {
common_prefixes.push(entry.path().into());
} else {
} else if meta.last_modified().is_some() {
objects.push(format_object_meta(entry.path(), meta));
} else {
let meta = self
.inner
.stat(entry.path())
.await
.map_err(|err| format_object_store_error(err, entry.path()))?;
objects.push(format_object_meta(entry.path(), &meta));
}
}

Expand Down
12 changes: 1 addition & 11 deletions integrations/object_store/src/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@

use futures::Stream;
use object_store::ObjectMeta;
use opendal::{Entry, Metadata};
use opendal::Metadata;
use std::future::IntoFuture;

/// Conditionally add the `Send` marker trait for the wrapped type.
Expand Down Expand Up @@ -60,16 +60,6 @@ pub fn format_object_meta(path: &str, meta: &Metadata) -> ObjectMeta {
}
}

/// Try to format `opendal::Entry` to `object_store::ObjectMeta`.
pub async fn try_format_object_meta(
res: object_store::Result<Entry, opendal::Error>,
) -> object_store::Result<ObjectMeta> {
let entry = res.map_err(|err| format_object_store_error(err, ""))?;
let meta = entry.metadata();

Ok(format_object_meta(entry.path(), meta))
}

/// Make given future `Send`.
pub trait IntoSendFuture {
type Output;
Expand Down

0 comments on commit f7f9990

Please sign in to comment.