Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(core): Implement list with deleted for s3 service #5498

Merged
merged 4 commits into from
Jan 2, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 20 additions & 0 deletions core/src/raw/ops.rs
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,14 @@ pub struct OpList {
///
/// Default to `false`
versions: bool,
/// The deleted is used to control whether the deleted objects should be returned.
///
/// - If `false`, list operation will not return with deleted objects
/// - If `true`, list operation will return with deleted objects if object versioning is supported
/// by the underlying service
///
/// Default to `false`
deleted: bool,
}

impl Default for OpList {
Expand All @@ -122,6 +130,7 @@ impl Default for OpList {
recursive: false,
concurrent: 1,
versions: false,
deleted: false,
}
}
}
Expand Down Expand Up @@ -206,6 +215,17 @@ impl OpList {
pub fn versions(&self) -> bool {
self.versions
}

/// Change the deleted of this list operation
pub fn with_deleted(mut self, deleted: bool) -> Self {
self.deleted = deleted;
self
}

/// Get the deleted of this list operation
pub fn deleted(&self) -> bool {
self.deleted
}
}

/// Args for `presign` operation.
Expand Down
11 changes: 4 additions & 7 deletions core/src/services/s3/backend.rs
Original file line number Diff line number Diff line change
Expand Up @@ -983,6 +983,7 @@ impl Access for S3Backend {
list_with_start_after: true,
list_with_recursive: true,
list_with_versions: self.core.enable_versioning,
list_with_deleted: self.core.enable_versioning,
list_has_etag: true,
list_has_content_md5: true,
list_has_content_length: true,
Expand Down Expand Up @@ -1060,21 +1061,17 @@ impl Access for S3Backend {
}

async fn list(&self, path: &str, args: OpList) -> Result<(RpList, Self::Lister)> {
let l = if args.versions() {
let l = if args.versions() || args.deleted() {
TwoWays::Two(PageLister::new(S3ObjectVersionsLister::new(
self.core.clone(),
path,
args.recursive(),
args.limit(),
args.start_after(),
args,
)))
} else {
TwoWays::One(PageLister::new(S3Lister::new(
self.core.clone(),
path,
args.recursive(),
args.limit(),
args.start_after(),
args,
)))
};

Expand Down
31 changes: 30 additions & 1 deletion core/src/services/s3/core.rs
Original file line number Diff line number Diff line change
Expand Up @@ -612,7 +612,6 @@ impl S3Core {
write!(url, "&max-keys={limit}").expect("write into string must succeed");
}
if let Some(start_after) = start_after {
let start_after = build_abs_path(&self.root, &start_after);
write!(url, "&start-after={}", percent_encode_path(&start_after))
.expect("write into string must succeed");
}
Expand Down Expand Up @@ -994,6 +993,7 @@ pub struct ListObjectVersionsOutput {
pub next_version_id_marker: Option<String>,
pub common_prefixes: Vec<OutputCommonPrefix>,
pub version: Vec<ListObjectVersionsOutputVersion>,
pub delete_marker: Vec<ListObjectVersionsOutputDeleteMarker>,
}

#[derive(Default, Debug, Eq, PartialEq, Deserialize)]
Expand All @@ -1008,6 +1008,15 @@ pub struct ListObjectVersionsOutputVersion {
pub etag: Option<String>,
}

#[derive(Default, Debug, Eq, PartialEq, Deserialize)]
#[serde(rename_all = "PascalCase")]
pub struct ListObjectVersionsOutputDeleteMarker {
pub key: String,
pub version_id: String,
pub is_latest: bool,
pub last_modified: String,
}

pub enum ChecksumAlgorithm {
Crc32c,
}
Expand Down Expand Up @@ -1284,6 +1293,16 @@ mod tests {
<CommonPrefixes>
<Prefix>videos/</Prefix>
</CommonPrefixes>
<DeleteMarker>
<Key>my-third-image.jpg</Key>
<VersionId>03jpff543dhffds434rfdsFDN943fdsFkdmqnh892</VersionId>
<IsLatest>true</IsLatest>
<LastModified>2009-10-15T17:50:30.000Z</LastModified>
<Owner>
<ID>75aa57f09aa0c8caeab4f8c24e99d10f8e7faeebf76c078efc7c6caea54ba06a</ID>
<DisplayName>[email protected]</DisplayName>
</Owner>
</DeleteMarker>
</ListVersionsResult>"#,
);

Expand Down Expand Up @@ -1329,5 +1348,15 @@ mod tests {
}
]
);

assert_eq!(
output.delete_marker,
vec![ListObjectVersionsOutputDeleteMarker {
key: "my-third-image.jpg".to_owned(),
version_id: "03jpff543dhffds434rfdsFDN943fdsFkdmqnh892".to_owned(),
is_latest: true,
last_modified: "2009-10-15T17:50:30.000Z".to_owned(),
},]
);
}
}
117 changes: 65 additions & 52 deletions core/src/services/s3/lister.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,29 +35,26 @@ pub struct S3Lister {
core: Arc<S3Core>,

path: String,
delimiter: &'static str,
limit: Option<usize>,
args: OpList,

/// Amazon S3 starts listing **after** this specified key
start_after: Option<String>,
delimiter: &'static str,
abs_start_after: Option<String>,
}

