diff --git a/core/src/services/s3/backend.rs b/core/src/services/s3/backend.rs index 24f441f36c4..f0f019100c4 100644 --- a/core/src/services/s3/backend.rs +++ b/core/src/services/s3/backend.rs @@ -983,6 +983,7 @@ impl Access for S3Backend { list_with_start_after: true, list_with_recursive: true, list_with_versions: self.core.enable_versioning, + list_with_deleted: self.core.enable_versioning, list_has_etag: true, list_has_content_md5: true, list_has_content_length: true, @@ -1060,21 +1061,17 @@ impl Access for S3Backend { } async fn list(&self, path: &str, args: OpList) -> Result<(RpList, Self::Lister)> { - let l = if args.versions() { + let l = if args.versions() || args.deleted() { TwoWays::Two(PageLister::new(S3ObjectVersionsLister::new( self.core.clone(), path, - args.recursive(), - args.limit(), - args.start_after(), + args, ))) } else { TwoWays::One(PageLister::new(S3Lister::new( self.core.clone(), path, - args.recursive(), - args.limit(), - args.start_after(), + args, ))) }; diff --git a/core/src/services/s3/core.rs b/core/src/services/s3/core.rs index 2f9a359c770..f8140c5e230 100644 --- a/core/src/services/s3/core.rs +++ b/core/src/services/s3/core.rs @@ -612,7 +612,6 @@ impl S3Core { write!(url, "&max-keys={limit}").expect("write into string must succeed"); } if let Some(start_after) = start_after { - let start_after = build_abs_path(&self.root, &start_after); write!(url, "&start-after={}", percent_encode_path(&start_after)) .expect("write into string must succeed"); } @@ -994,6 +993,7 @@ pub struct ListObjectVersionsOutput { pub next_version_id_marker: Option, pub common_prefixes: Vec, pub version: Vec, + pub delete_marker: Vec, } #[derive(Default, Debug, Eq, PartialEq, Deserialize)] @@ -1008,6 +1008,15 @@ pub struct ListObjectVersionsOutputVersion { pub etag: Option, } +#[derive(Default, Debug, Eq, PartialEq, Deserialize)] +#[serde(rename_all = "PascalCase")] +pub struct ListObjectVersionsOutputDeleteMarker { + pub key: String, + pub version_id: String, + pub is_latest: bool, + pub last_modified: String, +} + pub enum ChecksumAlgorithm { Crc32c, } @@ -1284,6 +1293,16 @@ mod tests { videos/ + + my-third-image.jpg + 03jpff543dhffds434rfdsFDN943fdsFkdmqnh892 + true + 2009-10-15T17:50:30.000Z + + 75aa57f09aa0c8caeab4f8c24e99d10f8e7faeebf76c078efc7c6caea54ba06a + mtd@amazon.com + + "#, ); @@ -1329,5 +1348,15 @@ mod tests { } ] ); + + assert_eq!( + output.delete_marker, + vec![ListObjectVersionsOutputDeleteMarker { + key: "my-third-image.jpg".to_owned(), + version_id: "03jpff543dhffds434rfdsFDN943fdsFkdmqnh892".to_owned(), + is_latest: true, + last_modified: "2009-10-15T17:50:30.000Z".to_owned(), + },] + ); } } diff --git a/core/src/services/s3/lister.rs b/core/src/services/s3/lister.rs index fb27f23ee5b..361658265ef 100644 --- a/core/src/services/s3/lister.rs +++ b/core/src/services/s3/lister.rs @@ -35,29 +35,26 @@ pub struct S3Lister { core: Arc, path: String, - delimiter: &'static str, - limit: Option, + args: OpList, - /// Amazon S3 starts listing **after** this specified key - start_after: Option, + delimiter: &'static str, + abs_start_after: Option, } impl S3Lister { - pub fn new( - core: Arc, - path: &str, - recursive: bool, - limit: Option, - start_after: Option<&str>, - ) -> Self { - let delimiter = if recursive { "" } else { "/" }; + pub fn new(core: Arc, path: &str, args: OpList) -> Self { + let delimiter = if args.recursive() { "" } else { "/" }; + let abs_start_after = args + .start_after() + .map(|start_after| build_abs_path(&core.root, start_after)); + Self { core, path: path.to_string(), + args, delimiter, - limit, - start_after: start_after.map(String::from), + abs_start_after, } } } @@ -70,10 +67,10 @@ impl oio::PageList for S3Lister { &self.path, &ctx.token, self.delimiter, - self.limit, + self.args.limit(), // start after should only be set for the first page. if ctx.token.is_empty() { - self.start_after.clone() + self.abs_start_after.clone() } else { None }, @@ -143,35 +140,29 @@ impl oio::PageList for S3Lister { } } -// refer: https://docs.aws.amazon.com/AmazonS3/latest/API/API_ListObjectVersions.html +/// refer: https://docs.aws.amazon.com/AmazonS3/latest/API/API_ListObjectVersions.html pub struct S3ObjectVersionsLister { core: Arc, prefix: String, + args: OpList, + delimiter: &'static str, - limit: Option, - start_after: String, - abs_start_after: String, + abs_start_after: Option, } impl S3ObjectVersionsLister { - pub fn new( - core: Arc, - path: &str, - recursive: bool, - limit: Option, - start_after: Option<&str>, - ) -> Self { - let delimiter = if recursive { "" } else { "/" }; - let start_after = start_after.unwrap_or_default().to_owned(); - let abs_start_after = build_abs_path(core.root.as_str(), start_after.as_str()); + pub fn new(core: Arc, path: &str, args: OpList) -> Self { + let delimiter = if args.recursive() { "" } else { "/" }; + let abs_start_after = args + .start_after() + .map(|start_after| build_abs_path(&core.root, start_after)); Self { core, prefix: path.to_string(), + args, delimiter, - limit, - start_after, abs_start_after, } } @@ -182,8 +173,8 @@ impl oio::PageList for S3ObjectVersionsLister { let markers = ctx.token.rsplit_once(" "); let (key_marker, version_id_marker) = if let Some(data) = markers { data - } else if !self.start_after.is_empty() { - (self.abs_start_after.as_str(), "") + } else if let Some(start_after) = &self.abs_start_after { + (start_after.as_str(), "") } else { ("", "") }; @@ -193,7 +184,7 @@ impl oio::PageList for S3ObjectVersionsLister { .s3_list_object_versions( &self.prefix, self.delimiter, - self.limit, + self.args.limit(), key_marker, version_id_marker, ) @@ -231,26 +222,47 @@ impl oio::PageList for S3ObjectVersionsLister { ctx.entries.push_back(de); } - for version_object in output.version { - let mut path = build_rel_path(&self.core.root, &version_object.key); - if path.is_empty() { - path = "/".to_owned(); + if self.args.versions() { + for version_object in output.version { + let mut path = build_rel_path(&self.core.root, &version_object.key); + if path.is_empty() { + path = "/".to_owned(); + } + + let mut meta = Metadata::new(EntryMode::from_path(&path)); + meta.set_version(&version_object.version_id); + meta.set_is_current(version_object.is_latest); + meta.set_content_length(version_object.size); + meta.set_last_modified(parse_datetime_from_rfc3339( + version_object.last_modified.as_str(), + )?); + if let Some(etag) = version_object.etag { + meta.set_etag(&etag); + meta.set_content_md5(etag.trim_matches('"')); + } + + let entry = oio::Entry::new(&path, meta); + ctx.entries.push_back(entry); } + } - let mut meta = Metadata::new(EntryMode::from_path(&path)); - meta.set_version(&version_object.version_id); - meta.set_is_current(version_object.is_latest); - meta.set_content_length(version_object.size); - meta.set_last_modified(parse_datetime_from_rfc3339( - version_object.last_modified.as_str(), - )?); - if let Some(etag) = version_object.etag { - meta.set_etag(&etag); - meta.set_content_md5(etag.trim_matches('"')); + if self.args.deleted() { + for delete_marker in output.delete_marker { + let mut path = build_rel_path(&self.core.root, &delete_marker.key); + if path.is_empty() { + path = "/".to_owned(); + } + + let mut meta = Metadata::new(EntryMode::FILE); + meta.set_version(&delete_marker.version_id); + meta.set_is_current(delete_marker.is_latest); + meta.set_last_modified(parse_datetime_from_rfc3339( + delete_marker.last_modified.as_str(), + )?); + + let entry = oio::Entry::new(&path, meta); + ctx.entries.push_back(entry); } - - let entry = oio::Entry::new(&path, meta); - ctx.entries.push_back(entry); } Ok(()) diff --git a/core/src/types/capability.rs b/core/src/types/capability.rs index 2c4d3faf402..c5165a6a586 100644 --- a/core/src/types/capability.rs +++ b/core/src/types/capability.rs @@ -177,8 +177,10 @@ pub struct Capability { /// Indicates if versions listing is supported. #[deprecated(since = "0.51.1", note = "use with_versions instead")] pub list_with_version: bool, - /// Indicates if versions listing is supported. + /// Indicates if listing with versions included is supported. pub list_with_versions: bool, + /// Indicates if listing with deleted files included is supported. + pub list_with_deleted: bool, /// Indicates whether cache control information is available in list response pub list_has_cache_control: bool, /// Indicates whether content disposition information is available in list response diff --git a/core/src/types/metadata.rs b/core/src/types/metadata.rs index 6f02484798b..3bb0ab7149a 100644 --- a/core/src/types/metadata.rs +++ b/core/src/types/metadata.rs @@ -217,6 +217,10 @@ impl Metadata { /// `Content-Length` is defined by [RFC 7230](https://httpwg.org/specs/rfc7230.html#header.content-length) /// /// Refer to [MDN Content-Length](https://developer.mozilla.org/en-US/docs/Web/HTTP/Headers/Content-Length) for more information. + /// + /// # Returns + /// + /// Content length of this entry. It will be `0` if the content length is not set by the storage services. pub fn content_length(&self) -> u64 { self.content_length.unwrap_or_default() } diff --git a/core/src/types/operator/operator_futures.rs b/core/src/types/operator/operator_futures.rs index f3dceb2d67b..478025a3065 100644 --- a/core/src/types/operator/operator_futures.rs +++ b/core/src/types/operator/operator_futures.rs @@ -501,16 +501,30 @@ impl>>> FutureList { self.map(|args| args.with_versions(v)) } - /// The version is used to control whether the object versions should be returned. + /// Controls whether the `list` operation should return file versions. /// - /// - If `false`, list operation will not return with object versions - /// - If `true`, list operation will return with object versions if object versioning is supported - /// by the underlying service + /// This function allows you to specify if the `list` operation, when executed, should include + /// information about different versions of files, if versioning is supported and enabled. + /// + /// If `true`, subsequent `list` operations will include version information for each file. + /// If `false`, version information will be omitted from the `list` results. /// /// Default to `false` pub fn versions(self, v: bool) -> Self { self.map(|args| args.with_versions(v)) } + + /// Controls whether the `list` operation should include deleted files (or versions). + /// + /// This function allows you to specify if the `list` operation, when executed, should include + /// entries for files or versions that have been marked as deleted. This is particularly relevant + /// in object storage systems that support soft deletion or versioning. + /// + /// If `true`, subsequent `list` operations will include deleted files or versions. + /// If `false`, deleted files or versions will be excluded from the `list` results. + pub fn deleted(self, v: bool) -> Self { + self.map(|args| args.with_deleted(v)) + } } /// Future that generated by [`Operator::list_with`] or [`Operator::lister_with`]. @@ -555,14 +569,28 @@ impl>> FutureLister { self.map(|args| args.with_versions(v)) } - /// The version is used to control whether the object versions should be returned. + /// Controls whether the `list` operation should return file versions. /// - /// - If `false`, list operation will not return with object versions - /// - If `true`, list operation will return with object versions if object versioning is supported - /// by the underlying service + /// This function allows you to specify if the `list` operation, when executed, should include + /// information about different versions of files, if versioning is supported and enabled. + /// + /// If `true`, subsequent `list` operations will include version information for each file. + /// If `false`, version information will be omitted from the `list` results. /// /// Default to `false` pub fn versions(self, v: bool) -> Self { self.map(|args| args.with_versions(v)) } + + /// Controls whether the `list` operation should include deleted files (or versions). + /// + /// This function allows you to specify if the `list` operation, when executed, should include + /// entries for files or versions that have been marked as deleted. This is particularly relevant + /// in object storage systems that support soft deletion or versioning. + /// + /// If `true`, subsequent `list` operations will include deleted files or versions. + /// If `false`, deleted files or versions will be excluded from the `list` results. + pub fn deleted(self, v: bool) -> Self { + self.map(|args| args.with_deleted(v)) + } } diff --git a/core/tests/behavior/async_list.rs b/core/tests/behavior/async_list.rs index 6a0aa2ffb5e..854a071baab 100644 --- a/core/tests/behavior/async_list.rs +++ b/core/tests/behavior/async_list.rs @@ -49,7 +49,8 @@ pub fn tests(op: &Operator, tests: &mut Vec) { test_remove_all, test_list_files_with_versions, test_list_with_versions_and_limit, - test_list_with_versions_and_start_after + test_list_with_versions_and_start_after, + test_list_files_with_deleted )) } @@ -595,15 +596,33 @@ pub async fn test_list_files_with_versions(op: Operator) -> Result<()> { assert_eq!(de.name(), file_name); let meta = de.metadata(); assert_eq!(meta.mode(), EntryMode::FILE); + } + + Ok(()) +} - // just ensure they don't panic - let _ = meta.content_length(); - let _ = meta.version(); - let _ = meta.last_modified(); - let _ = meta.etag(); - let _ = meta.content_md5(); +pub async fn test_list_files_with_deleted(op: Operator) -> Result<()> { + if !op.info().full_capability().list_with_deleted { + return Ok(()); } + let parent = TEST_FIXTURE.new_dir_path(); + let file_name = TEST_FIXTURE.new_file_path(); + let file_path = format!("{}{}", parent, file_name); + op.write(file_path.as_str(), "1").await?; + op.write(file_path.as_str(), "2").await?; + op.delete(file_path.as_str()).await?; + + // This file has been deleted + let mut ds = op.list_with(parent.as_str()).deleted(true).await?; + ds.retain(|de| de.path() == file_path && de.metadata().is_deleted()); + + assert_eq!( + ds.len(), + 1, + "deleted file should be found and only have one" + ); + Ok(()) }