Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(integration/object_store): object_store requires metadata in list #5501

Merged
merged 2 commits into from
Jan 3, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
73 changes: 50 additions & 23 deletions integrations/object_store/src/store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -323,32 +323,52 @@ impl ObjectStore for OpendalStore {
let offset = offset.clone();

let fut = async move {
let fut = if self.inner.info().full_capability().list_with_start_after {
self.inner
.lister_with(&path)
.start_after(offset.as_ref())
.recursive(true)
.into_future()
.into_send()
.await
.map_err(|err| format_object_store_error(err, &path))?
.then(try_format_object_meta)
.into_send()
.boxed()
let list_with_start_after = self.inner.info().full_capability().list_with_start_after;
let mut fut = self.inner.lister_with(&path).recursive(true);

// Use native start_after support if possible.
if list_with_start_after {
fut = fut.start_after(offset.as_ref());
}

let lister = fut
.await
.map_err(|err| format_object_store_error(err, &path))?
.then(move |entry| {
let path = path.clone();

async move {
let entry = entry.map_err(|err| format_object_store_error(err, &path))?;
let (path, metadata) = entry.into_parts();

// If it's a dir or last_modified is present, we can use it directly.
if metadata.is_dir() || metadata.last_modified().is_some() {
let object_meta = format_object_meta(&path, &metadata);
return Ok(object_meta);
}

let metadata = self
.inner
.stat(&path)
.await
.map_err(|err| format_object_store_error(err, &path))?;
let object_meta = format_object_meta(&path, &metadata);
Ok::<_, object_store::Error>(object_meta)
}
})
.into_send()
.boxed();

let stream = if list_with_start_after {
lister
} else {
self.inner
.lister_with(&path)
.recursive(true)
.into_future()
.into_send()
.await
.map_err(|err| format_object_store_error(err, &path))?
.try_filter(move |entry| futures::future::ready(entry.path() > offset.as_ref()))
.then(try_format_object_meta)
lister
.try_filter(move |entry| futures::future::ready(entry.location > offset))
.into_send()
.boxed()
};
Ok::<_, object_store::Error>(fut)

Ok::<_, object_store::Error>(stream)
};

fut.into_stream().into_send().try_flatten().boxed()
Expand All @@ -374,8 +394,15 @@ impl ObjectStore for OpendalStore {

if meta.is_dir() {
common_prefixes.push(entry.path().into());
} else {
} else if meta.last_modified().is_some() {
objects.push(format_object_meta(entry.path(), meta));
} else {
let meta = self
.inner
.stat(entry.path())
.await
.map_err(|err| format_object_store_error(err, entry.path()))?;
objects.push(format_object_meta(entry.path(), &meta));
}
}

Expand Down
12 changes: 1 addition & 11 deletions integrations/object_store/src/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@

use futures::Stream;
use object_store::ObjectMeta;
use opendal::{Entry, Metadata};
use opendal::Metadata;
use std::future::IntoFuture;

/// Conditionally add the `Send` marker trait for the wrapped type.
Expand Down Expand Up @@ -60,16 +60,6 @@ pub fn format_object_meta(path: &str, meta: &Metadata) -> ObjectMeta {
}
}

/// Try to format `opendal::Entry` to `object_store::ObjectMeta`.
pub async fn try_format_object_meta(
res: object_store::Result<Entry, opendal::Error>,
) -> object_store::Result<ObjectMeta> {
let entry = res.map_err(|err| format_object_store_error(err, ""))?;
let meta = entry.metadata();

Ok(format_object_meta(entry.path(), meta))
}

/// Make given future `Send`.
pub trait IntoSendFuture {
type Output;
Expand Down
Loading