impl S3Lister {
pub fn new(
core: Arc<S3Core>,
path: &str,
recursive: bool,
limit: Option<usize>,
start_after: Option<&str>,
) -> Self {
let delimiter = if recursive { "" } else { "/" };
pub fn new(core: Arc<S3Core>, path: &str, args: OpList) -> Self {
let delimiter = if args.recursive() { "" } else { "/" };
let abs_start_after = args
.start_after()
.map(|start_after| build_abs_path(&core.root, start_after));

Self {
core,

path: path.to_string(),
args,
delimiter,
limit,
start_after: start_after.map(String::from),
abs_start_after,
}
}
}
Expand All @@ -70,10 +67,10 @@ impl oio::PageList for S3Lister {
&self.path,
&ctx.token,
self.delimiter,
self.limit,
self.args.limit(),
// start after should only be set for the first page.
if ctx.token.is_empty() {
self.start_after.clone()
self.abs_start_after.clone()
} else {
None
},
Expand Down Expand Up @@ -143,35 +140,29 @@ impl oio::PageList for S3Lister {
}
}

// refer: https://docs.aws.amazon.com/AmazonS3/latest/API/API_ListObjectVersions.html
/// refer: https://docs.aws.amazon.com/AmazonS3/latest/API/API_ListObjectVersions.html
pub struct S3ObjectVersionsLister {
core: Arc<S3Core>,

prefix: String,
args: OpList,

delimiter: &'static str,
limit: Option<usize>,
start_after: String,
abs_start_after: String,
abs_start_after: Option<String>,
}

impl S3ObjectVersionsLister {
pub fn new(
core: Arc<S3Core>,
path: &str,
recursive: bool,
limit: Option<usize>,
start_after: Option<&str>,
) -> Self {
let delimiter = if recursive { "" } else { "/" };
let start_after = start_after.unwrap_or_default().to_owned();
let abs_start_after = build_abs_path(core.root.as_str(), start_after.as_str());
pub fn new(core: Arc<S3Core>, path: &str, args: OpList) -> Self {
let delimiter = if args.recursive() { "" } else { "/" };
let abs_start_after = args
.start_after()
.map(|start_after| build_abs_path(&core.root, start_after));

Self {
core,
prefix: path.to_string(),
args,
delimiter,
limit,
start_after,
abs_start_after,
}
}
Expand All @@ -182,8 +173,8 @@ impl oio::PageList for S3ObjectVersionsLister {
let markers = ctx.token.rsplit_once(" ");
let (key_marker, version_id_marker) = if let Some(data) = markers {
data
} else if !self.start_after.is_empty() {
(self.abs_start_after.as_str(), "")
} else if let Some(start_after) = &self.abs_start_after {
(start_after.as_str(), "")
} else {
("", "")
};
Expand All @@ -193,7 +184,7 @@ impl oio::PageList for S3ObjectVersionsLister {
.s3_list_object_versions(
&self.prefix,
self.delimiter,
self.limit,
self.args.limit(),
key_marker,
version_id_marker,
)
Expand Down Expand Up @@ -231,26 +222,48 @@ impl oio::PageList for S3ObjectVersionsLister {
ctx.entries.push_back(de);
}

for version_object in output.version {
let mut path = build_rel_path(&self.core.root, &version_object.key);
if path.is_empty() {
path = "/".to_owned();
if self.args.versions() {
for version_object in output.version {
let mut path = build_rel_path(&self.core.root, &version_object.key);
if path.is_empty() {
path = "/".to_owned();
}

let mut meta = Metadata::new(EntryMode::from_path(&path));
meta.set_version(&version_object.version_id);
meta.set_is_current(version_object.is_latest);
meta.set_content_length(version_object.size);
meta.set_last_modified(parse_datetime_from_rfc3339(
version_object.last_modified.as_str(),
)?);
if let Some(etag) = version_object.etag {
meta.set_etag(&etag);
meta.set_content_md5(etag.trim_matches('"'));
}

let entry = oio::Entry::new(&path, meta);
ctx.entries.push_back(entry);
}
}

let mut meta = Metadata::new(EntryMode::from_path(&path));
meta.set_version(&version_object.version_id);
meta.set_is_current(version_object.is_latest);
meta.set_content_length(version_object.size);
meta.set_last_modified(parse_datetime_from_rfc3339(
version_object.last_modified.as_str(),
)?);
if let Some(etag) = version_object.etag {
meta.set_etag(&etag);
meta.set_content_md5(etag.trim_matches('"'));
if self.args.deleted() {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Did I misunderstand something ? this implementation seems to differ from the RFC description:

Please note that `deleted` here means "including deleted files" rather than "only deleted files." Therefore, `list_with(path).deleted(true)` will list both current files and deleted ones.

for delete_marker in output.delete_marker {
let mut path = build_rel_path(&self.core.root, &delete_marker.key);
if path.is_empty() {
path = "/".to_owned();
}

let mut meta = Metadata::new(EntryMode::FILE);
meta.set_version(&delete_marker.version_id);
meta.set_is_deleted(true);
meta.set_is_current(delete_marker.is_latest);
meta.set_last_modified(parse_datetime_from_rfc3339(
delete_marker.last_modified.as_str(),
)?);

let entry = oio::Entry::new(&path, meta);
ctx.entries.push_back(entry);
}

let entry = oio::Entry::new(&path, meta);
ctx.entries.push_back(entry);
}

Ok(())
Expand Down
4 changes: 3 additions & 1 deletion core/src/types/capability.rs
Original file line number Diff line number Diff line change
Expand Up @@ -177,8 +177,10 @@ pub struct Capability {
/// Indicates if versions listing is supported.
#[deprecated(since = "0.51.1", note = "use with_versions instead")]
pub list_with_version: bool,
/// Indicates if versions listing is supported.
/// Indicates if listing with versions included is supported.
pub list_with_versions: bool,
/// Indicates if listing with deleted files included is supported.
pub list_with_deleted: bool,
/// Indicates whether cache control information is available in list response
pub list_has_cache_control: bool,
/// Indicates whether content disposition information is available in list response
Expand Down
Loading
Loading