Skip to content

Commit

Permalink
feat(core): Implement list with deleted for s3 service
Browse files Browse the repository at this point in the history
Signed-off-by: Xuanwo <[email protected]>
  • Loading branch information
Xuanwo committed Jan 2, 2025
1 parent 8398e1f commit bd255c7
Show file tree
Hide file tree
Showing 7 changed files with 167 additions and 76 deletions.
11 changes: 4 additions & 7 deletions core/src/services/s3/backend.rs
Original file line number Diff line number Diff line change
Expand Up @@ -983,6 +983,7 @@ impl Access for S3Backend {
list_with_start_after: true,
list_with_recursive: true,
list_with_versions: self.core.enable_versioning,
list_with_deleted: self.core.enable_versioning,
list_has_etag: true,
list_has_content_md5: true,
list_has_content_length: true,
Expand Down Expand Up @@ -1060,21 +1061,17 @@ impl Access for S3Backend {
}

async fn list(&self, path: &str, args: OpList) -> Result<(RpList, Self::Lister)> {
let l = if args.versions() {
let l = if args.versions() || args.deleted() {
TwoWays::Two(PageLister::new(S3ObjectVersionsLister::new(
self.core.clone(),
path,
args.recursive(),
args.limit(),
args.start_after(),
args,
)))
} else {
TwoWays::One(PageLister::new(S3Lister::new(
self.core.clone(),
path,
args.recursive(),
args.limit(),
args.start_after(),
args,
)))
};

Expand Down
31 changes: 30 additions & 1 deletion core/src/services/s3/core.rs
Original file line number Diff line number Diff line change
Expand Up @@ -612,7 +612,6 @@ impl S3Core {
write!(url, "&max-keys={limit}").expect("write into string must succeed");
}
if let Some(start_after) = start_after {
let start_after = build_abs_path(&self.root, &start_after);
write!(url, "&start-after={}", percent_encode_path(&start_after))
.expect("write into string must succeed");
}
Expand Down Expand Up @@ -994,6 +993,7 @@ pub struct ListObjectVersionsOutput {
pub next_version_id_marker: Option<String>,
pub common_prefixes: Vec<OutputCommonPrefix>,
pub version: Vec<ListObjectVersionsOutputVersion>,
pub delete_marker: Vec<ListObjectVersionsOutputDeleteMarker>,
}

#[derive(Default, Debug, Eq, PartialEq, Deserialize)]
Expand All @@ -1008,6 +1008,15 @@ pub struct ListObjectVersionsOutputVersion {
pub etag: Option<String>,
}

#[derive(Default, Debug, Eq, PartialEq, Deserialize)]
#[serde(rename_all = "PascalCase")]
pub struct ListObjectVersionsOutputDeleteMarker {
pub key: String,
pub version_id: String,
pub is_latest: bool,
pub last_modified: String,
}

pub enum ChecksumAlgorithm {
Crc32c,
}
Expand Down Expand Up @@ -1284,6 +1293,16 @@ mod tests {
<CommonPrefixes>
<Prefix>videos/</Prefix>
</CommonPrefixes>
<DeleteMarker>
<Key>my-third-image.jpg</Key>
<VersionId>03jpff543dhffds434rfdsFDN943fdsFkdmqnh892</VersionId>
<IsLatest>true</IsLatest>
<LastModified>2009-10-15T17:50:30.000Z</LastModified>
<Owner>
<ID>75aa57f09aa0c8caeab4f8c24e99d10f8e7faeebf76c078efc7c6caea54ba06a</ID>
<DisplayName>[email protected]</DisplayName>
</Owner>
</DeleteMarker>
</ListVersionsResult>"#,
);

Expand Down Expand Up @@ -1329,5 +1348,15 @@ mod tests {
}
]
);

assert_eq!(
output.delete_marker,
vec![ListObjectVersionsOutputDeleteMarker {
key: "my-third-image.jpg".to_owned(),
version_id: "03jpff543dhffds434rfdsFDN943fdsFkdmqnh892".to_owned(),
is_latest: true,
last_modified: "2009-10-15T17:50:30.000Z".to_owned(),
},]
);
}
}
116 changes: 64 additions & 52 deletions core/src/services/s3/lister.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,29 +35,26 @@ pub struct S3Lister {
core: Arc<S3Core>,

path: String,
delimiter: &'static str,
limit: Option<usize>,
args: OpList,

/// Amazon S3 starts listing **after** this specified key
start_after: Option<String>,
delimiter: &'static str,
abs_start_after: Option<String>,
}

