Skip to content

Commit b1f6eb0

Browse files
JWackerbauerJan Wackerbauer
and
Jan Wackerbauer
authored
feat: Add crc32c checksums to S3 Service (#4533)
* Add crc32c checksums to S3 * Properly encode checksum based on https://docs.aws.amazon.com/AmazonS3/latest/API/API_Checksum.html * Fix formatting * Remove S3 prefix, don't copy body & declare checksum_algorithm in initiate MultipartUpload * Improve documentation * Fix formatting * Refactor ChecksumAlgorithm * Fix formatting --------- Co-authored-by: Jan Wackerbauer <[email protected]>
1 parent e677ba2 commit b1f6eb0

File tree

4 files changed

+95
-0
lines changed

4 files changed

+95
-0
lines changed

core/Cargo.lock

+1
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

core/Cargo.toml

+1
Original file line numberDiff line numberDiff line change
@@ -350,6 +350,7 @@ prometheus-client = { version = "0.22.2", optional = true }
350350
tracing = { version = "0.1", optional = true }
351351
# for layers-dtrace
352352
probe = { version = "0.5.1", optional = true }
353+
crc32c = "0.6.5"
353354

354355
[target.'cfg(target_arch = "wasm32")'.dependencies]
355356
getrandom = { version = "0.2", features = ["js"] }

core/src/services/s3/backend.rs

+29
Original file line numberDiff line numberDiff line change
@@ -204,6 +204,12 @@ pub struct S3Config {
204204
///
205205
/// For example, R2 doesn't support stat with `response_content_type` query.
206206
pub disable_stat_with_override: bool,
207+
/// Checksum Algorithm to use when sending checksums in HTTP headers.
208+
/// This is necessary when writing to AWS S3 Buckets with Object Lock enabled for example.
209+
///
210+
/// Available options:
211+
/// - "crc32c"
212+
pub checksum_algorithm: Option<String>,
207213
}
208214

209215
impl Debug for S3Config {
@@ -663,6 +669,17 @@ impl S3Builder {
663669
self
664670
}
665671

672+
/// Set checksum algorithm of this backend.
673+
/// This is necessary when writing to AWS S3 Buckets with Object Lock enabled for example.
674+
///
675+
/// Available options:
676+
/// - "crc32c"
677+
pub fn checksum_algorithm(&mut self, checksum_algorithm: &str) -> &mut Self {
678+
self.config.checksum_algorithm = Some(checksum_algorithm.to_string());
679+
680+
self
681+
}
682+
666683
/// Detect region of S3 bucket.
667684
///
668685
/// # Args
@@ -858,6 +875,17 @@ impl Builder for S3Builder {
858875
})?),
859876
};
860877

878+
let checksum_algorithm = match self.config.checksum_algorithm.as_deref() {
879+
Some("crc32c") => Some(ChecksumAlgorithm::Crc32c),
880+
None => None,
881+
_ => {
882+
return Err(Error::new(
883+
ErrorKind::ConfigInvalid,
884+
"{v} is not a supported checksum_algorithm.",
885+
))
886+
}
887+
};
888+
861889
let client = if let Some(client) = self.http_client.take() {
862890
client
863891
} else {
@@ -979,6 +1007,7 @@ impl Builder for S3Builder {
9791007
credential_loaded: AtomicBool::new(false),
9801008
client,
9811009
batch_max_operations,
1010+
checksum_algorithm,
9821011
}),
9831012
})
9841013
}

core/src/services/s3/core.rs

+64
Original file line numberDiff line numberDiff line change
@@ -17,12 +17,15 @@
1717

1818
use std::fmt;
1919
use std::fmt::Debug;
20+
use std::fmt::Display;
2021
use std::fmt::Formatter;
2122
use std::fmt::Write;
2223
use std::sync::atomic;
2324
use std::sync::atomic::AtomicBool;
2425
use std::time::Duration;
2526

27+
use base64::prelude::BASE64_STANDARD;
28+
use base64::Engine;
2629
use bytes::Bytes;
2730
use http::header::HeaderName;
2831
use http::header::CACHE_CONTROL;
@@ -88,6 +91,7 @@ pub struct S3Core {
8891
pub credential_loaded: AtomicBool,
8992
pub client: HttpClient,
9093
pub batch_max_operations: usize,
94+
pub checksum_algorithm: Option<ChecksumAlgorithm>,
9195
}
9296

9397
impl Debug for S3Core {
@@ -246,6 +250,35 @@ impl S3Core {
246250

247251
req
248252
}
253+
254+
pub fn insert_checksum_header(
255+
&self,
256+
mut req: http::request::Builder,
257+
body: &Buffer,
258+
) -> http::request::Builder {
259+
if let Some(checksum_algorithm) = self.checksum_algorithm.as_ref() {
260+
let checksum = match checksum_algorithm {
261+
ChecksumAlgorithm::Crc32c => {
262+
let mut crc = 0u32;
263+
body.clone()
264+
.for_each(|b| crc = crc32c::crc32c_append(crc, &b));
265+
BASE64_STANDARD.encode(crc.to_be_bytes())
266+
}
267+
};
268+
req = req.header(checksum_algorithm.to_header_name(), checksum);
269+
}
270+
req
271+
}
272+
273+
pub fn insert_checksum_type_header(
274+
&self,
275+
mut req: http::request::Builder,
276+
) -> http::request::Builder {
277+
if let Some(checksum_algorithm) = self.checksum_algorithm.as_ref() {
278+
req = req.header("x-amz-checksum-algorithm", checksum_algorithm.to_string());
279+
}
280+
req
281+
}
249282
}
250283

251284
impl S3Core {
@@ -408,6 +441,9 @@ impl S3Core {
408441
// Set SSE headers.
409442
req = self.insert_sse_headers(req, true);
410443

444+
// Set Checksum header.
445+
req = self.insert_checksum_header(req, &body);
446+
411447
// Set body
412448
let req = req.body(body).map_err(new_request_build_error)?;
413449

@@ -573,6 +609,9 @@ impl S3Core {
573609
// Set SSE headers.
574610
let req = self.insert_sse_headers(req, true);
575611

612+
// Set SSE headers.
613+
let req = self.insert_checksum_type_header(req);
614+
576615
let mut req = req.body(Buffer::new()).map_err(new_request_build_error)?;
577616

578617
self.sign(&mut req).await?;
@@ -605,6 +644,9 @@ impl S3Core {
605644
// Set SSE headers.
606645
req = self.insert_sse_headers(req, true);
607646

647+
// Set Checksum header.
648+
req = self.insert_checksum_header(req, &body);
649+
608650
// Set body
609651
let req = req.body(body).map_err(new_request_build_error)?;
610652

@@ -821,6 +863,28 @@ pub struct OutputCommonPrefix {
821863
pub prefix: String,
822864
}
823865

866+
pub enum ChecksumAlgorithm {
867+
Crc32c,
868+
}
869+
impl ChecksumAlgorithm {
870+
pub fn to_header_name(&self) -> HeaderName {
871+
match self {
872+
Self::Crc32c => HeaderName::from_static("x-amz-checksum-crc32c"),
873+
}
874+
}
875+
}
876+
impl Display for ChecksumAlgorithm {
877+
fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
878+
write!(
879+
f,
880+
"{}",
881+
match self {
882+
Self::Crc32c => "CRC32C",
883+
}
884+
)
885+
}
886+
}
887+
824888
#[cfg(test)]
825889
mod tests {
826890
use bytes::Buf;

0 commit comments

Comments
 (0)