diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 6edb924e..d9f9516d 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -93,6 +93,23 @@ jobs: -e GOOGLE_CLOUD_CPP_STORAGE_TEST_BUCKET_NAME=test-bucket \ gcr.io/cloud-devrel-public-resources/storage-testbench:latest + - name: Start S3-compatible Emulator + run: | + docker run -d \ + --name s3-emulator \ + -p 8088:9000 \ + -e MINIO_ROOT_USER=test-key \ + -e MINIO_ROOT_PASSWORD=test-secret \ + --health-cmd "wget --no-verbose --tries=1 --spider http://localhost:9000/minio/health/live" \ + --health-interval 10s \ + --health-timeout 5s \ + --health-retries 5 \ + docker.io/minio/minio:edge-cicd@sha256:29e8e51691d11e779468f275002779b221fd3902518d103e35c8a8bb2ef0f3ea + + apt-get update + apt-get install -y s3cmd + s3cmd --access_key=test-key --secret_key=test-secret --host=localhost:8088 --no-ssl mb s3://test-bucket + - name: Install Rust Toolchain run: | rustup toolchain install stable --profile minimal --no-self-update diff --git a/Cargo.lock b/Cargo.lock index 87568ac3..06b02409 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -264,12 +264,44 @@ version = "1.1.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "1505bd5d3d116872e7271a6d4e16d81d0c8570876c8de68093a09ac269d8aac0" +[[package]] +name = "attohttpc" +version = "0.30.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "16e2cdb6d5ed835199484bb92bb8b3edd526effe995c61732580439c1a67e2e9" +dependencies = [ + "base64", + "http 1.3.1", + "log", + "native-tls", + "serde", + "serde_json", + "url", +] + [[package]] name = "autocfg" version = "1.5.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "c08606f8c3cbf4ce6ec8e28fb0014a2c086708fe954eaa885384a6165172e7e8" +[[package]] +name = "aws-creds" +version = "0.39.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b13804829a843b3f26e151c97acbb315ee1177a2724690edfcd28f1894146200" 
+dependencies = [ + "attohttpc", + "home", + "log", + "quick-xml", + "rust-ini", + "serde", + "thiserror", + "time", + "url", +] + [[package]] name = "aws-lc-rs" version = "1.14.1" @@ -293,6 +325,15 @@ dependencies = [ "fs_extra", ] +[[package]] +name = "aws-region" +version = "0.28.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5532f65342f789f9c1b7078ea9c9cd9293cd62dcc284fa99adc4a1c9ba43469c" +dependencies = [ + "thiserror", +] + [[package]] name = "axum" version = "0.8.6" @@ -491,6 +532,15 @@ dependencies = [ "bytes", ] +[[package]] +name = "castaway" +version = "0.2.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "dec551ab6e7578819132c713a93c022a05d60159dc86e7a7050223577484c55a" +dependencies = [ + "rustversion", +] + [[package]] name = "cc" version = "1.2.44" @@ -558,6 +608,19 @@ dependencies = [ "cc", ] +[[package]] +name = "compact_str" +version = "0.7.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f86b9c4c00838774a6d902ef931eff7470720c51d90c2e32cfe15dc304737b3f" +dependencies = [ + "castaway", + "cfg-if", + "itoa", + "ryu", + "static_assertions", +] + [[package]] name = "compression-codecs" version = "0.4.31" @@ -588,6 +651,26 @@ dependencies = [ "windows-sys 0.61.2", ] +[[package]] +name = "const-random" +version = "0.1.18" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "87e00182fe74b066627d63b85fd550ac2998d4b0bd86bfed477a0ae4c7c71359" +dependencies = [ + "const-random-macro", +] + +[[package]] +name = "const-random-macro" +version = "0.1.16" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f9d839f2a20b0aee515dc581a6172f2321f96cab76c1a38a4c584a194955390e" +dependencies = [ + "getrandom 0.2.16", + "once_cell", + "tiny-keccak", +] + [[package]] name = "core-foundation" version = "0.9.4" @@ -653,6 +736,12 @@ version = "0.8.21" source = "registry+https://github.com/rust-lang/crates.io-index" checksum 
= "d0a5c400df2834b80a4c3327b3aad3a4c4cd4de0629063962b03235697506a28" +[[package]] +name = "crunchy" +version = "0.2.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "460fbee9c2c2f33933d720630a6a0bac33ba7053db5344fac858d4b8952d77d5" + [[package]] name = "crypto-common" version = "0.1.6" @@ -763,6 +852,7 @@ checksum = "9ed9a281f7bc9b7576e61468ba615a66a5c8cfdff42420a70aa82701a3b1e292" dependencies = [ "block-buffer", "crypto-common", + "subtle", ] [[package]] @@ -776,6 +866,15 @@ dependencies = [ "syn", ] +[[package]] +name = "dlv-list" +version = "0.5.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "442039f5147480ba31067cb00ada1adae6892028e40e45fc5de7b7df6dcc1b5f" +dependencies = [ + "const-random", +] + [[package]] name = "dunce" version = "1.0.5" @@ -1133,6 +1232,12 @@ version = "0.12.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "8a9ee70c43aaf417c914396645a0fa852624801b24ebb7ae78fe8272889ac888" +[[package]] +name = "hashbrown" +version = "0.14.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e5274423e17b7c9fc20b6e7e208532f9b19825d82dfd615708b70edd83df41f1" + [[package]] name = "hashbrown" version = "0.16.0" @@ -1203,6 +1308,24 @@ dependencies = [ "tracing", ] +[[package]] +name = "hmac" +version = "0.12.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6c49c37c09c17a53d937dfbb742eb3a961d65a994e6bcdcf37e7399d0cc8ab5e" +dependencies = [ + "digest", +] + +[[package]] +name = "home" +version = "0.5.12" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "cc627f471c528ff0c4a49e1d5e60450c8f6461dd6d10ba9dcd3a61d3dff7728d" +dependencies = [ + "windows-sys 0.61.2", +] + [[package]] name = "hostname" version = "0.4.1" @@ -1394,7 +1517,7 @@ dependencies = [ "js-sys", "log", "wasm-bindgen", - "windows-core", + "windows-core 0.62.2", ] [[package]] @@ -1752,6 +1875,23 @@ version = "0.8.4" 
source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "47e1ffaa40ddd1f3ed91f717a33c8c0ee23fff369e3aa8772b9605cc1d22f4c3" +[[package]] +name = "maybe-async" +version = "0.2.10" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5cf92c10c7e361d6b99666ec1c6f9805b0bea2c3bd8c78dc6fe98ac5bd78db11" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] + +[[package]] +name = "md5" +version = "0.8.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ae960838283323069879657ca3de837e9f7bbb4c7bf6ea7f1b290d5e9476d2e0" + [[package]] name = "memchr" version = "2.7.6" @@ -1801,6 +1941,15 @@ dependencies = [ "unicase", ] +[[package]] +name = "minidom" +version = "0.16.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e394a0e3c7ccc2daea3dffabe82f09857b6b510cb25af87d54bf3e910ac1642d" +dependencies = [ + "rxml", +] + [[package]] name = "minimal-lexical" version = "0.2.1" @@ -1891,6 +2040,15 @@ dependencies = [ "minimal-lexical", ] +[[package]] +name = "ntapi" +version = "0.4.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e8a3895c6391c39d7fe7ebc444a87eb2991b2a0bc718fdabd071eec617fc68e4" +dependencies = [ + "winapi", +] + [[package]] name = "nu-ansi-term" version = "0.50.3" @@ -1926,6 +2084,25 @@ dependencies = [ "libc", ] +[[package]] +name = "objc2-core-foundation" +version = "0.3.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2a180dd8642fa45cdb7dd721cd4c11b1cadd4929ce112ebd8b9f5803cc79d536" +dependencies = [ + "bitflags", +] + +[[package]] +name = "objc2-io-kit" +version = "0.3.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "33fafba39597d6dc1fb709123dfa8289d39406734be322956a69f0931c73bb15" +dependencies = [ + "libc", + "objc2-core-foundation", +] + [[package]] name = "object" version = "0.37.3" @@ -2003,6 +2180,7 @@ dependencies = [ "bigtable_rs", "bytes", 
"data-encoding", + "futures", "futures-util", "gcp_auth", "humantime", @@ -2010,6 +2188,7 @@ dependencies = [ "merni", "objectstore-types", "reqwest", + "rust-s3", "sentry", "serde", "serde_json", @@ -2085,6 +2264,16 @@ dependencies = [ "vcpkg", ] +[[package]] +name = "ordered-multimap" +version = "0.7.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "49203cdcae0030493bad186b28da2fa25645fa276a51b6fec8010d281e02ef79" +dependencies = [ + "dlv-list", + "hashbrown 0.14.5", +] + [[package]] name = "os_info" version = "3.12.0" @@ -2408,6 +2597,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "42a232e7487fc2ef313d96dde7948e7a3c05101870d8985e4fd8d26aedd27b89" dependencies = [ "memchr", + "serde", ] [[package]] @@ -2602,6 +2792,51 @@ version = "0.1.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "a157657054ffe556d8858504af8a672a054a6e0bd9e8ee531059100c0fa11bb2" +[[package]] +name = "rust-ini" +version = "0.21.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "796e8d2b6696392a43bea58116b667fb4c29727dc5abd27d6acf338bb4f688c7" +dependencies = [ + "cfg-if", + "ordered-multimap", +] + +[[package]] +name = "rust-s3" +version = "0.37.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "94f9b973bd4097f5bb47e5827dcb9fb5dc17e93879e46badc27d2a4e9a4e5588" +dependencies = [ + "async-trait", + "aws-creds", + "aws-region", + "base64", + "bytes", + "cfg-if", + "futures-util", + "hex", + "hmac", + "http 1.3.1", + "log", + "maybe-async", + "md5", + "minidom", + "percent-encoding", + "quick-xml", + "reqwest", + "serde", + "serde_derive", + "serde_json", + "sha2", + "sysinfo", + "thiserror", + "time", + "tokio", + "tokio-stream", + "url", +] + [[package]] name = "rustc-demangle" version = "0.1.26" @@ -2700,6 +2935,25 @@ version = "1.0.22" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = 
"b39cdef0fa800fc44525c84ccb54a029961a8215f9619753635a9c0d2538d46d" +[[package]] +name = "rxml" +version = "0.11.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "65bc94b580d0f5a6b7a2d604e597513d3c673154b52ddeccd1d5c32360d945ee" +dependencies = [ + "bytes", + "rxml_validation", +] + +[[package]] +name = "rxml_validation" +version = "0.11.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "826e80413b9a35e9d33217b3dcac04cf95f6559d15944b93887a08be5496c4a4" +dependencies = [ + "compact_str", +] + [[package]] name = "ryu" version = "1.0.20" @@ -3079,6 +3333,17 @@ dependencies = [ "digest", ] +[[package]] +name = "sha2" +version = "0.10.9" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a7507d819769d01a365ab707794a4084392c824f54a7a6a7862f8c3d0892b283" +dependencies = [ + "cfg-if", + "cpufeatures", + "digest", +] + [[package]] name = "sharded-slab" version = "0.1.7" @@ -3162,6 +3427,12 @@ version = "1.2.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "6ce2be8dc25455e1f91df71bfa12ad37d7af1092ae736f3a6cd0e37bc7810596" +[[package]] +name = "static_assertions" +version = "1.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a2eb9349b6444b326872e140eb1cf5e7c522154d69e7a0ffb0fb81c06b37543f" + [[package]] name = "stresstest" version = "0.1.0" @@ -3231,6 +3502,20 @@ dependencies = [ "syn", ] +[[package]] +name = "sysinfo" +version = "0.37.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "16607d5caffd1c07ce073528f9ed972d88db15dd44023fa57142963be3feb11f" +dependencies = [ + "libc", + "memchr", + "ntapi", + "objc2-core-foundation", + "objc2-io-kit", + "windows", +] + [[package]] name = "system-configuration" version = "0.6.1" @@ -3331,6 +3616,15 @@ dependencies = [ "time-core", ] +[[package]] +name = "tiny-keccak" +version = "2.0.2" +source = "registry+https://github.com/rust-lang/crates.io-index" 
+checksum = "2c9d3793400a45f954c52e73d068316d76b6f4e36977e3fcebb13a2721e80237" +dependencies = [ + "crunchy", +] + [[package]] name = "tinystr" version = "0.8.2" @@ -3947,6 +4241,41 @@ version = "0.4.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "712e227841d057c1ee1cd2fb22fa7e5a5461ae8e48fa2ca79ec42cfc1931183f" +[[package]] +name = "windows" +version = "0.61.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9babd3a767a4c1aef6900409f85f5d53ce2544ccdfaa86dad48c91782c6d6893" +dependencies = [ + "windows-collections", + "windows-core 0.61.2", + "windows-future", + "windows-link 0.1.3", + "windows-numerics", +] + +[[package]] +name = "windows-collections" +version = "0.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3beeceb5e5cfd9eb1d76b381630e82c4241ccd0d27f1a39ed41b2760b255c5e8" +dependencies = [ + "windows-core 0.61.2", +] + +[[package]] +name = "windows-core" +version = "0.61.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c0fdd3ddb90610c7638aa2b3a3ab2904fb9e5cdbecc643ddb3647212781c4ae3" +dependencies = [ + "windows-implement", + "windows-interface", + "windows-link 0.1.3", + "windows-result 0.3.4", + "windows-strings 0.4.2", +] + [[package]] name = "windows-core" version = "0.62.2" @@ -3960,6 +4289,17 @@ dependencies = [ "windows-strings 0.5.1", ] +[[package]] +name = "windows-future" +version = "0.2.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "fc6a41e98427b19fe4b73c550f060b59fa592d7d686537eebf9385621bfbad8e" +dependencies = [ + "windows-core 0.61.2", + "windows-link 0.1.3", + "windows-threading", +] + [[package]] name = "windows-implement" version = "0.60.2" @@ -3994,6 +4334,16 @@ version = "0.2.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "f0805222e57f7521d6a62e36fa9163bc891acd422f971defe97d64e70d0a4fe5" +[[package]] +name = "windows-numerics" +version = 
"0.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9150af68066c4c5c07ddc0ce30421554771e528bde427614c61038bc2c92c2b1" +dependencies = [ + "windows-core 0.61.2", + "windows-link 0.1.3", +] + [[package]] name = "windows-registry" version = "0.5.3" @@ -4125,6 +4475,15 @@ dependencies = [ "windows_x86_64_msvc 0.53.1", ] +[[package]] +name = "windows-threading" +version = "0.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b66463ad2e0ea3bbf808b7f1d371311c80e115c0b71d60efc142cafbcfb057a6" +dependencies = [ + "windows-link 0.1.3", +] + [[package]] name = "windows_aarch64_gnullvm" version = "0.48.5" diff --git a/Cargo.toml b/Cargo.toml index ac257e25..cf17081c 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -39,6 +39,7 @@ merni = "0.1.3" mimalloc = { version = "0.1.48", features = ["v3", "override"] } rand = "0.9.1" reqwest = { version = "0.12.22" } +rust-s3 = "0.37.0" sentry = { version = "0.45.0" } serde = { version = "1.0.219", features = ["derive"] } serde_json = "1.0.140" diff --git a/objectstore-server/src/config.rs b/objectstore-server/src/config.rs index 8ce9b423..9f52010f 100644 --- a/objectstore-server/src/config.rs +++ b/objectstore-server/src/config.rs @@ -150,7 +150,7 @@ pub enum Storage { /// Or for long-term storage: /// - `OS__LONG_TERM_STORAGE__TYPE=s3compatible` /// - `OS__LONG_TERM_STORAGE__ENDPOINT=https://s3.amazonaws.com` - endpoint: String, + endpoint: Option, /// S3 bucket name. /// @@ -161,6 +161,62 @@ pub enum Storage { /// - `OS__HIGH_VOLUME_STORAGE__BUCKET=my-bucket` /// - `OS__LONG_TERM_STORAGE__BUCKET=my-bucket` bucket: String, + + /// S3 region name. + /// + /// # Environment Variables + /// + /// - `OS__HIGH_VOLUME_STORAGE__REGION=us-east-1` + /// - `OS__LONG_TERM_STORAGE__REGION=us-east-1` + region: String, + + /// Whether to use path-style URLs for the S3-compatible storage. 
+ /// + /// # Environment Variables + /// + /// - `OS__HIGH_VOLUME_STORAGE__USE_PATH_STYLE=true` + /// - `OS__LONG_TERM_STORAGE__USE_PATH_STYLE=true` + use_path_style: Option, + + /// Optional access key for the S3-compatible storage. + /// + /// # Environment Variables + /// + /// - `OS__HIGH_VOLUME_STORAGE__ACCESS_KEY=my-access-key` + /// - `OS__LONG_TERM_STORAGE__ACCESS_KEY=my-access-key` + access_key: Option, + + /// Optional secret key for the S3-compatible storage. + /// + /// # Environment Variables + /// + /// - `OS__HIGH_VOLUME_STORAGE__SECRET_KEY=my-secret-key` + /// - `OS__LONG_TERM_STORAGE__SECRET_KEY=my-secret-key` + secret_key: Option, + + /// Optional security token for the S3-compatible storage. + /// + /// # Environment Variables + /// + /// - `OS__HIGH_VOLUME_STORAGE__SECURITY_TOKEN=my-security-token` + /// - `OS__LONG_TERM_STORAGE__SECURITY_TOKEN=my-security-token` + security_token: Option, + + /// Optional session token for the S3-compatible storage. + /// + /// # Environment Variables + /// + /// - `OS__HIGH_VOLUME_STORAGE__SESSION_TOKEN=my-session-token` + /// - `OS__LONG_TERM_STORAGE__SESSION_TOKEN=my-session-token` + session_token: Option, + + /// Optional request timeout for the S3-compatible storage. + /// + /// # Environment Variables + /// + /// - `OS__HIGH_VOLUME_STORAGE__REQUEST_TIMEOUT_SECS=30` + /// - `OS__LONG_TERM_STORAGE__REQUEST_TIMEOUT_SECS=30` + request_timeout_secs: Option, }, /// [Google Cloud Storage] backend (type `"gcs"`). 
@@ -889,6 +945,13 @@ mod tests { jail.set_env("OS__LONG_TERM_STORAGE__TYPE", "s3compatible"); jail.set_env("OS__LONG_TERM_STORAGE__ENDPOINT", "http://localhost:8888"); jail.set_env("OS__LONG_TERM_STORAGE__BUCKET", "whatever"); + jail.set_env("OS__LONG_TERM_STORAGE__REGION", "us-east-1"); + jail.set_env("OS__LONG_TERM_STORAGE__USE_PATH_STYLE", "true"); + jail.set_env("OS__LONG_TERM_STORAGE__ACCESS_KEY", "my-access-key"); + jail.set_env("OS__LONG_TERM_STORAGE__SECRET_KEY", "my-secret-key"); + jail.set_env("OS__LONG_TERM_STORAGE__SECURITY_TOKEN", "my-security-token"); + jail.set_env("OS__LONG_TERM_STORAGE__SESSION_TOKEN", "my-session-token"); + jail.set_env("OS__LONG_TERM_STORAGE__REQUEST_TIMEOUT_SECS", "30"); jail.set_env("OS__METRICS__TAGS__FOO", "bar"); jail.set_env("OS__METRICS__TAGS__BAZ", "qux"); jail.set_env("OS__SENTRY__DSN", "abcde"); @@ -900,12 +963,30 @@ mod tests { let config = Config::load(None).unwrap(); - let Storage::S3Compatible { endpoint, bucket } = &dbg!(&config).long_term_storage + let Storage::S3Compatible { + endpoint, + bucket, + region, + use_path_style, + access_key, + secret_key, + security_token, + session_token, + request_timeout_secs, + } = &dbg!(&config).long_term_storage else { panic!("expected s3 storage"); }; - assert_eq!(endpoint, "http://localhost:8888"); + assert_eq!(endpoint.as_deref(), Some("http://localhost:8888")); assert_eq!(bucket, "whatever"); + assert_eq!(region, "us-east-1"); + assert_eq!(*use_path_style, Some(true)); + assert_eq!(access_key.as_deref(), Some("my-access-key")); + assert_eq!(secret_key.as_deref(), Some("my-secret-key")); + assert_eq!(security_token.as_deref(), Some("my-security-token")); + assert_eq!(session_token.as_deref(), Some("my-session-token")); + assert_eq!(*request_timeout_secs, Some(30)); + assert_eq!( config.metrics.tags, [("foo".into(), "bar".into()), ("baz".into(), "qux".into())].into() @@ -934,6 +1015,13 @@ mod tests { type: s3compatible endpoint: http://localhost:8888 bucket: whatever + 
region: us-east-1 + use_path_style: true + access_key: my-access-key + secret_key: my-secret-key + security_token: my-security-token + session_token: my-session-token + request_timeout_secs: 30 sentry: dsn: abcde environment: production @@ -947,12 +1035,29 @@ mod tests { figment::Jail::expect_with(|_jail| { let config = Config::load(Some(tempfile.path())).unwrap(); - let Storage::S3Compatible { endpoint, bucket } = &dbg!(&config).long_term_storage + let Storage::S3Compatible { + endpoint, + bucket, + region, + use_path_style, + access_key, + secret_key, + security_token, + session_token, + request_timeout_secs, + } = &dbg!(&config).long_term_storage else { panic!("expected s3 storage"); }; - assert_eq!(endpoint, "http://localhost:8888"); + assert_eq!(endpoint.as_deref(), Some("http://localhost:8888")); assert_eq!(bucket, "whatever"); + assert_eq!(region, "us-east-1"); + assert_eq!(*use_path_style, Some(true)); + assert_eq!(access_key.as_deref(), Some("my-access-key")); + assert_eq!(secret_key.as_deref(), Some("my-secret-key")); + assert_eq!(security_token.as_deref(), Some("my-security-token")); + assert_eq!(session_token.as_deref(), Some("my-session-token")); + assert_eq!(*request_timeout_secs, Some(30)); assert_eq!(config.sentry.dsn.unwrap().expose_secret().as_str(), "abcde"); assert_eq!(config.sentry.environment.as_deref(), Some("production")); @@ -989,12 +1094,19 @@ mod tests { let Storage::S3Compatible { endpoint, bucket: _bucket, + region: _, + use_path_style: _, + access_key: _, + secret_key: _, + security_token: _, + session_token: _, + request_timeout_secs: _, } = &dbg!(&config).long_term_storage else { panic!("expected s3 storage"); }; // Env should overwrite the yaml config - assert_eq!(endpoint, "http://localhost:9001"); + assert_eq!(endpoint.as_deref(), Some("http://localhost:9001")); Ok(()) }); diff --git a/objectstore-server/src/state.rs b/objectstore-server/src/state.rs index b102b146..cc717d2d 100644 --- a/objectstore-server/src/state.rs +++ 
b/objectstore-server/src/state.rs @@ -24,9 +24,30 @@ impl State { fn map_storage_config(config: &'_ Storage) -> StorageConfig<'_> { match config { Storage::FileSystem { path } => StorageConfig::FileSystem { path }, - Storage::S3Compatible { endpoint, bucket } => { - StorageConfig::S3Compatible { endpoint, bucket } - } + Storage::S3Compatible { + endpoint, + bucket, + region, + use_path_style, + access_key, + secret_key, + security_token, + session_token, + request_timeout_secs, + } => StorageConfig::S3Compatible { + endpoint: endpoint.as_deref(), + bucket, + region, + use_path_style: match use_path_style { + Some(value) => *value, + None => false, + }, + access_key: access_key.as_deref(), + secret_key: secret_key.as_deref(), + security_token: security_token.as_deref(), + session_token: session_token.as_deref(), + request_timeout_secs: *request_timeout_secs, + }, Storage::Gcs { endpoint, bucket } => StorageConfig::Gcs { endpoint: endpoint.as_deref(), bucket, diff --git a/objectstore-service/Cargo.toml b/objectstore-service/Cargo.toml index 896e66ad..20cfa9e5 100644 --- a/objectstore-service/Cargo.toml +++ b/objectstore-service/Cargo.toml @@ -15,6 +15,7 @@ async-trait = { workspace = true } bigtable_rs = "0.2.18" bytes = { workspace = true } data-encoding = "2.9.0" +futures = { workspace = true } futures-util = { workspace = true } gcp_auth = "0.12.3" humantime = { workspace = true } @@ -27,6 +28,7 @@ reqwest = { workspace = true, features = [ "multipart", "json", ] } +rust-s3 = { workspace = true } sentry = { workspace = true, features = ["anyhow"] } serde = { workspace = true } serde_json = { workspace = true } diff --git a/objectstore-service/src/backend/s3_compatible.rs b/objectstore-service/src/backend/s3_compatible.rs index 805bc137..5c36a93e 100644 --- a/objectstore-service/src/backend/s3_compatible.rs +++ b/objectstore-service/src/backend/s3_compatible.rs @@ -1,131 +1,117 @@ -use std::time::{Duration, SystemTime}; -use std::{fmt, io}; - -use anyhow::{Context, 
Result}; -use futures_util::{StreamExt, TryStreamExt}; -use objectstore_types::{ExpirationPolicy, Metadata}; -use reqwest::{Body, IntoUrl, Method, RequestBuilder, StatusCode}; - -use crate::backend::common::{self, Backend, BackendStream}; +use std::fmt; +use std::time::Duration; + +use anyhow::Result; +use bytes::Bytes; +use futures::stream; +use objectstore_types::Metadata; +use reqwest::StatusCode; +use reqwest::header::{HeaderMap, HeaderName, HeaderValue}; +use s3::creds::Credentials; +use s3::request::ResponseData; +use s3::{Bucket, Region}; +use tokio_util::io::StreamReader; + +use crate::backend::common::{Backend, BackendStream}; use crate::path::ObjectPath; -/// Prefix used for custom metadata in headers for the GCS backend. -/// -/// See: -const GCS_CUSTOM_PREFIX: &str = "x-goog-meta-"; -/// Header used to store the expiration time for GCS using the `daysSinceCustomTime` lifecycle -/// condition. -/// -/// See: -const GCS_CUSTOM_TIME: &str = "x-goog-custom-time"; -/// Time to debounce bumping an object with configured TTI. 
-const TTI_DEBOUNCE: Duration = Duration::from_secs(24 * 3600); // 1 day - -pub trait Token: Send + Sync { - fn as_str(&self) -> &str; +pub struct S3CompatibleBackend { + bucket: Box, } -pub trait TokenProvider: Send + Sync + 'static { - fn get_token(&self) -> impl Future> + Send; +pub struct S3CompatibleBackendConfig { + pub bucket: String, + pub region: String, + pub endpoint: Option, + #[allow(dead_code)] + pub extra_headers: HeaderMap, + pub request_timeout: Option, + pub path_style: Option, + pub access_key: Option, + pub secret_key: Option, + pub security_token: Option, + pub session_token: Option, } -// this only exists because we have to provide *some* kind of provider -#[derive(Debug)] -pub struct NoToken; - -impl TokenProvider for NoToken { - #[allow(refining_impl_trait_internal)] // otherwise, returning `!` will not implement the required traits - async fn get_token(&self) -> Result { - unimplemented!() - } -} -impl Token for NoToken { - fn as_str(&self) -> &str { - unimplemented!() +impl Default for S3CompatibleBackendConfig { + fn default() -> Self { + Self { + bucket: String::new(), + region: String::new(), + endpoint: None, + extra_headers: HeaderMap::new(), + request_timeout: None, + path_style: None, + access_key: None, + secret_key: None, + security_token: None, + session_token: None, + } } } -pub struct S3CompatibleBackend { - client: reqwest::Client, - - endpoint: String, - bucket: String, - - token_provider: Option, -} - -impl S3CompatibleBackend { +impl S3CompatibleBackend { /// Creates a new S3 compatible backend bound to the given bucket. 
- #[expect(dead_code)] - pub fn new(endpoint: &str, bucket: &str, token_provider: T) -> Self { - Self { - client: common::reqwest_client(), - endpoint: endpoint.into(), - bucket: bucket.into(), - token_provider: Some(token_provider), + pub fn new(config: S3CompatibleBackendConfig) -> Self { + let credentials = Credentials::new( + config.access_key.as_deref(), + config.secret_key.as_deref(), + config.security_token.as_deref(), + config.session_token.as_deref(), + None, + ) + .unwrap(); + + let mut bucket = Bucket::new( + config.bucket.as_str(), + Region::Custom { + region: config.region.clone(), + endpoint: match config.endpoint { + Some(endpoint) => endpoint, + None => format!("s3-{}.amazonaws.com", config.region), + }, + }, + credentials, + ) + .unwrap(); + + if let Some(path_style) = config.path_style + && path_style + { + bucket = bucket.with_path_style(); } - } - /// Formats the S3 object URL for the given key. - fn object_url(&self, path: &ObjectPath) -> String { - format!("{}/{}/{path}", self.endpoint, self.bucket) - } -} - -impl S3CompatibleBackend -where - T: TokenProvider, -{ - /// Creates a request builder with the appropriate authentication. - async fn request(&self, method: Method, url: impl IntoUrl) -> Result { - let mut builder = self.client.request(method, url); - if let Some(provider) = &self.token_provider { - builder = builder.bearer_auth(provider.get_token().await?.as_str()); + if let Some(request_timeout) = config.request_timeout { + bucket = bucket.with_request_timeout(request_timeout).unwrap(); } - Ok(builder) - } - - /// Issues a request to update the metadata for the given object. - async fn update_metadata(&self, path: &ObjectPath, metadata: &Metadata) -> Result<()> { - // NB: Meta updates require copy + REPLACE along with *all* metadata. See - // https://cloud.google.com/storage/docs/xml-api/put-object-copy - self.request(Method::PUT, self.object_url(path)) - .await? 
- .header("x-goog-copy-source", format!("/{}/{path}", self.bucket)) - .header("x-goog-metadata-directive", "REPLACE") - .headers(metadata.to_headers(GCS_CUSTOM_PREFIX, true)?) - .send() - .await? - .error_for_status() - .context("failed to update expiration time for object with TTI")?; - Ok(()) + Self { bucket } + } + + async fn handle_get_object_response( + &self, + _path: &ObjectPath, + response: ResponseData, + ) -> Result> { + let metadata = Metadata::from_hashmap(response.headers(), "").unwrap_or_default(); + let bytes = Bytes::from(response.to_vec()); + let stream: BackendStream = Box::pin(stream::iter(std::iter::once(Ok(bytes)))); + + Ok(Some((metadata, stream))) } } -impl fmt::Debug for S3CompatibleBackend { +impl fmt::Debug for S3CompatibleBackend { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { f.debug_struct("S3Compatible") - .field("client", &self.client) - .field("endpoint", &self.endpoint) - .field("bucket", &self.bucket) + .field("bucket", &self.bucket.name()) + .field("endpoint", &self.bucket.host()) .finish_non_exhaustive() } } -impl S3CompatibleBackend { - pub fn without_token(endpoint: &str, bucket: &str) -> Self { - Self { - client: reqwest::Client::new(), - endpoint: endpoint.into(), - bucket: bucket.into(), - token_provider: None, - } - } -} - #[async_trait::async_trait] -impl Backend for S3CompatibleBackend { +impl Backend for S3CompatibleBackend { fn name(&self) -> &'static str { "s3-compatible" } @@ -138,14 +124,22 @@ impl Backend for S3CompatibleBackend { stream: BackendStream, ) -> Result<()> { tracing::debug!("Writing to s3_compatible backend"); - self.request(Method::PUT, self.object_url(path)) - .await? - .headers(metadata.to_headers(GCS_CUSTOM_PREFIX, true)?) - .body(Body::wrap_stream(stream)) - .send() - .await? 
- .error_for_status() - .context("failed to put object")?; + let header_map = metadata + .custom + .iter() + .filter_map(|(key, value)| { + let name = key.parse::().ok()?; + let val = value.parse::().ok()?; + Some((name, val)) + }) + .collect::(); + + self.bucket + .put_object_stream_builder(path.to_string()) + .with_content_type(metadata.content_type.as_ref()) + .with_headers(header_map) + .execute_stream(&mut StreamReader::new(stream)) + .await?; Ok(()) } @@ -153,61 +147,169 @@ impl Backend for S3CompatibleBackend { #[tracing::instrument(level = "trace", fields(?path), skip_all)] async fn get_object(&self, path: &ObjectPath) -> Result> { tracing::debug!("Reading from s3_compatible backend"); - let object_url = self.object_url(path); - let response = self.request(Method::GET, &object_url).await?.send().await?; - if response.status() == StatusCode::NOT_FOUND { + let response = self.bucket.get_object(path.to_string()).await?; + if response.status_code() == StatusCode::NOT_FOUND { tracing::debug!("Object not found"); return Ok(None); } - let response = response - .error_for_status() - .context("failed to get object")?; - - let headers = response.headers(); - // TODO: Populate size in metadata - let metadata = Metadata::from_headers(headers, GCS_CUSTOM_PREFIX)?; - - // TODO: Schedule into background persistently so this doesn't get lost on restarts - if let ExpirationPolicy::TimeToIdle(tti) = metadata.expiration_policy { - // TODO: Inject the access time from the request. - let access_time = SystemTime::now(); - - let expire_at = headers - .get(GCS_CUSTOM_TIME) - .and_then(|s| s.to_str().ok()) - .and_then(|s| humantime::parse_rfc3339(s).ok()) - .unwrap_or(access_time); - - if expire_at < access_time + tti - TTI_DEBOUNCE { - // This serializes a new custom-time internally. - self.update_metadata(path, &metadata).await?; - } - } - - // TODO: the object *GET* should probably also contain the expiration time? 
- - let stream = response.bytes_stream().map_err(io::Error::other); - Ok(Some((metadata, stream.boxed()))) + return self.handle_get_object_response(path, response).await; } #[tracing::instrument(level = "trace", fields(?path), skip_all)] async fn delete_object(&self, path: &ObjectPath) -> Result<()> { tracing::debug!("Deleting from s3_compatible backend"); - let response = self - .request(Method::DELETE, self.object_url(path)) - .await? - .send() - .await?; + self.bucket.delete_object(path.to_string()).await?; + Ok(()) + } +} - // Do not error for objects that do not exist. - if response.status() != StatusCode::NOT_FOUND { - tracing::debug!("Object not found"); - response - .error_for_status() - .context("failed to delete object")?; +#[cfg(test)] +mod tests { + use super::*; + use futures_util::{StreamExt, TryStreamExt}; + use objectstore_types::ExpirationPolicy; + use std::collections::BTreeMap; + use uuid::Uuid; + + // NB: To run any of these tests, you need to have an S3 emulator running. This is done + // automatically in CI. + // + // Refer to the readme for how to set up the emulator. + + async fn create_test_backend() -> Result { + Ok(S3CompatibleBackend::new(S3CompatibleBackendConfig { + bucket: "test-bucket".into(), + region: "us-east-1".into(), + endpoint: Some("http://localhost:8088".into()), + extra_headers: HeaderMap::new(), + request_timeout: Some(Duration::from_secs(60)), + path_style: Some(true), + access_key: Some("test-key".into()), + secret_key: Some("test-secret".into()), + security_token: None, + session_token: None, + })) + } + fn make_stream(contents: &[u8]) -> BackendStream { + tokio_stream::once(Ok(contents.to_vec().into())).boxed() + } + + async fn read_to_vec(mut stream: BackendStream) -> Result> { + let mut payload = Vec::new(); + while let Some(chunk) = stream.try_next().await?
{ + payload.extend(&chunk); + } + Ok(payload) + } + + fn make_key() -> ObjectPath { + ObjectPath { + usecase: "testing".into(), + scope: "testing".into(), + key: Uuid::new_v4().to_string(), } + } + + #[tokio::test] + async fn test_roundtrip() -> Result<()> { + let backend = create_test_backend().await?; + + let path = make_key(); + let metadata = Metadata { + is_redirect_tombstone: None, + content_type: "text/plain".into(), + expiration_policy: ExpirationPolicy::Manual, + compression: None, + custom: BTreeMap::from_iter([("hello".into(), "world".into())]), + size: None, + }; + + backend + .put_object(&path, &metadata, make_stream(b"hello, world")) + .await?; + + let (meta, stream) = backend.get_object(&path).await?.unwrap(); + + let payload = read_to_vec(stream).await?; + let str_payload = str::from_utf8(&payload).unwrap(); + assert_eq!(str_payload, "hello, world"); + assert_eq!(meta.content_type, metadata.content_type); + assert_eq!(meta.custom, metadata.custom); + + Ok(()) + } + + #[tokio::test] + async fn test_get_nonexistent() -> Result<()> { + let backend = create_test_backend().await?; + + let path = make_key(); + let result = backend.get_object(&path).await?; + assert!(result.is_none()); + + Ok(()) + } + + #[tokio::test] + async fn test_delete_nonexistent() -> Result<()> { + let backend = create_test_backend().await?; + + let path = make_key(); + backend.delete_object(&path).await?; + + Ok(()) + } + + #[tokio::test] + async fn test_overwrite() -> Result<()> { + let backend = create_test_backend().await?; + + let path = make_key(); + let metadata = Metadata { + custom: BTreeMap::from_iter([("invalid".into(), "invalid".into())]), + ..Default::default() + }; + + backend + .put_object(&path, &metadata, make_stream(b"hello")) + .await?; + + let metadata = Metadata { + custom: BTreeMap::from_iter([("hello".into(), "world".into())]), + ..Default::default() + }; + + backend + .put_object(&path, &metadata, make_stream(b"world")) + .await?; + + let (meta, stream) = 
backend.get_object(&path).await?.unwrap(); + + let payload = read_to_vec(stream).await?; + let str_payload = str::from_utf8(&payload).unwrap(); + assert_eq!(str_payload, "world"); + assert_eq!(meta.custom, metadata.custom); + + Ok(()) + } + + #[tokio::test] + async fn test_read_after_delete() -> Result<()> { + let backend = create_test_backend().await?; + + let path = make_key(); + let metadata = Metadata::default(); + + backend + .put_object(&path, &metadata, make_stream(b"hello, world")) + .await?; + + backend.delete_object(&path).await?; + + let result = backend.get_object(&path).await?; + assert!(result.is_none()); Ok(()) } diff --git a/objectstore-service/src/lib.rs b/objectstore-service/src/lib.rs index 4b24883a..54f76351 100644 --- a/objectstore-service/src/lib.rs +++ b/objectstore-service/src/lib.rs @@ -15,9 +15,10 @@ use objectstore_types::Metadata; use std::path::Path; use std::sync::Arc; use std::sync::atomic::{AtomicU64, Ordering}; -use std::time::Instant; +use std::time::{Duration, Instant}; use crate::backend::common::{BackendStream, BoxedBackend}; +use crate::backend::s3_compatible::S3CompatibleBackendConfig; pub use path::*; @@ -49,10 +50,24 @@ pub enum StorageConfig<'a> { }, /// Use an S3-compatible storage backend. S3Compatible { - /// Optional endpoint URL for the S3-compatible storage. - endpoint: &'a str, + /// The name of the region to use. + region: &'a str, /// The name of the bucket to use. bucket: &'a str, + /// Optional endpoint URL for the S3-compatible storage. + endpoint: Option<&'a str>, + /// Whether to use path-style URLs for the S3-compatible storage. + use_path_style: bool, + /// Optional access key for the S3-compatible storage. + access_key: Option<&'a str>, + /// Optional secret key for the S3-compatible storage. + secret_key: Option<&'a str>, + /// Optional security token for the S3-compatible storage. + security_token: Option<&'a str>, + /// Optional session token for the S3-compatible storage. 
+ session_token: Option<&'a str>, + /// Optional request timeout for the S3-compatible storage. + request_timeout_secs: Option, }, /// Use Google Cloud Storage as storage backend. Gcs { @@ -282,9 +297,30 @@ async fn create_backend(config: StorageConfig<'_>) -> anyhow::Result { Box::new(backend::local_fs::LocalFsBackend::new(path)) } - StorageConfig::S3Compatible { endpoint, bucket } => Box::new( - backend::s3_compatible::S3CompatibleBackend::without_token(endpoint, bucket), - ), + StorageConfig::S3Compatible { + endpoint, + bucket, + region, + use_path_style, + access_key, + secret_key, + security_token, + session_token, + request_timeout_secs, + } => Box::new(backend::s3_compatible::S3CompatibleBackend::new( + S3CompatibleBackendConfig { + bucket: bucket.to_string(), + region: region.to_string(), + endpoint: endpoint.map(|s| s.to_string()), + extra_headers: reqwest::header::HeaderMap::new(), + request_timeout: request_timeout_secs.map(Duration::from_secs), + path_style: Some(use_path_style), + access_key: access_key.map(|s| s.to_string()), + secret_key: secret_key.map(|s| s.to_string()), + security_token: security_token.map(|s| s.to_string()), + session_token: session_token.map(|s| s.to_string()), + }, + )), StorageConfig::Gcs { endpoint, bucket } => { Box::new(backend::gcs::GcsBackend::new(endpoint, bucket).await?) } diff --git a/objectstore-types/src/lib.rs b/objectstore-types/src/lib.rs index a5b1e4fb..ffa56259 100644 --- a/objectstore-types/src/lib.rs +++ b/objectstore-types/src/lib.rs @@ -7,7 +7,7 @@ #![warn(missing_debug_implementations)] use std::borrow::Cow; -use std::collections::BTreeMap; +use std::collections::{BTreeMap, HashMap}; use std::fmt; use std::str::FromStr; use std::time::{Duration, SystemTime}; @@ -240,6 +240,37 @@ impl Metadata { Ok(metadata) } + /// Extracts metadata from the given [`HashMap`]. + /// + /// A prefix can also be provided, which is stripped from custom non-standard headers. 
+    pub fn from_hashmap(hash_map: HashMap, prefix: &str) -> Result {
+        let mut metadata = Metadata::default();
+
+        for (key, value) in hash_map {
+            // Match standard headers ASCII-case-insensitively without allocating a lowercased key.
+            if key.eq_ignore_ascii_case(header::CONTENT_TYPE.as_str()) {
+                metadata.content_type = value.as_str().to_owned().into();
+            } else if key.eq_ignore_ascii_case(header::CONTENT_ENCODING.as_str()) {
+                metadata.compression = Some(Compression::from_str(value.as_str())?);
+            } else if let Some(name) = key.strip_prefix(prefix) {
+                // Unlike the standard headers above, `prefix` is matched case-sensitively.
+                if name == HEADER_EXPIRATION {
+                    metadata.expiration_policy = ExpirationPolicy::from_str(value.as_str())?;
+                } else if name == HEADER_REDIRECT_TOMBSTONE {
+                    // Only the exact value "true" sets the flag; any other value leaves it untouched.
+                    if value.as_str() == "true" {
+                        metadata.is_redirect_tombstone = Some(true);
+                    }
+                } else if let Some(name) = name.strip_prefix(HEADER_META_PREFIX) {
+                    // Custom entries are stored under their name with both prefixes removed.
+                    metadata.custom.insert(name.into(), value.as_str().into());
+                }
+            }
+        }
+
+        Ok(metadata)
+    }
+
     /// Turns the metadata into a [`HeaderMap`].
     ///
     /// It will prefix any non-standard headers with the given `prefix`.