impl S3Lister {
pub fn new(
core: Arc<S3Core>,
path: &str,
recursive: bool,
limit: Option<usize>,
start_after: Option<&str>,
) -> Self {
let delimiter = if recursive { "" } else { "/" };
pub fn new(core: Arc<S3Core>, path: &str, args: OpList) -> Self {
let delimiter = if args.recursive() { "" } else { "/" };
let abs_start_after = args
.start_after()
.map(|start_after| build_abs_path(&core.root, start_after));

Self {
core,

path: path.to_string(),
args,
delimiter,
limit,
start_after: start_after.map(String::from),
abs_start_after,
}
}
}
Expand All @@ -70,10 +67,10 @@ impl oio::PageList for S3Lister {
&self.path,
&ctx.token,
self.delimiter,
self.limit,
self.args.limit(),
// start after should only be set for the first page.
if ctx.token.is_empty() {
self.start_after.clone()
self.abs_start_after.clone()
} else {
None
},
Expand Down Expand Up @@ -143,35 +140,29 @@ impl oio::PageList for S3Lister {
}
}

// refer: https://docs.aws.amazon.com/AmazonS3/latest/API/API_ListObjectVersions.html
/// refer: https://docs.aws.amazon.com/AmazonS3/latest/API/API_ListObjectVersions.html
pub struct S3ObjectVersionsLister {
core: Arc<S3Core>,

prefix: String,
args: OpList,

delimiter: &'static str,
limit: Option<usize>,
start_after: String,
abs_start_after: String,
abs_start_after: Option<String>,
}

impl S3ObjectVersionsLister {
pub fn new(
core: Arc<S3Core>,
path: &str,
recursive: bool,
limit: Option<usize>,
start_after: Option<&str>,
) -> Self {
let delimiter = if recursive { "" } else { "/" };
let start_after = start_after.unwrap_or_default().to_owned();
let abs_start_after = build_abs_path(core.root.as_str(), start_after.as_str());
pub fn new(core: Arc<S3Core>, path: &str, args: OpList) -> Self {
let delimiter = if args.recursive() { "" } else { "/" };
let abs_start_after = args
.start_after()
.map(|start_after| build_abs_path(&core.root, start_after));

Self {
core,
prefix: path.to_string(),
args,
delimiter,
limit,
start_after,
abs_start_after,
}
}
Expand All @@ -182,8 +173,8 @@ impl oio::PageList for S3ObjectVersionsLister {
let markers = ctx.token.rsplit_once(" ");
let (key_marker, version_id_marker) = if let Some(data) = markers {
data
} else if !self.start_after.is_empty() {
(self.abs_start_after.as_str(), "")
} else if let Some(start_after) = &self.abs_start_after {
(start_after.as_str(), "")
} else {
("", "")
};
Expand All @@ -193,7 +184,7 @@ impl oio::PageList for S3ObjectVersionsLister {
.s3_list_object_versions(
&self.prefix,
self.delimiter,
self.limit,
self.args.limit(),
key_marker,
version_id_marker,
)
Expand Down Expand Up @@ -231,26 +222,47 @@ impl oio::PageList for S3ObjectVersionsLister {
ctx.entries.push_back(de);
}

for version_object in output.version {
let mut path = build_rel_path(&self.core.root, &version_object.key);
if path.is_empty() {
path = "/".to_owned();
if self.args.versions() {
for version_object in output.version {
let mut path = build_rel_path(&self.core.root, &version_object.key);
if path.is_empty() {
path = "/".to_owned();
}

let mut meta = Metadata::new(EntryMode::from_path(&path));
meta.set_version(&version_object.version_id);
meta.set_is_current(version_object.is_latest);
meta.set_content_length(version_object.size);
meta.set_last_modified(parse_datetime_from_rfc3339(
version_object.last_modified.as_str(),
)?);
if let Some(etag) = version_object.etag {
meta.set_etag(&etag);
meta.set_content_md5(etag.trim_matches('"'));
}

let entry = oio::Entry::new(&path, meta);
ctx.entries.push_back(entry);
}
}

let mut meta = Metadata::new(EntryMode::from_path(&path));
meta.set_version(&version_object.version_id);
meta.set_is_current(version_object.is_latest);
meta.set_content_length(version_object.size);
meta.set_last_modified(parse_datetime_from_rfc3339(
version_object.last_modified.as_str(),
)?);
if let Some(etag) = version_object.etag {
meta.set_etag(&etag);
meta.set_content_md5(etag.trim_matches('"'));
if self.args.deleted() {
for delete_marker in output.delete_marker {
let mut path = build_rel_path(&self.core.root, &delete_marker.key);
if path.is_empty() {
path = "/".to_owned();
}

let mut meta = Metadata::new(EntryMode::FILE);
meta.set_version(&delete_marker.version_id);
meta.set_is_current(delete_marker.is_latest);
meta.set_last_modified(parse_datetime_from_rfc3339(
delete_marker.last_modified.as_str(),
)?);

let entry = oio::Entry::new(&path, meta);
ctx.entries.push_back(entry);
}

let entry = oio::Entry::new(&path, meta);
ctx.entries.push_back(entry);
}

Ok(())
Expand Down
4 changes: 3 additions & 1 deletion core/src/types/capability.rs
Original file line number Diff line number Diff line change
Expand Up @@ -177,8 +177,10 @@ pub struct Capability {
/// Indicates if versions listing is supported.
#[deprecated(since = "0.51.1", note = "use with_versions instead")]
pub list_with_version: bool,
/// Indicates if versions listing is supported.
/// Indicates if listing with versions included is supported.
pub list_with_versions: bool,
/// Indicates if listing with deleted files included is supported.
pub list_with_deleted: bool,
/// Indicates whether cache control information is available in list response
pub list_has_cache_control: bool,
/// Indicates whether content disposition information is available in list response
Expand Down
4 changes: 4 additions & 0 deletions core/src/types/metadata.rs
Original file line number Diff line number Diff line change
Expand Up @@ -217,6 +217,10 @@ impl Metadata {
/// `Content-Length` is defined by [RFC 7230](https://httpwg.org/specs/rfc7230.html#header.content-length)
///
/// Refer to [MDN Content-Length](https://developer.mozilla.org/en-US/docs/Web/HTTP/Headers/Content-Length) for more information.
///
/// # Returns
///
/// Content length of this entry. It will be `0` if the content length is not set by the storage services.
pub fn content_length(&self) -> u64 {
self.content_length.unwrap_or_default()
}
Expand Down
44 changes: 36 additions & 8 deletions core/src/types/operator/operator_futures.rs
Original file line number Diff line number Diff line change
Expand Up @@ -501,16 +501,30 @@ impl<F: Future<Output = Result<Vec<Entry>>>> FutureList<F> {
self.map(|args| args.with_versions(v))
}

/// The version is used to control whether the object versions should be returned.
/// Controls whether the `list` operation should return file versions.
///
/// - If `false`, list operation will not return with object versions
/// - If `true`, list operation will return with object versions if object versioning is supported
/// by the underlying service
/// This function allows you to specify if the `list` operation, when executed, should include
/// information about different versions of files, if versioning is supported and enabled.
///
/// If `true`, subsequent `list` operations will include version information for each file.
/// If `false`, version information will be omitted from the `list` results.
///
/// Default to `false`
pub fn versions(self, v: bool) -> Self {
self.map(|args| args.with_versions(v))
}

/// Controls whether the `list` operation should include deleted files (or versions).
///
/// This function allows you to specify if the `list` operation, when executed, should include
/// entries for files or versions that have been marked as deleted. This is particularly relevant
/// in object storage systems that support soft deletion or versioning.
///
/// If `true`, subsequent `list` operations will include deleted files or versions.
/// If `false`, deleted files or versions will be excluded from the `list` results.
pub fn deleted(self, v: bool) -> Self {
self.map(|args| args.with_deleted(v))
}
}

/// Future that generated by [`Operator::list_with`] or [`Operator::lister_with`].
Expand Down Expand Up @@ -555,14 +569,28 @@ impl<F: Future<Output = Result<Lister>>> FutureLister<F> {
self.map(|args| args.with_versions(v))
}

/// The version is used to control whether the object versions should be returned.
/// Controls whether the `list` operation should return file versions.
///
/// - If `false`, list operation will not return with object versions
/// - If `true`, list operation will return with object versions if object versioning is supported
/// by the underlying service
/// This function allows you to specify if the `list` operation, when executed, should include
/// information about different versions of files, if versioning is supported and enabled.
///
/// If `true`, subsequent `list` operations will include version information for each file.
/// If `false`, version information will be omitted from the `list` results.
///
/// Default to `false`
pub fn versions(self, v: bool) -> Self {
self.map(|args| args.with_versions(v))
}

/// Controls whether the `list` operation should include deleted files (or versions).
///
/// This function allows you to specify if the `list` operation, when executed, should include
/// entries for files or versions that have been marked as deleted. This is particularly relevant
/// in object storage systems that support soft deletion or versioning.
///
/// If `true`, subsequent `list` operations will include deleted files or versions.
/// If `false`, deleted files or versions will be excluded from the `list` results.
pub fn deleted(self, v: bool) -> Self {
self.map(|args| args.with_deleted(v))
}
}
Loading

0 comments on commit bd255c7

Please sign in to comment.