diff --git a/design/volume-repository.md b/design/volume-repository.md
new file mode 100644
index 0000000..c214c91
--- /dev/null
+++ b/design/volume-repository.md
@@ -0,0 +1,507 @@
# Volume Repository Design

## Status

Proposed.

This document describes a new `VolumeRepository` layer to sit between:

- the block engine in `Export`
- the low-level object storage interface in `ObjectStore`

The goal is to move remote metadata and object-layout policy out of `Export` without changing the fundamental read/write behavior of the server.

## Summary

Today, `Export` is responsible for two different concerns:

1. block-device behavior
   - read
   - write
   - flush
   - chunk materialization
   - dirty/resident tracking

2. remote metadata and object-layout policy
   - `volume.json` load/store
   - CAS update of volume head
   - snapshot object-key construction
   - manifest path construction
   - current head resolution
   - clone source manifest resolution

That is too much responsibility for one type.

The proposed change is:

```text
Export
  -> VolumeRepository
    -> ObjectStore
```

`Export` should become a pure block engine plus snapshot orchestration.

`VolumeRepository` should own all remote persistence policy for:

- `volume.json`
- manifests
- snapshot object naming
- head publish
- clone source resolution

`ObjectStore` should remain a raw backend abstraction over S3/R2/OSS-like storage.

## Problem

The current `Export` implementation directly constructs remote object keys and performs head updates itself.

Examples of logic that currently lives in `Export`:

- build `manifest.json` key from export prefix and snapshot id
- build `delta.blob` and `base.blob` object keys
- read `volume.json`
- perform CAS update of `volume.json`
- update `refs/current.json`
- resolve snapshot id to manifest object path

This creates three problems:

### 1. Wrong abstraction boundary

`Export` is supposed to be the block engine.
It should not know the storage naming scheme. + +### 2. Storage policy leaks into the data path + +Changes to: + +- object naming +- snapshot publish policy +- volume metadata shape +- clone source layout + +all force changes to the block engine. + +### 3. Harder future evolution + +Features like: + +- snapshot delete +- list snapshots +- retention policy +- metadata version migration +- alternate object layouts + +should not require touching read/write chunk logic. + +## Goals + +- Move volume- and snapshot-level remote persistence logic out of `Export`. +- Keep `ObjectStore` as a minimal low-level abstraction. +- Make remote metadata flows easier to test independently. +- Reduce the number of places that know remote path conventions. +- Keep current runtime behavior unchanged while improving layering. + +## Non-Goals + +- Redesigning the manifest format. +- Changing read/write block semantics. +- Introducing a database or external metadata service. +- Supporting transactional multi-object commits in object storage. +- Changing the local cache layout. + +## Current Layering + +Current structure: + +```text +Export + |- local cache operations + |- manifest interpretation + |- snapshot packing + |- object key construction + |- volume.json read/CAS update + |- refs/current.json update + `- direct ObjectStore calls +``` + +This means `Export` depends on: + +- local persistence details +- remote metadata details +- object-store semantics + +That coupling is broader than necessary. 
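To make that coupling concrete, here is a condensed, self-contained sketch of the kind of inline path helpers the engine currently carries. The helper names match the current code; the `exports/vol-a` values are illustrative only.

```rust
// Condensed sketch of the path helpers currently inlined next to the
// block engine. `prefix` is the per-export namespace, e.g. "exports/vol-a".
fn current_ref_key(prefix: &str) -> String {
    format!("{prefix}/refs/current.json")
}

fn manifest_key(prefix: &str, snapshot_id: &str) -> String {
    format!("{prefix}/snapshots/{snapshot_id}/manifest.json")
}

fn main() {
    // Every call site that uses these helpers learns the remote layout.
    assert_eq!(
        manifest_key("exports/vol-a", "snap-1"),
        "exports/vol-a/snapshots/snap-1/manifest.json"
    );
    assert_eq!(
        current_ref_key("exports/vol-a"),
        "exports/vol-a/refs/current.json"
    );
}
```

Every module that calls these helpers is a place where a layout change fans out.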
+ +## Proposed Layering + +### Layer 1: `ObjectStore` + +Purpose: + +- raw object IO +- no domain concepts + +Allowed responsibilities: + +- `get_range` +- `get_object` +- `get_object_with_etag` +- `put_bytes` +- `put_bytes_if_match` +- `put_bytes_if_absent` +- `put_file` +- `delete_object` +- `list_prefix` + +Not allowed: + +- knowledge of `volume.json` +- knowledge of manifests +- knowledge of export ids +- knowledge of snapshot naming rules + +### Layer 2: `VolumeRepository` + +Purpose: + +- remote volume metadata and snapshot persistence policy + +Allowed responsibilities: + +- derive object keys from export id and snapshot id +- load and validate `volume.json` +- create `volume.json` +- resolve current snapshot manifest +- resolve explicit snapshot manifest +- resolve clone seed manifest +- stage and publish manifest objects +- CAS-update the current volume head +- optionally maintain `refs/current.json` compatibility + +Not allowed: + +- block read/write logic +- chunk dirty tracking +- chunk residency policy +- local cache mutation + +### Layer 3: `Export` + +Purpose: + +- block device engine + +Allowed responsibilities: + +- map read/write requests to chunks +- materialize missing chunks +- track dirty/resident state +- flush local state +- ask repository to publish a prepared snapshot + +Not allowed: + +- constructing object keys directly +- parsing and writing `volume.json` directly +- performing raw CAS writes itself + +## Repository Responsibilities + +The repository should own these workflows. 
### Volume lifecycle

- `create_empty_volume(export_id, image_size, chunk_size)`
- `load_volume(export_id)`
- `load_all_volumes(export_root)`
- `update_volume_head(export_id, expected_etag, next_volume)`

### Manifest resolution

- `load_current_manifest(export_id)`
- `load_manifest_by_snapshot_id(export_id, snapshot_id)`
- `resolve_clone_source(source_export_id, snapshot_id_or_current)`

### Snapshot persistence

- build object paths for a given snapshot id
- write `manifest.json`
- optionally write/update `refs/current.json`
- CAS-publish `volume.json` to point to the new snapshot id

### Optional later responsibilities

- list snapshots
- delete snapshots
- retention and garbage collection

## Proposed Interface Shape

This is intentionally illustrative, not final Rust code.

```rust
pub struct SnapshotLayout {
    pub manifest_key: String,
    pub delta_key: String,
    pub base_key: String,
}

pub struct LoadedVolume {
    pub metadata: VolumeMetadata,
    pub etag: Option<String>,
}

pub struct ResolvedManifest {
    pub snapshot_id: Option<String>,
    pub manifest_key: String,
    pub manifest: Manifest,
}

pub struct PublishRequest {
    pub export_id: String,
    pub snapshot_id: String,
    pub manifest: Manifest,
}

#[async_trait]
pub trait VolumeRepository: Send + Sync {
    async fn create_volume(&self, volume: &VolumeMetadata) -> Result<String>;
    async fn load_volume(&self, export_id: &str) -> Result<LoadedVolume>;
    async fn list_volumes(&self) -> Result<Vec<LoadedVolume>>;

    async fn load_current_manifest(&self, export_id: &str) -> Result<ResolvedManifest>;
    async fn load_manifest_by_snapshot_id(
        &self,
        export_id: &str,
        snapshot_id: &str,
    ) -> Result<ResolvedManifest>;
    async fn resolve_clone_source(
        &self,
        export_id: &str,
        snapshot_id: Option<&str>,
    ) -> Result<ResolvedManifest>;

    fn snapshot_layout(&self, export_id: &str, snapshot_id: &str) -> SnapshotLayout;

    async fn put_manifest(&self, manifest_key: &str, manifest: &Manifest) -> Result<()>;
    async fn publish_volume_head(
        &self,
        export_id: &str,
        expected_etag: &str,
        snapshot_id: &str,
    ) -> Result<LoadedVolume>;
}
```

The actual final API may merge some of these calls, but the important thing is the responsibility split.

## What `Export` Would Keep

`Export` should still do all chunk-oriented logic:

- `read()`
- `write()`
- `flush()`
- `materialize_chunk()`
- `fetch_chunk_bytes()`
- `publish_initial_sparse_snapshot()`
- `publish_delta_snapshot()`
- clone sparse snapshot packing
- compact packing

But the last step of publish should move from:

- raw object-store calls
- inline key construction

to:

- repository calls

### Example before

Current shape:

```text
Export
  -> build manifest key
  -> put manifest object
  -> get volume.json + etag
  -> update volume.json with CAS
  -> update refs/current.json
```

### Example after

Target shape:

```text
Export
  -> prepare manifest + snapshot id
  -> repository.publish_snapshot(...)
```

That is the main simplification.

## Object Layout Policy

Today the path layout is implicitly spread through helper functions and inline string formatting.

Examples:

- `<export_root>/<export_id>/volume.json`
- `<export_root>/<export_id>/snapshots/<snapshot_id>/manifest.json`
- `<export_root>/<export_id>/snapshots/<snapshot_id>/delta.blob`
- `<export_root>/<export_id>/snapshots/<snapshot_id>/base.blob`

This policy should move fully into the repository implementation.

That gives one place to change later if:

- `refs/current.json` is removed
- manifest path conventions change
- compatibility shims are retired

## Interaction With Read Path

The read path still needs a manifest in memory.

That does not mean the repository belongs in the read hot path for every request.

The intended model is:

1. repository resolves and loads the active manifest when an export is opened
2. `Export` stores that manifest as its current `read_source`
3. normal read misses use the in-memory manifest directly
4.
repository is only involved again when:
   - a new export is opened
   - a snapshot is published
   - a clone source is resolved

So the repository is control-plane, not per-request data-plane.

## Interaction With Manager

The manager should talk to the repository for:

- volume discovery at server startup
- create/open/clone metadata operations
- volume existence checks

The manager should not need to know how `volume.json` or manifest paths are encoded either.

Target relationship:

```text
ExportManager
  -> VolumeRepository
  -> Export
```

instead of:

```text
ExportManager
  -> ObjectStore
  -> path helpers
  -> Export
```

## Testing Benefits

This split allows cleaner tests.

### `ObjectStore` tests

- backend-specific semantics
- CAS behavior
- list/get/put/delete behavior

### `VolumeRepository` tests

- path derivation
- volume metadata round trip
- manifest resolution by current head
- clone source resolution
- CAS publish behavior

### `Export` tests

- chunk materialization
- write semantics
- sparse snapshot packing
- clone sparse snapshot packing

That is a better separation than the current state, where `Export` tests also have to exercise path construction and remote metadata behavior directly.

## Migration Plan

This refactor should be done incrementally.

### Step 1

Introduce a concrete repository implementation over the current object store:

- `ObjectStoreVolumeRepository`

Do not change behavior yet.

### Step 2

Move helper functions and direct metadata load/store from `Export` into the repository:

- manifest resolution helpers
- volume head publish
- key construction

### Step 3

Update `Export` to depend on `Arc<dyn VolumeRepository>` plus any still-needed read-only object access path.
+ +One practical option: + +- keep `ObjectStore` available for chunk range GETs +- use `VolumeRepository` for metadata operations + +### Step 4 + +Update `ExportManager` to use the repository for startup discovery and create/open/clone flows. + +### Step 5 + +Delete remaining duplicate path helpers and direct metadata object manipulation from engine/manager code. + +## Practical Note About Chunk Reads + +The repository should not necessarily replace `ObjectStore` for chunk body reads. + +Chunk fetches are simple: + +- resolve chunk location from in-memory manifest +- range GET object bytes + +That path is already efficient and does not need repository indirection. + +So the practical dependency shape may be: + +```text +Export + -> ObjectStore # data objects + -> VolumeRepository # metadata and publish +``` + +That is still a substantial improvement. + +## Recommended First Scope + +For the first refactor, `VolumeRepository` should own only: + +- `volume.json` +- manifest loading by snapshot id or current head +- snapshot object key derivation +- head publish + +Do not move chunk data reads into it. + +That keeps the repository clearly about metadata and remote layout policy, which is the actual problem we want to solve. 
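To close, the recommended first scope is small enough that its key-derivation core fits in a few lines. The sketch below assumes the `<export_root>/<export_id>/snapshots/<snapshot_id>` layout described earlier; the free function and its names are illustrative, not final code.

```rust
// Sketch of centralized key derivation for the recommended first scope.
// Assumes the <export_root>/<export_id>/snapshots/<snapshot_id>/... layout;
// `SnapshotLayout` mirrors the illustrative interface shape above.
struct SnapshotLayout {
    manifest_key: String,
    delta_key: String,
    base_key: String,
}

fn snapshot_layout(export_root: &str, export_id: &str, snapshot_id: &str) -> SnapshotLayout {
    // One function owns the layout; callers never format paths themselves.
    let dir = format!("{export_root}/{export_id}/snapshots/{snapshot_id}");
    SnapshotLayout {
        manifest_key: format!("{dir}/manifest.json"),
        delta_key: format!("{dir}/delta.blob"),
        base_key: format!("{dir}/base.blob"),
    }
}

fn main() {
    let layout = snapshot_layout("exports", "vol-a", "snap-1");
    assert_eq!(layout.manifest_key, "exports/vol-a/snapshots/snap-1/manifest.json");
    assert_eq!(layout.delta_key, "exports/vol-a/snapshots/snap-1/delta.blob");
    assert_eq!(layout.base_key, "exports/vol-a/snapshots/snap-1/base.blob");
}
```

Retiring `refs/current.json` or changing the convention then touches only this one derivation point.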
diff --git a/src/app/config.rs b/src/app/config.rs index 38691fd..6e79186 100644 --- a/src/app/config.rs +++ b/src/app/config.rs @@ -3,7 +3,7 @@ use std::path::PathBuf; use clap::{Parser, Subcommand, ValueEnum}; -use crate::core::engine::spec::{CloneSource, ExportSpec, StorageNamespace}; +use crate::core::engine::spec::{CloneSource, ExportSpec}; pub const CHUNK_SIZE: u64 = 4 * 1024 * 1024; @@ -61,7 +61,7 @@ pub struct StorageConfig { #[derive(Debug, Clone)] pub struct CloneSourceConfig { - pub prefix: String, + pub export_id: String, pub snapshot_id: Option, } @@ -112,7 +112,7 @@ impl From for ServeConfig { impl From for CloneSource { fn from(value: CloneSourceConfig) -> Self { Self { - prefix: value.prefix, + export_id: value.export_id, snapshot_id: value.snapshot_id, } } @@ -127,10 +127,6 @@ impl From for ExportSpec { snapshot_id: value.snapshot_id, image_size: value.image_size, clone_source: value.clone_source.map(Into::into), - storage: StorageNamespace { - prefix: value.storage.prefix, - volume_key: value.volume_key, - }, } } } diff --git a/src/core/engine/export.rs b/src/core/engine/export.rs index 3c1aef3..58143c1 100644 --- a/src/core/engine/export.rs +++ b/src/core/engine/export.rs @@ -6,20 +6,19 @@ use std::sync::Arc; use std::sync::Mutex as StdMutex; use std::sync::atomic::{AtomicBool, Ordering}; -use bytes::Bytes; use serde::{Deserialize, Serialize}; use tokio::sync::{Mutex, RwLock}; use uuid::Uuid; use crate::core::cache::local_cache::LocalCache; -use crate::core::engine::spec::{CloneSource, ExportSpec}; +use crate::core::engine::spec::ExportSpec; use crate::core::error::{Error, Result}; use crate::core::model::journal::{JournalOperation, JournalRecord}; use crate::core::model::manifest::{ ChunkLocation, ChunkSource, Manifest, ReplacementChunk, chunk_len, chunk_offset, }; -use crate::core::model::volume::VolumeMetadata; use crate::core::storage::object_store::ObjectStore; +use crate::core::storage::volume_repository::{ResolvedManifest, 
VolumeRepository}; #[derive(Clone)] enum ReadSource { @@ -86,26 +85,12 @@ pub struct ResetCacheResponse { pub manifest_generation: u64, } -#[derive(Debug, Serialize, Deserialize)] -struct CurrentRef { - generation: u64, - #[serde(default)] - snapshot_id: Option, - manifest_key: String, -} - #[derive(Debug, Clone, Serialize, Deserialize)] struct CloneSeedRecord { version: u32, source_manifest_key: String, } -#[derive(Clone)] -struct ResolvedManifest { - manifest_key: String, - manifest: Manifest, -} - struct PreparedPublish { generation: u64, snapshot_id: String, @@ -118,6 +103,7 @@ pub struct Export { config: ExportSpec, cache: Arc, remote: Arc, + repository: Arc, read_source: RwLock>, published_manifest: RwLock>, write_gate: RwLock<()>, @@ -132,6 +118,7 @@ impl Export { pub async fn create( config: impl Into, remote: Arc, + repository: Arc, ) -> Result> { let config = config.into(); if config.snapshot_id.is_some() { @@ -174,6 +161,7 @@ impl Export { config, cache: Arc::new(cache), remote, + repository, read_source: RwLock::new(Arc::new(ReadSource::Zero { image_size, chunk_size, @@ -189,6 +177,7 @@ impl Export { pub async fn open( config: impl Into, remote: Arc, + repository: Arc, ) -> Result> { let config = config.into(); if config.clone_source.is_some() { @@ -196,8 +185,9 @@ impl Export { "clone source is only valid with the clone command".to_string(), )); } - let resolved = - resolve_manifest(&*remote, &config.storage.prefix, config.snapshot_id.clone()).await?; + let resolved = repository + .load_manifest(&config.export_id, config.snapshot_id.as_deref()) + .await?; let manifest = resolved.manifest; let cache = if config.cache_dir.join("cache.meta").exists() { @@ -223,6 +213,7 @@ impl Export { config, cache: Arc::new(cache), remote, + repository, read_source: RwLock::new(Arc::new(ReadSource::Manifest(manifest.clone()))), published_manifest: RwLock::new(Some(manifest)), write_gate: RwLock::new(()), @@ -235,6 +226,7 @@ impl Export { pub async fn 
clone_from_snapshot( config: impl Into, remote: Arc, + repository: Arc, ) -> Result> { let config = config.into(); if config.snapshot_id.is_some() { @@ -268,7 +260,7 @@ impl Export { .to_string(), ) })?; - let resolved = resolve_manifest_by_key(&*remote, &seed.source_manifest_key).await?; + let resolved = resolve_manifest_by_key(&*repository, &seed.source_manifest_key).await?; cache.validate_layout( &config.export_id, resolved.manifest.image_size, @@ -282,6 +274,7 @@ impl Export { config, cache: Arc::new(cache), remote, + repository, read_source: RwLock::new(Arc::new(ReadSource::Manifest(resolved.manifest))), published_manifest: RwLock::new(None), write_gate: RwLock::new(()), @@ -290,7 +283,9 @@ impl Export { chunk_locks: (0..chunk_total).map(|_| Mutex::new(())).collect(), })) } else { - let resolved = resolve_manifest_for_clone(&*remote, &clone_source).await?; + let resolved = repository + .load_manifest(&clone_source.export_id, clone_source.snapshot_id.as_deref()) + .await?; CloneSeedRecord { version: 1, source_manifest_key: resolved.manifest_key.clone(), @@ -312,6 +307,7 @@ impl Export { config, cache: Arc::new(cache), remote, + repository, read_source: RwLock::new(Arc::new(ReadSource::Manifest(resolved.manifest))), published_manifest: RwLock::new(None), write_gate: RwLock::new(()), @@ -522,15 +518,15 @@ impl Export { self.cache.sync_data()?; let generation = self.cache.manifest_generation() + 1; let snapshot_id = new_snapshot_id(); + let layout = self + .repository + .snapshot_layout(&self.config.export_id, &snapshot_id); let result = self .publish_full_snapshot( generation, snapshot_id.clone(), JournalOperation::Compact, - format!( - "{}/snapshots/{}/base.blob", - self.config.storage.prefix, snapshot_id - ), + layout.base_key, ) .await; let result = self.finish_publish(result).await?; @@ -595,14 +591,16 @@ impl Export { ))); } - let manifest_key = manifest_key(&self.config.storage.prefix, &snapshot_id); + let layout = self + .repository + 
.snapshot_layout(&self.config.export_id, &snapshot_id); JournalRecord { version: 1, operation: operation.clone(), generation, staging_path: None, object_key: object_key.clone(), - manifest_key: manifest_key.clone(), + manifest_key: layout.manifest_key, } .persist(&self.journal_path)?; @@ -658,11 +656,10 @@ impl Export { .config .cache_dir .join(format!("snapshot-{snapshot_id}.delta.blob")); - let delta_key = format!( - "{}/snapshots/{}/delta.blob", - self.config.storage.prefix, snapshot_id - ); - let manifest_key = manifest_key(&self.config.storage.prefix, &snapshot_id); + let layout = self + .repository + .snapshot_layout(&self.config.export_id, &snapshot_id); + let delta_key = layout.delta_key.clone(); let mut delta_file = OpenOptions::new() .create(true) @@ -701,7 +698,7 @@ impl Export { generation, staging_path: Some(delta_path.display().to_string()), object_key: delta_key.clone(), - manifest_key: manifest_key.clone(), + manifest_key: layout.manifest_key.clone(), } .persist(&self.journal_path)?; @@ -743,11 +740,10 @@ impl Export { .config .cache_dir .join(format!("snapshot-{snapshot_id}.delta.blob")); - let delta_key = format!( - "{}/snapshots/{}/delta.blob", - self.config.storage.prefix, snapshot_id - ); - let manifest_key = manifest_key(&self.config.storage.prefix, &snapshot_id); + let layout = self + .repository + .snapshot_layout(&self.config.export_id, &snapshot_id); + let delta_key = layout.delta_key.clone(); let mut delta_file = OpenOptions::new() .create(true) @@ -786,7 +782,7 @@ impl Export { generation, staging_path: Some(delta_path.display().to_string()), object_key: delta_key.clone(), - manifest_key: manifest_key.clone(), + manifest_key: layout.manifest_key.clone(), } .persist(&self.journal_path)?; @@ -819,11 +815,10 @@ impl Export { .config .cache_dir .join(format!("snapshot-{snapshot_id}.delta.blob")); - let delta_key = format!( - "{}/snapshots/{}/delta.blob", - self.config.storage.prefix, snapshot_id - ); - let manifest_key = 
manifest_key(&self.config.storage.prefix, &snapshot_id); + let layout = self + .repository + .snapshot_layout(&self.config.export_id, &snapshot_id); + let delta_key = layout.delta_key.clone(); let mut delta_file = OpenOptions::new() .create(true) @@ -871,7 +866,7 @@ impl Export { generation, staging_path: Some(delta_path.display().to_string()), object_key: delta_key.clone(), - manifest_key: manifest_key.clone(), + manifest_key: layout.manifest_key.clone(), } .persist(&self.journal_path)?; @@ -907,12 +902,8 @@ impl Export { snapshot_id: String, manifest: Manifest, ) -> Result { - let manifest_key = manifest_key(&self.config.storage.prefix, &snapshot_id); - self.remote - .put_bytes( - &manifest_key, - Bytes::from(serde_json::to_vec_pretty(&manifest)?), - ) + self.repository + .put_manifest(&self.config.export_id, &snapshot_id, &manifest) .await?; Ok(PreparedPublish { @@ -923,42 +914,16 @@ impl Export { } async fn commit_prepared_publish(&self, prepared: PreparedPublish) -> Result { - let manifest_key = manifest_key(&self.config.storage.prefix, &prepared.snapshot_id); - if let Some(volume_key) = &self.config.storage.volume_key { - let stored = self.remote.get_object_with_etag(volume_key).await?; - let current: VolumeMetadata = serde_json::from_slice(&stored.body)?; - current.validate()?; - let next = current.with_current_snapshot_id(prepared.snapshot_id.clone()); - let etag = stored.etag.ok_or_else(|| { - Error::InvalidRequest(format!( - "volume metadata {} is missing an etag; conditional publish is unavailable", - volume_key - )) - })?; - let updated = self - .remote - .put_bytes_if_match( - volume_key, - Bytes::from(serde_json::to_vec_pretty(&next)?), - &etag, - ) - .await?; - if !updated { - return Err(Error::Conflict(format!( - "volume head changed while publishing snapshot {} for export {}", - prepared.snapshot_id, self.config.export_id - ))); - } - } - - self.remote - .put_bytes( - ¤t_ref_key(&self.config.storage.prefix), - 
Bytes::from(serde_json::to_vec_pretty(&CurrentRef { - generation: prepared.manifest.generation, - snapshot_id: Some(prepared.snapshot_id.clone()), - manifest_key: manifest_key.clone(), - })?), + let manifest_key = self + .repository + .snapshot_layout(&self.config.export_id, &prepared.snapshot_id) + .manifest_key; + self.repository + .publish_snapshot( + &self.config.export_id, + &prepared.snapshot_id, + prepared.manifest.generation, + &manifest_key, ) .await?; @@ -1074,51 +1039,11 @@ impl CloneSeedRecord { } } -async fn resolve_manifest( - remote: &dyn ObjectStore, - prefix: &str, - snapshot_id: Option, -) -> Result { - let manifest_key = if let Some(snapshot_id) = snapshot_id { - manifest_key(prefix, &snapshot_id) - } else { - let current_ref: CurrentRef = - serde_json::from_slice(&remote.get_object(¤t_ref_key(prefix)).await?)?; - current_ref.manifest_key - }; - resolve_manifest_by_key(remote, &manifest_key).await -} - -async fn resolve_manifest_for_clone( - remote: &dyn ObjectStore, - clone_source: &CloneSource, -) -> Result { - resolve_manifest( - remote, - &clone_source.prefix, - clone_source.snapshot_id.clone(), - ) - .await -} - async fn resolve_manifest_by_key( - remote: &dyn ObjectStore, + repository: &dyn VolumeRepository, manifest_key: &str, ) -> Result { - let manifest: Manifest = serde_json::from_slice(&remote.get_object(manifest_key).await?)?; - manifest.validate()?; - Ok(ResolvedManifest { - manifest_key: manifest_key.to_string(), - manifest, - }) -} - -fn current_ref_key(prefix: &str) -> String { - format!("{prefix}/refs/current.json") -} - -fn manifest_key(prefix: &str, snapshot_id: &str) -> String { - format!("{prefix}/snapshots/{snapshot_id}/manifest.json") + repository.load_manifest_by_key(manifest_key).await } fn new_snapshot_id() -> String { @@ -1135,9 +1060,12 @@ mod tests { use bytes::Bytes; use tempfile::tempdir; - use crate::core::engine::spec::{CloneSource, ExportSpec, StorageNamespace}; + use crate::core::engine::spec::{CloneSource, 
ExportSpec}; use crate::core::model::journal::{JournalOperation, JournalRecord}; + use crate::core::model::volume::VolumeMetadata; use crate::core::storage::object_store::{ObjectStore, StoredObject}; + use crate::core::storage::volume_repository::VolumeRepository; + use crate::storage::build_volume_repository; use super::Export; @@ -1169,7 +1097,9 @@ mod tests { async fn get_object_with_etag(&self, key: &str) -> crate::Result { let objects = self.objects.lock().unwrap(); - let object = objects.get(key).unwrap(); + let object = objects + .get(key) + .ok_or_else(|| crate::Error::InvalidRequest(format!("missing object {key}")))?; Ok(StoredObject { body: Bytes::copy_from_slice(&object.body), etag: Some(object.etag.clone()), @@ -1264,10 +1194,6 @@ mod tests { ExportSpec { export_id: "export".to_string(), cache_dir: dir.join("cache"), - storage: StorageNamespace { - prefix: "exports/export".to_string(), - volume_key: None, - }, chunk_size: 4, snapshot_id: None, image_size: Some(8), @@ -1279,20 +1205,56 @@ mod tests { ExportSpec { export_id: "clone".to_string(), cache_dir: dir.join("clone-cache"), - storage: StorageNamespace { - prefix: "exports/clone".to_string(), - volume_key: None, - }, chunk_size: 4, snapshot_id: None, image_size: None, clone_source: Some(CloneSource { - prefix: "exports/export".to_string(), + export_id: "export".to_string(), snapshot_id: Some("2".to_string()), }), } } + fn repository(remote: Arc) -> Arc { + build_volume_repository("exports".to_string(), remote) + } + + async fn seed_empty_volume( + repository: Arc, + export_id: &str, + image_size: u64, + chunk_size: u64, + ) { + repository + .create_volume(&VolumeMetadata::new_empty( + export_id.to_string(), + image_size, + chunk_size, + )) + .await + .unwrap(); + } + + async fn seed_clone_volume( + repository: Arc, + export_id: &str, + image_size: u64, + chunk_size: u64, + source_export_id: &str, + source_snapshot_id: Option<&str>, + ) { + repository + .create_volume(&VolumeMetadata::new_clone( + 
export_id.to_string(), + image_size, + chunk_size, + source_export_id.to_string(), + source_snapshot_id.map(str::to_string), + )) + .await + .unwrap(); + } + #[tokio::test] async fn partial_write_materializes_remote_chunk() { let dir = tempdir().unwrap(); @@ -1334,7 +1296,9 @@ mod tests { .await .unwrap(); - let export = Export::open(base_config(dir.path()), remote).await.unwrap(); + let export = Export::open(base_config(dir.path()), remote.clone(), repository(remote)) + .await + .unwrap(); export.write(1, b"ZZ", false).await.unwrap(); let data = export.read(0, 4).await.unwrap(); assert_eq!(&data, b"aZZd"); @@ -1344,7 +1308,9 @@ mod tests { async fn snapshot_publishes_delta_only_for_dirty_chunks() { let dir = tempdir().unwrap(); let remote = Arc::new(MemoryRemote::default()); - let export = Export::create(base_config(dir.path()), remote.clone()) + let repository = repository(remote.clone()); + seed_empty_volume(repository.clone(), "export", 8, 4).await; + let export = Export::create(base_config(dir.path()), remote.clone(), repository) .await .unwrap(); export.write(0, b"abcd", false).await.unwrap(); @@ -1436,7 +1402,9 @@ mod tests { let mut config = base_config(dir.path()); config.snapshot_id = Some("1".to_string()); - let export = Export::open(config, remote).await.unwrap(); + let export = Export::open(config, remote.clone(), repository(remote)) + .await + .unwrap(); let status = export.status().await; assert_eq!(status.remote_head_generation, 1); @@ -1475,9 +1443,13 @@ mod tests { .await .unwrap(); - let clone = Export::clone_from_snapshot(clone_config(dir.path()), remote) - .await - .unwrap(); + let clone = Export::clone_from_snapshot( + clone_config(dir.path()), + remote.clone(), + repository(remote), + ) + .await + .unwrap(); let status = clone.status().await; assert_eq!(status.snapshot_generation, 0); @@ -1489,6 +1461,7 @@ mod tests { async fn first_clone_snapshot_publishes_packed_sparse_data_and_cuts_over_to_target_lineage() { let dir = tempdir().unwrap(); 
let remote = Arc::new(MemoryRemote::default()); + let repository = repository(remote.clone()); remote .put_bytes( "exports/export/base/full.blob", @@ -1516,10 +1489,12 @@ mod tests { ) .await .unwrap(); + seed_clone_volume(repository.clone(), "clone", 8, 4, "export", Some("2")).await; - let clone = Export::clone_from_snapshot(clone_config(dir.path()), remote.clone()) - .await - .unwrap(); + let clone = + Export::clone_from_snapshot(clone_config(dir.path()), remote.clone(), repository) + .await + .unwrap(); clone.write(0, b"WXYZ", false).await.unwrap(); let snapshot = clone.snapshot().await.unwrap(); @@ -1623,14 +1598,18 @@ mod tests { let mut first_config = base_config(dir.path()); first_config.snapshot_id = Some("1".to_string()); - let first = Export::open(first_config, remote.clone()).await.unwrap(); + let first = Export::open(first_config, remote.clone(), repository(remote.clone())) + .await + .unwrap(); assert_eq!(first.read(0, 8).await.unwrap(), b"abcdefgh"); first.shutdown().unwrap(); drop(first); let mut second_config = base_config(dir.path()); second_config.snapshot_id = Some("2".to_string()); - let second = Export::open(second_config, remote).await.unwrap(); + let second = Export::open(second_config, remote.clone(), repository(remote)) + .await + .unwrap(); assert_eq!(second.read(0, 8).await.unwrap(), b"wxyz1234"); } @@ -1695,14 +1674,16 @@ mod tests { let mut first_config = base_config(dir.path()); first_config.snapshot_id = Some("1".to_string()); - let first = Export::open(first_config, remote.clone()).await.unwrap(); + let first = Export::open(first_config, remote.clone(), repository(remote.clone())) + .await + .unwrap(); first.write(0, b"ZZZZ", false).await.unwrap(); first.shutdown().unwrap(); drop(first); let mut second_config = base_config(dir.path()); second_config.snapshot_id = Some("2".to_string()); - let error = match Export::open(second_config, remote).await { + let error = match Export::open(second_config, remote.clone(), 
repository(remote)).await { Ok(_) => panic!("expected snapshot switch with dirty cache to fail"), Err(error) => error, }; @@ -1770,7 +1751,9 @@ mod tests { let mut first_config = base_config(dir.path()); first_config.snapshot_id = Some("1".to_string()); - let first = Export::open(first_config, remote.clone()).await.unwrap(); + let first = Export::open(first_config, remote.clone(), repository(remote.clone())) + .await + .unwrap(); assert_eq!(first.read(0, 8).await.unwrap(), b"abcdefgh"); first.write(0, b"ZZZZ", false).await.unwrap(); @@ -1787,7 +1770,9 @@ mod tests { let mut second_config = base_config(dir.path()); second_config.snapshot_id = Some("2".to_string()); - let second = Export::open(second_config, remote).await.unwrap(); + let second = Export::open(second_config, remote.clone(), repository(remote)) + .await + .unwrap(); assert_eq!(second.read(0, 8).await.unwrap(), b"wxyz1234"); } @@ -1795,7 +1780,9 @@ mod tests { async fn first_snapshot_of_new_export_stores_only_dirty_chunks() { let dir = tempdir().unwrap(); let remote = Arc::new(MemoryRemote::default()); - let export = Export::create(base_config(dir.path()), remote.clone()) + let repository = repository(remote.clone()); + seed_empty_volume(repository.clone(), "export", 8, 4).await; + let export = Export::create(base_config(dir.path()), remote.clone(), repository) .await .unwrap(); @@ -1830,7 +1817,9 @@ mod tests { async fn compact_rewrites_full_base_without_deleting_older_snapshots() { let dir = tempdir().unwrap(); let remote = Arc::new(MemoryRemote::default()); - let export = Export::create(base_config(dir.path()), remote.clone()) + let repository = repository(remote.clone()); + seed_empty_volume(repository.clone(), "export", 8, 4).await; + let export = Export::create(base_config(dir.path()), remote.clone(), repository) .await .unwrap(); @@ -1855,9 +1844,13 @@ mod tests { async fn startup_recovery_cleans_journal_and_keeps_dirty_cache() { let dir = tempdir().unwrap(); let remote = 
Arc::new(MemoryRemote::default());
-        let export = Export::create(base_config(dir.path()), remote.clone())
-            .await
-            .unwrap();
+        let export = Export::create(
+            base_config(dir.path()),
+            remote.clone(),
+            repository(remote.clone()),
+        )
+        .await
+        .unwrap();
 
         export.write(0, b"abcd", false).await.unwrap();
         export.flush().await.unwrap();
@@ -1877,7 +1870,7 @@
 
         drop(export);
 
-        let reopened = Export::create(base_config(dir.path()), remote)
+        let reopened = Export::create(base_config(dir.path()), remote.clone(), repository(remote))
             .await
             .unwrap();
         let status = reopened.status().await;
diff --git a/src/core/engine/spec.rs b/src/core/engine/spec.rs
index 919ee22..33cf5fa 100644
--- a/src/core/engine/spec.rs
+++ b/src/core/engine/spec.rs
@@ -1,14 +1,8 @@
 use std::path::PathBuf;
 
-#[derive(Debug, Clone)]
-pub struct StorageNamespace {
-    pub prefix: String,
-    pub volume_key: Option<String>,
-}
-
 #[derive(Debug, Clone)]
 pub struct CloneSource {
-    pub prefix: String,
+    pub export_id: String,
     pub snapshot_id: Option<String>,
 }
@@ -20,5 +14,4 @@ pub struct ExportSpec {
     pub snapshot_id: Option<String>,
     pub image_size: Option<u64>,
     pub clone_source: Option<CloneSource>,
-    pub storage: StorageNamespace,
 }
diff --git a/src/core/storage/mod.rs b/src/core/storage/mod.rs
index ef12e4f..5401354 100644
--- a/src/core/storage/mod.rs
+++ b/src/core/storage/mod.rs
@@ -1 +1,2 @@
 pub mod object_store;
+pub mod volume_repository;
diff --git a/src/core/storage/volume_repository.rs b/src/core/storage/volume_repository.rs
new file mode 100644
index 0000000..8b5a94d
--- /dev/null
+++ b/src/core/storage/volume_repository.rs
@@ -0,0 +1,52 @@
+use async_trait::async_trait;
+
+use crate::core::error::Result;
+use crate::core::model::manifest::Manifest;
+use crate::core::model::volume::VolumeMetadata;
+
+#[derive(Debug, Clone)]
+pub struct LoadedVolume {
+    pub metadata: VolumeMetadata,
+    pub etag: Option<String>,
+}
+
+#[derive(Debug, Clone)]
+pub struct ResolvedManifest {
+    pub manifest_key: String,
+    pub manifest: Manifest,
+}
+
+#[derive(Debug, Clone)] +pub struct SnapshotLayout { + pub manifest_key: String, + pub delta_key: String, + pub base_key: String, + pub current_ref_key: String, +} + +#[async_trait] +pub trait VolumeRepository: Send + Sync { + async fn list_volumes(&self) -> Result>; + async fn load_volume(&self, export_id: &str) -> Result; + async fn create_volume(&self, volume: &VolumeMetadata) -> Result; + async fn load_manifest( + &self, + export_id: &str, + snapshot_id: Option<&str>, + ) -> Result; + async fn load_manifest_by_key(&self, manifest_key: &str) -> Result; + async fn put_manifest( + &self, + export_id: &str, + snapshot_id: &str, + manifest: &Manifest, + ) -> Result; + async fn publish_snapshot( + &self, + export_id: &str, + snapshot_id: &str, + generation: u64, + manifest_key: &str, + ) -> Result<()>; + fn snapshot_layout(&self, export_id: &str, snapshot_id: &str) -> SnapshotLayout; +} diff --git a/src/main.rs b/src/main.rs index 82872f6..c3139d8 100644 --- a/src/main.rs +++ b/src/main.rs @@ -3,7 +3,7 @@ use nbd_server::Cli; use nbd_server::app::config::{Command, ServeConfig}; use nbd_server::server::admin::serve_manager_admin; use nbd_server::server::manager::ExportManager; -use nbd_server::storage::build_object_store; +use nbd_server::storage::{build_object_store, build_volume_repository}; #[tokio::main] async fn main() -> nbd_server::Result<()> { @@ -18,7 +18,8 @@ async fn main() -> nbd_server::Result<()> { Command::Serve(args) => ServeConfig::from(args), }; let remote = build_object_store(&serve.storage).await?; - let manager = ExportManager::new(serve.clone(), remote).await?; + let repository = build_volume_repository(serve.export_root.clone(), remote.clone()); + let manager = ExportManager::new(serve.clone(), remote, repository).await?; let admin_socket = serve.admin_sock.clone(); let nbd_addr = serve.listen; let admin_manager = manager.clone(); diff --git a/src/server/admin.rs b/src/server/admin.rs index bd38b21..3269ae5 100644 --- a/src/server/admin.rs +++ 
b/src/server/admin.rs @@ -11,9 +11,7 @@ use serde::Serialize; use tokio::net::UnixListener; use crate::core::error::Error; -use crate::server::manager::{ - CloneExportRequest, CreateExportRequest, ExportManager, OpenExportRequest, -}; +use crate::server::manager::{CloneExportRequest, CreateExportRequest, ExportId, ExportManager, OpenExportRequest}; #[derive(Clone)] struct ManagerAdminState { @@ -55,17 +53,14 @@ pub async fn serve_manager_admin( .map_err(crate::Error::Io) } -async fn get_exports( - State(state): State, -) -> Json> { +async fn get_exports(State(state): State) -> Json> { Json(state.manager.list().await) } async fn post_create_export( State(state): State, Json(request): Json, -) -> std::result::Result, (StatusCode, Json)> -{ +) -> std::result::Result, (StatusCode, Json)> { state .manager .create_export(request) @@ -77,8 +72,7 @@ async fn post_create_export( async fn post_open_export( State(state): State, Json(request): Json, -) -> std::result::Result, (StatusCode, Json)> -{ +) -> std::result::Result, (StatusCode, Json)> { state .manager .open_export(request) @@ -90,8 +84,7 @@ async fn post_open_export( async fn post_clone_export( State(state): State, Json(request): Json, -) -> std::result::Result, (StatusCode, Json)> -{ +) -> std::result::Result, (StatusCode, Json)> { state .manager .clone_export(request) diff --git a/src/server/manager.rs b/src/server/manager.rs index fb8a82d..260e9a2 100644 --- a/src/server/manager.rs +++ b/src/server/manager.rs @@ -1,18 +1,18 @@ use std::collections::BTreeMap; use std::sync::Arc; -use bytes::Bytes; use serde::{Deserialize, Serialize}; -use tokio::sync::{Mutex, RwLock}; +use tokio::sync::RwLock; use crate::app::config::ServeConfig; use crate::core::engine::export::{ CompactResponse, Export, ResetCacheResponse, SnapshotResponse, Status, }; -use crate::core::engine::spec::{CloneSource, ExportSpec, StorageNamespace}; +use crate::core::engine::spec::{CloneSource, ExportSpec}; use crate::core::error::{Error, Result}; -use 
crate::core::model::volume::{VolumeMetadata, export_prefix, volume_key}; +use crate::core::model::volume::VolumeMetadata; use crate::core::storage::object_store::ObjectStore; +use crate::core::storage::volume_repository::VolumeRepository; #[derive(Debug, Clone, Deserialize)] pub struct CreateExportRequest { @@ -32,118 +32,94 @@ pub struct CloneExportRequest { pub source_snapshot_id: Option, } -#[derive(Debug, Clone, Serialize)] -pub struct ExportSummary { - pub export_id: String, -} - -struct ManagedExport { - export: Arc, - volume: Mutex, - volume_etag: Mutex>, -} +#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash, Serialize)] +#[serde(transparent)] +pub struct ExportId(pub String); pub struct ExportManager { serve: ServeConfig, storage: Arc, - exports: RwLock>>, + repository: Arc, + exports: RwLock>>, } impl ExportManager { - pub async fn new(serve: ServeConfig, storage: Arc) -> Result> { + pub async fn new( + serve: ServeConfig, + storage: Arc, + repository: Arc, + ) -> Result> { let manager = Arc::new(Self { serve, storage, + repository, exports: RwLock::new(BTreeMap::new()), }); manager.discover().await?; Ok(manager) } - pub async fn list(&self) -> Vec { + pub async fn list(&self) -> Vec { self.exports .read() .await .keys() .cloned() - .map(|export_id| ExportSummary { export_id }) + .map(ExportId) .collect() } - pub async fn create_export(&self, request: CreateExportRequest) -> Result { + pub async fn create_export(&self, request: CreateExportRequest) -> Result { self.ensure_not_loaded(&request.export_id).await?; let config = self.export_spec_for(&request.export_id, Some(request.size)); - let export = Export::create(config, self.storage.clone()).await?; + let export = Export::create(config, self.storage.clone(), self.repository.clone()).await?; let volume = VolumeMetadata::new_empty( request.export_id.clone(), request.size, self.serve.chunk_size, ); - let etag = self.create_remote_volume(&volume).await?; - self.exports.write().await.insert( - 
request.export_id.clone(), - Arc::new(ManagedExport { - export, - volume: Mutex::new(volume), - volume_etag: Mutex::new(Some(etag)), - }), - ); - Ok(ExportSummary { - export_id: request.export_id, - }) + self.repository.create_volume(&volume).await?; + self.exports.write().await.insert(request.export_id.clone(), export); + Ok(ExportId(request.export_id)) } - pub async fn open_export(&self, request: OpenExportRequest) -> Result { + pub async fn open_export(&self, request: OpenExportRequest) -> Result { self.ensure_not_loaded(&request.export_id).await?; - let (volume, etag) = self.load_volume_with_etag(&request.export_id).await?; + let loaded = self.repository.load_volume(&request.export_id).await?; + let volume = loaded.metadata; let export = self.instantiate_from_volume(&volume).await?; - self.exports.write().await.insert( - request.export_id.clone(), - Arc::new(ManagedExport { - export, - volume: Mutex::new(volume), - volume_etag: Mutex::new(etag), - }), - ); - Ok(ExportSummary { - export_id: request.export_id, - }) + self.exports.write().await.insert(request.export_id.clone(), export); + Ok(ExportId(request.export_id)) } - pub async fn clone_export(&self, request: CloneExportRequest) -> Result { + pub async fn clone_export(&self, request: CloneExportRequest) -> Result { self.ensure_not_loaded(&request.export_id).await?; - let (source_volume, _) = self - .load_volume_with_etag(&request.source_export_id) + let source_volume = self + .repository + .load_volume(&request.source_export_id) .await?; let source_snapshot_id = request .source_snapshot_id .clone() - .or_else(|| source_volume.current_snapshot_id.clone()); + .or_else(|| source_volume.metadata.current_snapshot_id.clone()); let volume = VolumeMetadata::new_clone( request.export_id.clone(), - source_volume.image_size, - source_volume.chunk_size, + source_volume.metadata.image_size, + source_volume.metadata.chunk_size, request.source_export_id.clone(), source_snapshot_id.clone(), ); let mut config = 
self.export_spec_for(&request.export_id, None); config.clone_source = Some(CloneSource { - prefix: export_prefix(&self.serve.export_root, &request.source_export_id), + export_id: request.source_export_id.clone(), snapshot_id: source_snapshot_id.clone(), }); - let export = Export::clone_from_snapshot(config, self.storage.clone()).await?; - let etag = self.create_remote_volume(&volume).await?; - self.exports.write().await.insert( - request.export_id.clone(), - Arc::new(ManagedExport { - export, - volume: Mutex::new(volume), - volume_etag: Mutex::new(Some(etag)), - }), - ); - Ok(ExportSummary { - export_id: request.export_id, - }) + let export = + Export::clone_from_snapshot(config, self.storage.clone(), self.repository.clone()) + .await?; + self.repository.create_volume(&volume).await?; + self.exports.write().await.insert(request.export_id.clone(), export); + Ok(ExportId(request.export_id)) } pub async fn remove_export(&self, export_id: &str) -> Result<()> { @@ -156,14 +132,12 @@ impl ExportManager { } pub async fn get_export(&self, export_id: &str) -> Result> { - Ok(self - .exports + self.exports .read() .await .get(export_id) - .ok_or_else(|| Error::InvalidRequest(format!("unknown export {export_id}")))? 
- .export - .clone()) + .cloned() + .ok_or_else(|| Error::InvalidRequest(format!("unknown export {export_id}"))) } pub async fn status(&self, export_id: &str) -> Result { @@ -171,23 +145,11 @@ impl ExportManager { } pub async fn snapshot(&self, export_id: &str) -> Result { - let managed = self.get_managed(export_id).await?; - let response = managed.export.snapshot().await?; - if response.snapshot_created { - let (volume, etag) = self.load_volume_with_etag(export_id).await?; - *managed.volume.lock().await = volume; - *managed.volume_etag.lock().await = etag; - } - Ok(response) + self.get_export(export_id).await?.snapshot().await } pub async fn compact(&self, export_id: &str) -> Result { - let managed = self.get_managed(export_id).await?; - let response = managed.export.compact().await?; - let (volume, etag) = self.load_volume_with_etag(export_id).await?; - *managed.volume.lock().await = volume; - *managed.volume_etag.lock().await = etag; - Ok(response) + self.get_export(export_id).await?.compact().await } pub async fn reset_cache(&self, export_id: &str) -> Result { @@ -200,7 +162,7 @@ impl ExportManager { .read() .await .values() - .map(|managed| managed.export.clone()) + .cloned() .collect(); for export in exports { export.shutdown()?; @@ -209,37 +171,14 @@ impl ExportManager { } async fn discover(&self) -> Result<()> { - let prefix = format!("{}/", self.serve.export_root.trim_end_matches('/')); - let keys = self.storage.list_prefix(&prefix).await?; - for key in keys { - if !key.ends_with("/volume.json") { - continue; - } - let stored = self.storage.get_object_with_etag(&key).await?; - let volume: VolumeMetadata = serde_json::from_slice(&stored.body)?; - volume.validate()?; + for loaded in self.repository.list_volumes().await? 
{ + let volume = loaded.metadata; let export = self.instantiate_from_volume(&volume).await?; - self.exports.write().await.insert( - volume.export_id.clone(), - Arc::new(ManagedExport { - export, - volume: Mutex::new(volume), - volume_etag: Mutex::new(stored.etag), - }), - ); + self.exports.write().await.insert(volume.export_id.clone(), export); } Ok(()) } - async fn get_managed(&self, export_id: &str) -> Result> { - self.exports - .read() - .await - .get(export_id) - .cloned() - .ok_or_else(|| Error::InvalidRequest(format!("unknown export {export_id}"))) - } - async fn ensure_not_loaded(&self, export_id: &str) -> Result<()> { if self.exports.read().await.contains_key(export_id) { return Err(Error::InvalidRequest(format!( @@ -249,57 +188,31 @@ impl ExportManager { Ok(()) } - async fn load_volume_with_etag( - &self, - export_id: &str, - ) -> Result<(VolumeMetadata, Option)> { - let key = volume_key(&self.serve.export_root, export_id); - let stored = self.storage.get_object_with_etag(&key).await?; - let volume: VolumeMetadata = serde_json::from_slice(&stored.body)?; - volume.validate()?; - Ok((volume, stored.etag)) - } - - async fn create_remote_volume(&self, volume: &VolumeMetadata) -> Result { - let key = volume_key(&self.serve.export_root, &volume.export_id); - let created = self - .storage - .put_bytes_if_absent(&key, Bytes::from(serde_json::to_vec_pretty(volume)?)) - .await?; - if !created { - return Err(Error::Conflict(format!( - "export {} already exists remotely", - volume.export_id - ))); - } - let (_, etag) = self.load_volume_with_etag(&volume.export_id).await?; - etag.ok_or_else(|| { - Error::InvalidRequest(format!( - "volume {} was created without an etag", - volume.export_id - )) - }) - } - async fn instantiate_from_volume(&self, volume: &VolumeMetadata) -> Result> { if let Some(snapshot_id) = &volume.current_snapshot_id { let mut config = self.export_spec_for(&volume.export_id, None); config.snapshot_id = Some(snapshot_id.clone()); - return 
Export::open(config, self.storage.clone()).await; + return Export::open(config, self.storage.clone(), self.repository.clone()).await; } if let Some(seed) = &volume.clone_seed { let mut config = self.export_spec_for(&volume.export_id, None); config.clone_source = Some(CloneSource { - prefix: export_prefix(&self.serve.export_root, &seed.source_export_id), + export_id: seed.source_export_id.clone(), snapshot_id: seed.source_snapshot_id.clone(), }); - return Export::clone_from_snapshot(config, self.storage.clone()).await; + return Export::clone_from_snapshot( + config, + self.storage.clone(), + self.repository.clone(), + ) + .await; } Export::create( self.export_spec_for(&volume.export_id, Some(volume.image_size)), self.storage.clone(), + self.repository.clone(), ) .await } @@ -312,10 +225,6 @@ impl ExportManager { snapshot_id: None, image_size, clone_source: None, - storage: StorageNamespace { - prefix: export_prefix(&self.serve.export_root, export_id), - volume_key: Some(volume_key(&self.serve.export_root, export_id)), - }, } } } @@ -333,8 +242,9 @@ mod tests { use crate::app::config::{ServeConfig, StorageBackendKind, StorageConfig}; use crate::core::model::volume::{VolumeMetadata, volume_key}; use crate::core::storage::object_store::{ObjectStore, StoredObject}; + use crate::storage::build_volume_repository; - use super::{CreateExportRequest, ExportManager}; + use super::{CreateExportRequest, ExportId, ExportManager}; #[derive(Clone)] struct MemoryObject { @@ -479,7 +389,8 @@ mod tests { async fn create_export_persists_volume_metadata() { let dir = tempdir().unwrap(); let remote = Arc::new(MemoryRemote::default()); - let manager = ExportManager::new(serve_config(dir.path()), remote.clone()) + let repository = build_volume_repository("exports".to_string(), remote.clone()); + let manager = ExportManager::new(serve_config(dir.path()), remote.clone(), repository) .await .unwrap(); @@ -498,6 +409,7 @@ mod tests { let volume: VolumeMetadata = 
serde_json::from_slice(&volume).unwrap(); assert_eq!(volume.export_id, "vm01"); assert_eq!(manager.list().await.len(), 1); + assert_eq!(manager.list().await[0], ExportId("vm01".to_string())); } #[tokio::test] @@ -522,12 +434,13 @@ mod tests { .await .unwrap(); - let manager = ExportManager::new(serve_config(dir.path()), remote) + let repository = build_volume_repository("exports".to_string(), remote.clone()); + let manager = ExportManager::new(serve_config(dir.path()), remote, repository) .await .unwrap(); let exports = manager.list().await; assert_eq!(exports.len(), 1); - assert_eq!(exports[0].export_id, "vm01"); + assert_eq!(exports[0], ExportId("vm01".to_string())); } } diff --git a/src/storage/mod.rs b/src/storage/mod.rs index 1b5aa9f..5a500b1 100644 --- a/src/storage/mod.rs +++ b/src/storage/mod.rs @@ -1,3 +1,4 @@ +pub mod object_store_volume_repository; pub mod r2; pub mod s3; @@ -9,6 +10,7 @@ use std::sync::Arc; use crate::app::config::{StorageBackendKind, StorageConfig}; use crate::core::error::Result; use crate::core::storage::object_store::ObjectStore; +use crate::core::storage::volume_repository::VolumeRepository; pub async fn build_object_store(config: &StorageConfig) -> Result> { match config.backend { @@ -16,3 +18,10 @@ pub async fn build_object_store(config: &StorageConfig) -> Result r2::build_r2_object_store(config).await, } } + +pub fn build_volume_repository( + export_root: String, + store: Arc, +) -> Arc { + Arc::new(object_store_volume_repository::ObjectStoreVolumeRepository::new(export_root, store)) +} diff --git a/src/storage/object_store_volume_repository.rs b/src/storage/object_store_volume_repository.rs new file mode 100644 index 0000000..042cf0f --- /dev/null +++ b/src/storage/object_store_volume_repository.rs @@ -0,0 +1,210 @@ +use std::sync::Arc; + +use async_trait::async_trait; +use bytes::Bytes; +use serde::{Deserialize, Serialize}; + +use crate::core::error::{Error, Result}; +use crate::core::model::manifest::Manifest; +use 
crate::core::model::volume::{VolumeMetadata, manifest_key_from_snapshot_id, volume_key}; +use crate::core::storage::object_store::ObjectStore; +use crate::core::storage::volume_repository::{ + LoadedVolume, ResolvedManifest, SnapshotLayout, VolumeRepository, +}; + +#[derive(Debug, Clone, Serialize, Deserialize)] +struct CurrentRef { + generation: u64, + #[serde(default)] + snapshot_id: Option, + manifest_key: String, +} + +pub struct ObjectStoreVolumeRepository { + export_root: String, + store: Arc, +} + +impl ObjectStoreVolumeRepository { + pub fn new(export_root: String, store: Arc) -> Self { + Self { + export_root: export_root.trim_end_matches('/').to_string(), + store, + } + } + + fn current_ref_key(&self, export_id: &str) -> String { + format!("{}/{}/refs/current.json", self.export_root, export_id) + } + + fn manifest_key(&self, export_id: &str, snapshot_id: &str) -> String { + manifest_key_from_snapshot_id(&self.export_root, export_id, snapshot_id) + } +} + +#[async_trait] +impl VolumeRepository for ObjectStoreVolumeRepository { + async fn list_volumes(&self) -> Result> { + let prefix = format!("{}/", self.export_root); + let keys = self.store.list_prefix(&prefix).await?; + let mut volumes = Vec::new(); + for key in keys { + if !key.ends_with("/volume.json") { + continue; + } + let stored = self.store.get_object_with_etag(&key).await?; + let volume: VolumeMetadata = serde_json::from_slice(&stored.body)?; + volume.validate()?; + volumes.push(LoadedVolume { + metadata: volume, + etag: stored.etag, + }); + } + Ok(volumes) + } + + async fn load_volume(&self, export_id: &str) -> Result { + let key = volume_key(&self.export_root, export_id); + let stored = self.store.get_object_with_etag(&key).await?; + let volume: VolumeMetadata = serde_json::from_slice(&stored.body)?; + volume.validate()?; + Ok(LoadedVolume { + metadata: volume, + etag: stored.etag, + }) + } + + async fn create_volume(&self, volume: &VolumeMetadata) -> Result { + let key = 
volume_key(&self.export_root, &volume.export_id); + let created = self + .store + .put_bytes_if_absent(&key, Bytes::from(serde_json::to_vec_pretty(volume)?)) + .await?; + if !created { + return Err(Error::Conflict(format!( + "export {} already exists remotely", + volume.export_id + ))); + } + let loaded = self.load_volume(&volume.export_id).await?; + if loaded.etag.is_none() { + return Err(Error::InvalidRequest(format!( + "volume {} was created without an etag", + volume.export_id + ))); + } + Ok(loaded) + } + + async fn load_manifest( + &self, + export_id: &str, + snapshot_id: Option<&str>, + ) -> Result { + let manifest_key = if let Some(snapshot_id) = snapshot_id { + self.manifest_key(export_id, snapshot_id) + } else { + let current_ref: CurrentRef = serde_json::from_slice( + &self + .store + .get_object(&self.current_ref_key(export_id)) + .await?, + )?; + current_ref.manifest_key + }; + let manifest: Manifest = + serde_json::from_slice(&self.store.get_object(&manifest_key).await?)?; + manifest.validate()?; + Ok(ResolvedManifest { + manifest_key, + manifest, + }) + } + + async fn load_manifest_by_key(&self, manifest_key: &str) -> Result { + let manifest: Manifest = + serde_json::from_slice(&self.store.get_object(manifest_key).await?)?; + manifest.validate()?; + Ok(ResolvedManifest { + manifest_key: manifest_key.to_string(), + manifest, + }) + } + + async fn put_manifest( + &self, + export_id: &str, + snapshot_id: &str, + manifest: &Manifest, + ) -> Result { + let manifest_key = self.manifest_key(export_id, snapshot_id); + self.store + .put_bytes( + &manifest_key, + Bytes::from(serde_json::to_vec_pretty(manifest)?), + ) + .await?; + Ok(manifest_key) + } + + async fn publish_snapshot( + &self, + export_id: &str, + snapshot_id: &str, + generation: u64, + manifest_key: &str, + ) -> Result<()> { + let volume_key = volume_key(&self.export_root, export_id); + let stored = self.store.get_object_with_etag(&volume_key).await?; + let current: VolumeMetadata = 
serde_json::from_slice(&stored.body)?; + current.validate()?; + let next = current.with_current_snapshot_id(snapshot_id.to_string()); + let etag = stored.etag.ok_or_else(|| { + Error::InvalidRequest(format!( + "volume metadata {} is missing an etag; conditional publish is unavailable", + volume_key + )) + })?; + let updated = self + .store + .put_bytes_if_match( + &volume_key, + Bytes::from(serde_json::to_vec_pretty(&next)?), + &etag, + ) + .await?; + if !updated { + return Err(Error::Conflict(format!( + "volume head changed while publishing snapshot {} for export {}", + snapshot_id, export_id + ))); + } + + self.store + .put_bytes( + &self.current_ref_key(export_id), + Bytes::from(serde_json::to_vec_pretty(&CurrentRef { + generation, + snapshot_id: Some(snapshot_id.to_string()), + manifest_key: manifest_key.to_string(), + })?), + ) + .await?; + Ok(()) + } + + fn snapshot_layout(&self, export_id: &str, snapshot_id: &str) -> SnapshotLayout { + SnapshotLayout { + manifest_key: self.manifest_key(export_id, snapshot_id), + delta_key: format!( + "{}/{}/snapshots/{}/delta.blob", + self.export_root, export_id, snapshot_id + ), + base_key: format!( + "{}/{}/snapshots/{}/base.blob", + self.export_root, export_id, snapshot_id + ), + current_ref_key: self.current_ref_key(export_id), + } + } +} diff --git a/tests/intergration/run_test.sh b/tests/intergration/run_test.sh index 618c2f0..1b4a069 100755 --- a/tests/intergration/run_test.sh +++ b/tests/intergration/run_test.sh @@ -48,23 +48,30 @@ REPO_ROOT="$(cd "${SCRIPT_DIR}/../.." 
&& pwd)" : "${NBD_SERVER_VM01:=vm01}" : "${NBD_SERVER_VM02:=vm02}" : "${NBD_SERVER_VM03:=vm03}" +: "${NBD_SERVER_VM04:=vm04}" : "${NBD_SERVER_NBD0:=/dev/nbd0}" : "${NBD_SERVER_NBD1:=/dev/nbd1}" : "${NBD_SERVER_NBD2:=/dev/nbd2}" +: "${NBD_SERVER_NBD3:=/dev/nbd3}" : "${NBD_SERVER_MOUNT_ROOT:=/var/tmp/nbd-server-it-mnt}" SERVER_LOG="${NBD_SERVER_MOUNT_ROOT}/server.log" VM01_MOUNT="${NBD_SERVER_MOUNT_ROOT}/${NBD_SERVER_VM01}" VM02_MOUNT="${NBD_SERVER_MOUNT_ROOT}/${NBD_SERVER_VM02}" VM03_MOUNT="${NBD_SERVER_MOUNT_ROOT}/${NBD_SERVER_VM03}" +VM04_MOUNT="${NBD_SERVER_MOUNT_ROOT}/${NBD_SERVER_VM04}" SERVER_PID="" VM01_SNAPSHOT_ID="" VM02_SNAPSHOT_ID="" +VM03_SNAPSHOT_ID="" VM02_FIXTURE_DIR="agent-fixture-tree" VM02_FIXTURE_MD5_MANIFEST="${NBD_SERVER_MOUNT_ROOT}/vm02-fixture.md5" +VM03_HISTORY_DIR="history-rounds" +VM03_FULL_MD5_MANIFEST="${NBD_SERVER_MOUNT_ROOT}/vm03-full-tree.md5" +VM04_FULL_MD5_MANIFEST="${NBD_SERVER_MOUNT_ROOT}/vm04-full-tree.md5" mkdir -p "${NBD_SERVER_MOUNT_ROOT}" -mkdir -p "${VM01_MOUNT}" "${VM02_MOUNT}" "${VM03_MOUNT}" +mkdir -p "${VM01_MOUNT}" "${VM02_MOUNT}" "${VM03_MOUNT}" "${VM04_MOUNT}" mkdir -p "${NBD_SERVER_CACHE_ROOT}" cleanup() { @@ -79,7 +86,11 @@ cleanup() { if mountpoint -q "${VM03_MOUNT}"; then umount "${VM03_MOUNT}" fi + if mountpoint -q "${VM04_MOUNT}"; then + umount "${VM04_MOUNT}" + fi + nbd-client -d "${NBD_SERVER_NBD3}" >/dev/null 2>&1 || true nbd-client -d "${NBD_SERVER_NBD2}" >/dev/null 2>&1 || true nbd-client -d "${NBD_SERVER_NBD1}" >/dev/null 2>&1 || true nbd-client -d "${NBD_SERVER_NBD0}" >/dev/null 2>&1 || true @@ -90,11 +101,16 @@ cleanup() { fi rm -f "${NBD_SERVER_ADMIN_SOCK}" - rm -f "${VM02_FIXTURE_MD5_MANIFEST}" "${NBD_SERVER_MOUNT_ROOT}/vm03-fixture.md5" + rm -f \ + "${VM02_FIXTURE_MD5_MANIFEST}" \ + "${NBD_SERVER_MOUNT_ROOT}/vm03-fixture.md5" \ + "${VM03_FULL_MD5_MANIFEST}" \ + "${VM04_FULL_MD5_MANIFEST}" rm -rf \ "${NBD_SERVER_CACHE_ROOT}/${NBD_SERVER_VM01}" \ "${NBD_SERVER_CACHE_ROOT}/${NBD_SERVER_VM02}" \ - 
"${NBD_SERVER_CACHE_ROOT}/${NBD_SERVER_VM03}" + "${NBD_SERVER_CACHE_ROOT}/${NBD_SERVER_VM03}" \ + "${NBD_SERVER_CACHE_ROOT}/${NBD_SERVER_VM04}" } trap cleanup EXIT @@ -290,6 +306,61 @@ verify_fixture_tree() { echo "Directory checksum verification succeeded" } +record_tree_manifest() { + local mount_dir="$1" + local output_path="$2" + + ( + cd "${mount_dir}" + find . -type f -print0 \ + | sort -z \ + | xargs -0 md5sum + ) >"${output_path}" +} + +verify_tree_manifest() { + local mount_dir="$1" + local expected_manifest="$2" + local actual_manifest="$3" + local label="$4" + + find "${mount_dir}" -type f -exec cat {} + >/dev/null + record_tree_manifest "${mount_dir}" "${actual_manifest}" + + if ! diff -u "${expected_manifest}" "${actual_manifest}"; then + echo "${label} checksum manifest mismatch" >&2 + return 1 + fi + + echo "${label} checksum verification succeeded" +} + +write_vm03_round_files() { + local mount_dir="$1" + local round="$2" + local root="${mount_dir}/${VM03_HISTORY_DIR}/round-${round}" + + mkdir -p "${root}/alpha" "${root}/beta" + + for index in $(seq 1 6); do + { + printf 'vm=%s round=%s series=alpha file=%02d\n' "${NBD_SERVER_VM03}" "${round}" "${index}" + for line in $(seq 1 48); do + printf 'alpha.%s.%02d.%03d seed=%08d\n' "${round}" "${index}" "${line}" "$((round * index * 100 + line))" + done + } >"${root}/alpha/file-${index}.txt" + done + + for index in $(seq 1 4); do + { + printf 'vm=%s round=%s series=beta file=%02d\n' "${NBD_SERVER_VM03}" "${round}" "${index}" + for line in $(seq 1 64); do + printf 'beta.%s.%02d.%03d token=%08x\n' "${round}" "${index}" "${line}" "$((round * 4096 + index * 256 + line))" + done + } >"${root}/beta/blob-${index}.log" + done +} + modprobe nbd max_part=16 # cleanup any leftover state from previous runs to ensure a clean slate for the integration test cleanup @@ -372,4 +443,46 @@ sync umount "${VM03_MOUNT}" nbd-client -d "${NBD_SERVER_NBD2}" +for round in 1 2 3; do + echo "Attaching ${NBD_SERVER_VM03} for round 
${round}" + attach_export "${NBD_SERVER_VM03}" "${NBD_SERVER_NBD2}" + mount "${NBD_SERVER_NBD2}" "${VM03_MOUNT}" + write_vm03_round_files "${VM03_MOUNT}" "${round}" + sync + umount "${VM03_MOUNT}" + nbd-client -d "${NBD_SERVER_NBD2}" + + echo "Snapshotting ${NBD_SERVER_VM03} after round ${round}" + VM03_SNAPSHOT_ID="$( + admin_curl POST "/v1/exports/${NBD_SERVER_VM03}/snapshot" \ + | tee /dev/stderr \ + | jq -r '.snapshot_id' + )" + if [[ -z "${VM03_SNAPSHOT_ID}" || "${VM03_SNAPSHOT_ID}" == "null" ]]; then + echo "Snapshot did not return a snapshot_id for ${NBD_SERVER_VM03} round ${round}" >&2 + exit 1 + fi +done + +echo "Recording full tree manifest for ${NBD_SERVER_VM03}" +attach_export "${NBD_SERVER_VM03}" "${NBD_SERVER_NBD2}" +mount "${NBD_SERVER_NBD2}" "${VM03_MOUNT}" +record_tree_manifest "${VM03_MOUNT}" "${VM03_FULL_MD5_MANIFEST}" +sync +umount "${VM03_MOUNT}" +nbd-client -d "${NBD_SERVER_NBD2}" + +echo "Cloning ${NBD_SERVER_VM04} from ${NBD_SERVER_VM03}@${VM03_SNAPSHOT_ID}" +admin_curl POST /v1/exports/clone \ + "{\"export_id\":\"${NBD_SERVER_VM04}\",\"source_export_id\":\"${NBD_SERVER_VM03}\",\"source_snapshot_id\":\"${VM03_SNAPSHOT_ID}\"}" \ + | jq . + +echo "Attaching ${NBD_SERVER_VM04} to ${NBD_SERVER_NBD3}" +attach_export "${NBD_SERVER_VM04}" "${NBD_SERVER_NBD3}" +mount "${NBD_SERVER_NBD3}" "${VM04_MOUNT}" +verify_tree_manifest "${VM04_MOUNT}" "${VM03_FULL_MD5_MANIFEST}" "${VM04_FULL_MD5_MANIFEST}" "${NBD_SERVER_VM04}" +sync +umount "${VM04_MOUNT}" +nbd-client -d "${NBD_SERVER_NBD3}" + echo "Integration flow completed successfully."