diff --git a/core/src/services/s3/backend.rs b/core/src/services/s3/backend.rs index 609db520eb6d..c71def1429ff 100644 --- a/core/src/services/s3/backend.rs +++ b/core/src/services/s3/backend.rs @@ -565,6 +565,12 @@ impl S3Builder { self } + /// Enable write with append so that opendal will send write request with append headers. + pub fn enable_write_with_append(mut self) -> Self { + self.config.enable_write_with_append = true; + self + } + /// Detect region of S3 bucket. /// /// # Args @@ -896,6 +902,7 @@ impl Builder for S3Builder { checksum_algorithm, delete_max_size, disable_write_with_if_match: self.config.disable_write_with_if_match, + enable_write_with_append: self.config.enable_write_with_append, }), }) } @@ -957,6 +964,8 @@ impl Access for S3Backend { write: true, write_can_empty: true, write_can_multi: true, + write_can_append: self.core.enable_write_with_append, + write_with_cache_control: true, write_with_content_type: true, write_with_content_encoding: true, @@ -1052,11 +1061,17 @@ impl Access for S3Backend { } async fn write(&self, path: &str, args: OpWrite) -> Result<(RpWrite, Self::Writer)> { - let concurrent = args.concurrent(); - let executor = args.executor().cloned(); - let writer = S3Writer::new(self.core.clone(), path, args); + let writer = S3Writer::new(self.core.clone(), path, args.clone()); - let w = oio::MultipartWriter::new(writer, executor, concurrent); + let w = if args.append() { + S3Writers::Two(oio::AppendWriter::new(writer)) + } else { + S3Writers::One(oio::MultipartWriter::new( + writer, + args.executor().cloned(), + args.concurrent(), + )) + }; Ok((RpWrite::default(), w)) } diff --git a/core/src/services/s3/config.rs b/core/src/services/s3/config.rs index 0c40d73f85fd..74b78a03c3d2 100644 --- a/core/src/services/s3/config.rs +++ b/core/src/services/s3/config.rs @@ -139,6 +139,7 @@ pub struct S3Config { /// - `GLACIER_IR` /// - `INTELLIGENT_TIERING` /// - `ONEZONE_IA` + /// - `EXPRESS_ONEZONE` /// - `OUTPOSTS` /// - `REDUCED_REDUNDANCY` /// - `STANDARD` @@ -184,6 +185,9 @@ pub struct S3Config { /// /// For example, Ceph RADOS S3 doesn't support write with if match. pub disable_write_with_if_match: bool, + + /// Enable write with append so that opendal will send write request with append headers. + pub enable_write_with_append: bool, } impl Debug for S3Config { diff --git a/core/src/services/s3/core.rs b/core/src/services/s3/core.rs index c58c29fc9a9d..991d32d02498 100644 --- a/core/src/services/s3/core.rs +++ b/core/src/services/s3/core.rs @@ -70,6 +70,8 @@ pub mod constants { pub const X_AMZ_COPY_SOURCE_SERVER_SIDE_ENCRYPTION_CUSTOMER_KEY_MD5: &str = "x-amz-copy-source-server-side-encryption-customer-key-md5"; + pub const X_AMZ_WRITE_OFFSET_BYTES: &str = "x-amz-write-offset-bytes"; + pub const X_AMZ_META_PREFIX: &str = "x-amz-meta-"; pub const X_AMZ_VERSION_ID: &str = "x-amz-version-id"; @@ -103,6 +105,7 @@ pub struct S3Core { pub delete_max_size: usize, pub checksum_algorithm: Option, pub disable_write_with_if_match: bool, + pub enable_write_with_append: bool, } impl Debug for S3Core { @@ -292,6 +295,54 @@ impl S3Core { } req } + + pub fn insert_metadata_headers( + &self, + mut req: http::request::Builder, + size: Option, + args: &OpWrite, + ) -> http::request::Builder { + if let Some(size) = size { + req = req.header(CONTENT_LENGTH, size.to_string()) + } + + if let Some(mime) = args.content_type() { + req = req.header(CONTENT_TYPE, mime) + } + + if let Some(pos) = args.content_disposition() { + req = req.header(CONTENT_DISPOSITION, pos) + } + + if let Some(encoding) = args.content_encoding() { + req = req.header(CONTENT_ENCODING, encoding); + } + + if let Some(cache_control) = args.cache_control() { + req = req.header(CACHE_CONTROL, cache_control) + } + + if let Some(if_match) = args.if_match() { + req = req.header(IF_MATCH, if_match); + } + + if args.if_not_exists() { + req = req.header(IF_NONE_MATCH, "*"); + } + + // Set storage class header + if let Some(v) = &self.default_storage_class { + req = req.header(HeaderName::from_static(constants::X_AMZ_STORAGE_CLASS), v); + } + + // Set user metadata headers. + if let Some(user_metadata) = args.user_metadata() { + for (key, value) in user_metadata { + req = req.header(format!("{X_AMZ_META_PREFIX}{key}"), value) + } + } + req + } } impl S3Core { @@ -471,55 +522,48 @@ impl S3Core { let mut req = Request::put(&url); - if let Some(size) = size { - req = req.header(CONTENT_LENGTH, size.to_string()) - } - - if let Some(mime) = args.content_type() { - req = req.header(CONTENT_TYPE, mime) - } + req = self.insert_metadata_headers(req, size, args); - if let Some(pos) = args.content_disposition() { - req = req.header(CONTENT_DISPOSITION, pos) - } + // Set SSE headers. + req = self.insert_sse_headers(req, true); - if let Some(encoding) = args.content_encoding() { - req = req.header(CONTENT_ENCODING, encoding); + // Calculate Checksum. + if let Some(checksum) = self.calculate_checksum(&body) { + // Set Checksum header. + req = self.insert_checksum_header(req, &checksum); } - if let Some(cache_control) = args.cache_control() { - req = req.header(CACHE_CONTROL, cache_control) - } + // Set body + let req = req.body(body).map_err(new_request_build_error)?; - if let Some(if_match) = args.if_match() { - req = req.header(IF_MATCH, if_match); - } + Ok(req) + } - if args.if_not_exists() { - req = req.header(IF_NONE_MATCH, "*"); - } + pub fn s3_append_object_request( + &self, + path: &str, + position: u64, + size: u64, + args: &OpWrite, + body: Buffer, + ) -> Result> { + let p = build_abs_path(&self.root, path); + let url = format!("{}/{}", self.endpoint, percent_encode_path(&p)); + let mut req = Request::put(&url); - // Set storage class header - if let Some(v) = &self.default_storage_class { - req = req.header(HeaderName::from_static(constants::X_AMZ_STORAGE_CLASS), v); + // Only include full metadata headers when creating a new object via append (position == 0) + // For existing objects or subsequent appends, only include content-length + if position == 0 { + req = self.insert_metadata_headers(req, Some(size), args); + } else { + req = req.header(CONTENT_LENGTH, size.to_string()); } - // Set user metadata headers. - if let Some(user_metadata) = args.user_metadata() { - for (key, value) in user_metadata { - req = req.header(format!("{X_AMZ_META_PREFIX}{key}"), value) - } - } + req = req.header(constants::X_AMZ_WRITE_OFFSET_BYTES, position.to_string()); // Set SSE headers. req = self.insert_sse_headers(req, true); - // Calculate Checksum. - if let Some(checksum) = self.calculate_checksum(&body) { - // Set Checksum header. - req = self.insert_checksum_header(req, &checksum); - } - // Set body let req = req.body(body).map_err(new_request_build_error)?; diff --git a/core/src/services/s3/docs.md b/core/src/services/s3/docs.md index 1bae73dedfcd..6386aaf5779d 100644 --- a/core/src/services/s3/docs.md +++ b/core/src/services/s3/docs.md @@ -5,6 +5,7 @@ This service can be used to: - [x] stat - [x] read - [x] write +- [x] append - [x] create_dir - [x] delete - [x] copy diff --git a/core/src/services/s3/writer.rs b/core/src/services/s3/writer.rs index 34f5ce99faa7..30045308326f 100644 --- a/core/src/services/s3/writer.rs +++ b/core/src/services/s3/writer.rs @@ -27,7 +27,7 @@ use bytes::Buf; use constants::{X_AMZ_OBJECT_SIZE, X_AMZ_VERSION_ID}; use http::StatusCode; -pub type S3Writers = oio::MultipartWriter; +pub type S3Writers = TwoWays, oio::AppendWriter>; pub struct S3Writer { core: Arc, @@ -222,3 +222,39 @@ impl oio::MultipartWrite for S3Writer { } } } + +impl oio::AppendWrite for S3Writer { + async fn offset(&self) -> Result { + let resp = self + .core + .s3_head_object(&self.path, OpStat::default()) + .await?; + + let status = resp.status(); + + match status { + StatusCode::OK => Ok(parse_content_length(resp.headers())?.unwrap_or_default()), + StatusCode::NOT_FOUND => Ok(0), + _ => Err(parse_error(resp)), + } + } + + async fn append(&self, offset: u64, size: u64, body: Buffer) -> Result { + let mut req = self + .core + .s3_append_object_request(&self.path, offset, size, &self.op, body)?; + + self.core.sign(&mut req).await?; + + let resp = self.core.send(req).await?; + + let status = resp.status(); + + let meta = S3Writer::parse_header_into_meta(&self.path, resp.headers())?; + + match status { + StatusCode::CREATED | StatusCode::OK => Ok(meta), + _ => Err(parse_error(resp)), + } + } +}