diff --git a/Cargo.lock b/Cargo.lock index cbf75912..4ca6ed82 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -238,6 +238,28 @@ dependencies = [ "tokio", ] +[[package]] +name = "async-stream" +version = "0.3.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0b5a71a6f37880a80d1d7f19efd781e4b5de42c88f0722cc13bcb6cc2cfe8476" +dependencies = [ + "async-stream-impl", + "futures-core", + "pin-project-lite", +] + +[[package]] +name = "async-stream-impl" +version = "0.3.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c7c24de15d275a1ecfd47a380fb4d5ec9bfe0933f309ed5e705b775596a3574d" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] + [[package]] name = "async-trait" version = "0.1.89" @@ -295,9 +317,9 @@ dependencies = [ [[package]] name = "axum" -version = "0.8.6" +version = "0.8.7" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8a18ed336352031311f4e0b4dd2ff392d4fbb370777c9d18d7fc9d7359f73871" +checksum = "5b098575ebe77cb6d14fc7f32749631a6e44edbef6b796f89b020e99ba20d425" dependencies = [ "axum-core", "bytes", @@ -312,6 +334,7 @@ dependencies = [ "matchit", "memchr", "mime", + "multer", "percent-encoding", "pin-project-lite", "serde_core", @@ -347,21 +370,22 @@ dependencies = [ [[package]] name = "axum-extra" -version = "0.10.3" +version = "0.12.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9963ff19f40c6102c76756ef0a46004c0d58957d87259fc9208ff8441c12ab96" +checksum = "dbfe9f610fe4e99cf0cfcd03ccf8c63c28c616fe714d80475ef731f3b13dd21b" dependencies = [ "axum", "axum-core", "bytes", + "fastrand", + "futures-core", "futures-util", "http 1.3.1", "http-body", "http-body-util", "mime", + "multer", "pin-project-lite", - "rustversion", - "serde_core", "tower-layer", "tower-service", "tracing", @@ -2037,6 +2061,23 @@ dependencies = [ "uuid", ] +[[package]] +name = "multer" +version = "3.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "83e87776546dc87511aa5ee218730c92b666d7264ab6ed41f9d215af9cd5224b" +dependencies = [ + "bytes", + "encoding_rs", + "futures-util", + "http 1.3.1", + "httparse", + "memchr", + "mime", + "spin", + "version_check", +] + [[package]] name = "multimap" version = "0.10.1" @@ -2199,17 +2240,23 @@ version = "0.1.0" dependencies = [ "anyhow", "argh", + "async-stream", "axum", "axum-extra", + "bytes", "console", "elegant-departure", "figment", + "futures", "futures-util", + "http 1.3.1", "humantime", "humantime-serde", "jsonwebtoken", "merni", "mimalloc", + "mime", + "multer", "nix", "num_cpus", "objectstore-service", diff --git a/objectstore-server/Cargo.toml b/objectstore-server/Cargo.toml index 2e0386f9..65243f24 100644 --- a/objectstore-server/Cargo.toml +++ b/objectstore-server/Cargo.toml @@ -13,17 +13,23 @@ publish = false [dependencies] anyhow = { workspace = true } argh = "0.1.13" -axum = "0.8.4" -axum-extra = "0.10.1" +async-stream = "0.3.6" +axum = { version = "0.8.4", features = ["multipart"] } +axum-extra = { version = "0.12.2", features = ["multipart"] } +bytes = { workspace = true } console = "0.16.1" elegant-departure = { version = "0.3.2", features = ["tokio"] } figment = { version = "0.10.19", features = ["env", "test", "yaml"] } +futures = { workspace = true } futures-util = { workspace = true } +http = { workspace = true } humantime = { workspace = true } humantime-serde = { workspace = true } jsonwebtoken = { workspace = true } merni = { workspace = true } mimalloc = { workspace = true } +mime = "0.3.17" +multer = "3.1.0" num_cpus = "1.17.0" objectstore-service = { workspace = true } objectstore-types = { workspace = true } diff --git a/objectstore-server/src/auth/service.rs b/objectstore-server/src/auth/service.rs index 8429ced1..f6474982 100644 --- a/objectstore-server/src/auth/service.rs +++ b/objectstore-server/src/auth/service.rs @@ -1,5 +1,5 @@ use objectstore_service::id::{ObjectContext, ObjectId}; -use objectstore_service::{PayloadStream, StorageService}; +use objectstore_service::{DeleteResult, GetResult, InsertResult, PayloadStream, StorageService}; use objectstore_types::{Metadata, Permission}; use crate::auth::AuthContext; @@ -55,7 +55,7 @@ impl AuthAwareService { key: Option, metadata: &Metadata, stream: PayloadStream, - ) -> anyhow::Result { + ) -> InsertResult { self.assert_authorized(Permission::ObjectWrite, &context)?; self.service .insert_object(context, key, metadata, stream) @@ -63,16 +63,13 @@ impl AuthAwareService { } /// Auth-aware wrapper around [`StorageService::get_object`]. - pub async fn get_object( - &self, - id: &ObjectId, - ) -> anyhow::Result> { + pub async fn get_object(&self, id: &ObjectId) -> GetResult { self.assert_authorized(Permission::ObjectRead, id.context())?; self.service.get_object(id).await } /// Auth-aware wrapper around [`StorageService::delete_object`]. - pub async fn delete_object(&self, id: &ObjectId) -> anyhow::Result<()> { + pub async fn delete_object(&self, id: &ObjectId) -> DeleteResult { self.assert_authorized(Permission::ObjectDelete, id.context())?; self.service.delete_object(id).await } diff --git a/objectstore-server/src/endpoints/batch.rs b/objectstore-server/src/endpoints/batch.rs new file mode 100644 index 00000000..47448cd0 --- /dev/null +++ b/objectstore-server/src/endpoints/batch.rs @@ -0,0 +1,22 @@ +use axum::Router; +use axum::http::StatusCode; +use axum::response::{IntoResponse, Response}; +use axum::routing; +use objectstore_service::id::ObjectContext; + +use crate::auth::AuthAwareService; +use crate::endpoints::common::ApiResult; +use crate::extractors::{BatchRequest, Xt}; +use crate::state::ServiceState; + +pub fn router() -> Router { + Router::new().route("/objects:batch/{usecase}/{scopes}/", routing::post(batch)) +} + +async fn batch( + _service: AuthAwareService, + Xt(_context): Xt, + _request: BatchRequest, +) -> ApiResult { + Ok(StatusCode::NOT_IMPLEMENTED.into_response()) +} diff --git a/objectstore-server/src/endpoints/common.rs b/objectstore-server/src/endpoints/common.rs index 8da1c300..799a035c 100644 --- a/objectstore-server/src/endpoints/common.rs +++ b/objectstore-server/src/endpoints/common.rs @@ -4,6 +4,7 @@ use axum::http::StatusCode; use axum::response::{IntoResponse, Response}; +#[derive(Debug)] pub enum AnyhowResponse { Error(anyhow::Error), Response(Response), diff --git a/objectstore-server/src/endpoints/mod.rs b/objectstore-server/src/endpoints/mod.rs index 6907977e..b9c0c468 100644 --- a/objectstore-server/src/endpoints/mod.rs +++ b/objectstore-server/src/endpoints/mod.rs @@ -6,12 +6,15 @@ use axum::Router; use crate::state::ServiceState; +mod batch; mod common; mod health; mod objects; pub fn routes() -> Router { - let routes_v1 = Router::new().merge(objects::router()); + let routes_v1 = Router::new() + .merge(objects::router()) + .merge(batch::router()); Router::new() .merge(health::router()) diff --git a/objectstore-server/src/extractors/batch.rs b/objectstore-server/src/extractors/batch.rs new file mode 100644 index 00000000..51efcd5a --- /dev/null +++ b/objectstore-server/src/extractors/batch.rs @@ -0,0 +1,232 @@ +use std::fmt::Debug; + +use anyhow::Context; +use axum::{ + extract::{FromRequest, Request}, + http::StatusCode, + response::{IntoResponse, Response}, +}; +use bytes::Bytes; +use futures::{StreamExt, stream::BoxStream}; +use http::header::CONTENT_TYPE; +use multer::Field; +use multer::{Constraints, Multipart, SizeLimit}; +use objectstore_service::id::ObjectKey; +use objectstore_types::Metadata; + +#[derive(Debug)] +pub struct GetOperation { + pub key: ObjectKey, +} + +#[derive(Debug)] +pub struct InsertOperation { + pub key: Option, + pub metadata: Metadata, + pub payload: Bytes, +} + +#[derive(Debug)] +pub struct DeleteOperation { + pub key: ObjectKey, +} + +#[derive(Debug)] +pub enum Operation { + Get(GetOperation), + Insert(InsertOperation), + Delete(DeleteOperation), +} + +impl Operation { + async fn try_from_field(field: Field<'_>) -> anyhow::Result { + let kind = field + .headers() + .get(HEADER_BATCH_OPERATION_KIND) + .ok_or(anyhow::anyhow!( + "missing {HEADER_BATCH_OPERATION_KIND} header" + ))?; + let kind = kind + .to_str() + .context(format!("invalid {HEADER_BATCH_OPERATION_KIND} header"))? + .to_lowercase(); + + let key = match field.headers().get(HEADER_BATCH_OPERATION_KEY) { + Some(key) => Some(key.to_str().context("invalid object key")?.to_owned()), + None => None, + }; + + let operation = match kind.as_str() { + "get" => { + let key = key.context("missing object key for get operation")?; + Operation::Get(GetOperation { key }) + } + "insert" => Operation::Insert(InsertOperation { + key, + metadata: Metadata::from_headers(field.headers(), "")?, + payload: field.bytes().await?, + }), + "delete" => { + let key = key.context("missing object key for delet operation")?; + Operation::Delete(DeleteOperation { key }) + } + _ => anyhow::bail!("invalid {HEADER_BATCH_OPERATION_KIND} header"), + }; + Ok(operation) + } +} + +pub struct BatchRequest { + pub operations: BoxStream<'static, anyhow::Result>, +} + +impl Debug for BatchRequest { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.debug_struct("BatchRequest").finish() + } +} + +pub const HEADER_BATCH_OPERATION_KIND: &str = "x-sn-batch-operation-kind"; +pub const HEADER_BATCH_OPERATION_KEY: &str = "x-sn-batch-operation-key"; + +impl FromRequest for BatchRequest +where + S: Send + Sync, +{ + type Rejection = Response; + + async fn from_request(request: Request, _: &S) -> Result { + let Some(content_type) = request + .headers() + .get(CONTENT_TYPE) + .and_then(|ct| ct.to_str().ok()) + else { + return Err((StatusCode::BAD_REQUEST, "expected valid Content-Type").into_response()); + }; + + let Ok(mime) = content_type.parse::() else { + return Err((StatusCode::BAD_REQUEST, "expected valid Content-Type").into_response()); + }; + if !(mime.type_() == mime::MULTIPART && mime.subtype() == "mixed") { + return Err(( + StatusCode::BAD_REQUEST, + "expected Content-Type: multipart/mixed", + ) + .into_response()); + } + + // XXX: `multer::parse_boundary` requires the content-type to be `multipart/form-data` + let content_type = content_type.replace("multipart/mixed", "multipart/form-data"); + let Ok(boundary) = multer::parse_boundary(content_type) else { + return Err(( + StatusCode::BAD_REQUEST, + "failed to parse multipart boundary", + ) + .into_response()); + }; + let mut parts = Multipart::with_constraints( + request.into_body().into_data_stream(), + boundary, + Constraints::new().size_limit( + // TODO(lcian): tentative limits that should be tested + SizeLimit::new() + .per_field(1024 * 1024) // 1 MB + .whole_stream(1024 * 1024 * 1024), // 1 GB + ), + ); + let operations = async_stream::try_stream! { + let mut count = 0; + while let Some(field) = parts.next_field().await? { + if count >= 1000 { + Err(anyhow::anyhow!("exceeded limit of 1000 operations per batch request"))?; + } + count += 1; + yield Operation::try_from_field(field).await?; + } + } + .boxed(); + + Ok(Self { operations }) + } +} + +#[cfg(test)] +mod tests { + use std::time::Duration; + + use super::*; + use axum::body::Body; + use axum::http::{Request, header::CONTENT_TYPE}; + use futures::StreamExt; + use objectstore_types::{ExpirationPolicy, HEADER_EXPIRATION}; + + #[tokio::test] + async fn test_valid_request_works() { + let insert1_data = b"first blob data"; + let insert2_data = b"second blob data"; + let expiration = ExpirationPolicy::TimeToLive(Duration::from_hours(1)); + let body = format!( + "--boundary\r\n\ + {HEADER_BATCH_OPERATION_KEY}: test0\r\n\ + {HEADER_BATCH_OPERATION_KIND}: get\r\n\ + \r\n\ + \r\n\ + --boundary\r\n\ + {HEADER_BATCH_OPERATION_KEY}: test1\r\n\ + {HEADER_BATCH_OPERATION_KIND}: insert\r\n\ + Content-Type: application/octet-stream\r\n\ + \r\n\ + {insert1}\r\n\ + --boundary\r\n\ + {HEADER_BATCH_OPERATION_KEY}: test2\r\n\ + {HEADER_BATCH_OPERATION_KIND}: insert\r\n\ + {HEADER_EXPIRATION}: {expiration}\r\n\ + Content-Type: text/plain\r\n\ + \r\n\ + {insert2}\r\n\ + --boundary\r\n\ + {HEADER_BATCH_OPERATION_KEY}: test3\r\n\ + {HEADER_BATCH_OPERATION_KIND}: delete\r\n\ + \r\n\ + \r\n\ + --boundary--\r\n", + insert1 = String::from_utf8_lossy(insert1_data), + insert2 = String::from_utf8_lossy(insert2_data), + ); + + let request = Request::builder() + .header(CONTENT_TYPE, "multipart/mixed; boundary=boundary") + .body(Body::from(body)) + .unwrap(); + + let batch_request = BatchRequest::from_request(request, &()).await.unwrap(); + + let operations: Vec<_> = batch_request.operations.collect().await; + assert_eq!(operations.len(), 4); + + let Operation::Get(get_op) = &operations[0].as_ref().unwrap() else { + panic!("expected get operation"); + }; + assert_eq!(get_op.key, "test0"); + + let Operation::Insert(insert_op1) = &operations[1].as_ref().unwrap() else { + panic!("expected insert operation"); + }; + assert_eq!(insert_op1.key.as_ref().unwrap(), "test1"); + assert_eq!(insert_op1.metadata.content_type, "application/octet-stream"); + assert_eq!(insert_op1.payload.as_ref(), insert1_data); + + let Operation::Insert(insert_op2) = &operations[2].as_ref().unwrap() else { + panic!("expected insert operation"); + }; + assert_eq!(insert_op2.key.as_ref().unwrap(), "test2"); + assert_eq!(insert_op2.metadata.content_type, "text/plain"); + assert_eq!(insert_op2.metadata.expiration_policy, expiration); + assert_eq!(insert_op2.payload.as_ref(), insert2_data); + + let Operation::Delete(delete_op) = &operations[3].as_ref().unwrap() else { + panic!("expected delete operation"); + }; + assert_eq!(delete_op.key, "test3"); + } +} diff --git a/objectstore-server/src/extractors/mod.rs b/objectstore-server/src/extractors/mod.rs index 5b270335..839463f0 100644 --- a/objectstore-server/src/extractors/mod.rs +++ b/objectstore-server/src/extractors/mod.rs @@ -1,3 +1,4 @@ +mod batch; mod id; mod service; @@ -8,3 +9,5 @@ mod service; /// `Xt` for this to work. #[derive(Debug)] pub struct Xt(pub T); + +pub use batch::{BatchRequest, DeleteOperation, GetOperation, InsertOperation, Operation}; diff --git a/objectstore-server/src/lib.rs b/objectstore-server/src/lib.rs index b57d655c..541d9a3d 100644 --- a/objectstore-server/src/lib.rs +++ b/objectstore-server/src/lib.rs @@ -2,6 +2,7 @@ //! //! This builds on top of the [`objectstore_service`], and exposes the underlying storage layer as //! an `HTTP` layer which can serve files directly to *external clients* and our SDK. +#![warn(missing_debug_implementations)] pub mod auth; pub mod cli; diff --git a/objectstore-service/src/id.rs b/objectstore-service/src/id.rs index 622cf652..5154ba72 100644 --- a/objectstore-service/src/id.rs +++ b/objectstore-service/src/id.rs @@ -11,9 +11,9 @@ use std::fmt; use objectstore_types::scope::{Scope, Scopes}; -/// Defines where an object belongs within the object store. +/// Defines where an object, or batch of objects, belongs within the object store. /// -/// This is part of the full object identifier, see [`ObjectId`]. +/// This is part of the full object identifier for single objects, see [`ObjectId`]. #[derive(Debug, Clone, PartialEq, Eq, Hash)] pub struct ObjectContext { /// The usecase, or "product" this object belongs to. @@ -73,9 +73,12 @@ pub struct ObjectId { /// a key makes a unique identifier. /// /// Keys can be assigned by the service. For this, use [`ObjectId::random`]. - pub key: String, + pub key: ObjectKey, } +/// A key that uniquely identifies an object within its usecase and scopes. +pub type ObjectKey = String; + impl ObjectId { /// Creates a new `ObjectId` with the given `context` and `key`. pub fn new(context: ObjectContext, key: String) -> Self { diff --git a/objectstore-service/src/lib.rs b/objectstore-service/src/lib.rs index 3566044a..98cbdb33 100644 --- a/objectstore-service/src/lib.rs +++ b/objectstore-service/src/lib.rs @@ -86,6 +86,13 @@ pub enum StorageConfig<'a> { }, } +/// Result type for get operations. +pub type GetResult = anyhow::Result>; +/// Result type for insert operations. +pub type InsertResult = anyhow::Result; +/// Result type for delete operations. +pub type DeleteResult = anyhow::Result<()>; + impl StorageService { /// Creates a new `StorageService` with the specified configuration. pub async fn new( @@ -112,7 +119,7 @@ impl StorageService { key: Option, metadata: &Metadata, mut stream: PayloadStream, - ) -> anyhow::Result { + ) -> InsertResult { let start = Instant::now(); let mut first_chunk = BytesMut::new(); @@ -217,10 +224,7 @@ impl StorageService { } /// Streams the contents of an object stored at the given key. - pub async fn get_object( - &self, - id: &ObjectId, - ) -> anyhow::Result> { + pub async fn get_object(&self, id: &ObjectId) -> GetResult { let start = Instant::now(); let mut backend_choice = "high-volume"; @@ -257,7 +261,7 @@ impl StorageService { } /// Deletes an object stored at the given key, if it exists. - pub async fn delete_object(&self, id: &ObjectId) -> anyhow::Result<()> { + pub async fn delete_object(&self, id: &ObjectId) -> DeleteResult { let start = Instant::now(); if let Some((metadata, _stream)) = self.0.high_volume_backend.get_object(id).await? {