diff --git a/src/core/engine/export.rs b/src/core/engine/export.rs index 334f5d0..3c1aef3 100644 --- a/src/core/engine/export.rs +++ b/src/core/engine/export.rs @@ -496,19 +496,8 @@ impl Export { .await } ReadSource::Manifest(_) => { - self.materialize_all_chunks().await?; - self.cache.sync_data()?; - let snapshot_id = new_snapshot_id(); - self.publish_full_snapshot( - next_generation, - snapshot_id.clone(), - JournalOperation::Snapshot, - format!( - "{}/snapshots/{}/base.blob", - self.config.storage.prefix, snapshot_id - ), - ) - .await + self.publish_clone_sparse_snapshot(next_generation, new_snapshot_id()) + .await } } } else { @@ -821,6 +810,98 @@ impl Export { self.stage_manifest(snapshot_id, manifest).await } + async fn publish_clone_sparse_snapshot( + &self, + generation: u64, + snapshot_id: String, + ) -> Result { + let delta_path = self + .config + .cache_dir + .join(format!("snapshot-{snapshot_id}.delta.blob")); + let delta_key = format!( + "{}/snapshots/{}/delta.blob", + self.config.storage.prefix, snapshot_id + ); + let manifest_key = manifest_key(&self.config.storage.prefix, &snapshot_id); + + let mut delta_file = OpenOptions::new() + .create(true) + .truncate(true) + .write(true) + .open(&delta_path)?; + let mut replacements = BTreeMap::new(); + let mut cursor = 0_u64; + + for index in 0..self.cache.chunk_count() { + let _chunk_guard = self.chunk_locks[index].lock().await; + let bytes = self.effective_chunk_bytes(index as u64).await?; + if bytes.iter().all(|byte| *byte == 0) { + continue; + } + + let logical_len = bytes.len() as u64; + delta_file.write_all(&bytes)?; + replacements.insert( + index as u64, + ReplacementChunk { + object_offset: cursor, + logical_len, + checksum: blake3::hash(&bytes).to_hex().to_string(), + }, + ); + cursor += logical_len; + } + delta_file.sync_all()?; + + if replacements.is_empty() { + remove_file(&delta_path).ok(); + let manifest = Manifest::empty( + self.config.export_id.clone(), + generation, + self.cache.image_size(), + self.cache.chunk_size(), + )?; + return self.stage_manifest(snapshot_id, manifest).await; + } + + JournalRecord { + version: 1, + operation: JournalOperation::Snapshot, + generation, + staging_path: Some(delta_path.display().to_string()), + object_key: delta_key.clone(), + manifest_key: manifest_key.clone(), + } + .persist(&self.journal_path)?; + + tracing::info!( + generation, + packed_chunks = replacements.len(), + packed_bytes = cursor, + object_key = %delta_key, + "starting clone sparse snapshot upload" + ); + self.remote.put_file(&delta_key, &delta_path).await?; + tracing::info!( + generation, + packed_chunks = replacements.len(), + packed_bytes = cursor, + object_key = %delta_key, + "finished clone sparse snapshot upload" + ); + + let manifest = Manifest::empty( + self.config.export_id.clone(), + generation, + self.cache.image_size(), + self.cache.chunk_size(), + )? + .with_new_ref(generation, delta_key, replacements)?; + remove_file(&delta_path).ok(); + self.stage_manifest(snapshot_id, manifest).await + } + async fn stage_manifest( &self, snapshot_id: String, @@ -939,6 +1020,17 @@ impl Export { } } + async fn effective_chunk_bytes(&self, index: u64) -> Result> { + let logical_len = chunk_len(self.cache.image_size(), self.cache.chunk_size(), index); + if self.cache.is_resident(index as usize) { + return self.cache.read_exact_at( + chunk_offset(self.cache.chunk_size(), index), + logical_len as usize, + ); + } + self.fetch_chunk_bytes(index).await + } + fn validate_range(&self, offset: u64, len: u64) -> Result<()> { let size = self.cache.image_size(); if offset.checked_add(len).is_none() || offset + len > size { @@ -1394,7 +1486,7 @@ mod tests { } #[tokio::test] - async fn first_clone_snapshot_uploads_full_base_blob_and_cuts_over_to_target_lineage() { + async fn first_clone_snapshot_publishes_packed_sparse_data_and_cuts_over_to_target_lineage() { let dir = tempdir().unwrap(); let remote = Arc::new(MemoryRemote::default()); remote @@ -1446,18 +1538,22 @@ mod tests { .unwrap(); let manifest: serde_json::Value = serde_json::from_slice(&manifest).unwrap(); assert_eq!(manifest["generation"], 1); - assert_eq!(manifest["base_ref"], 1); - assert_eq!( - manifest["refs"][0]["path"], - format!("exports/clone/snapshots/{snapshot_id}/base.blob") - ); + assert_eq!(manifest["base_ref"], serde_json::Value::Null); + assert_eq!(manifest["entries"].as_array().unwrap().len(), 2); assert_eq!( remote - .get_object(&format!("exports/clone/snapshots/{snapshot_id}/base.blob")) + .get_object(&format!("exports/clone/snapshots/{snapshot_id}/delta.blob")) .await .unwrap(), Bytes::from_static(b"WXYZefgh") ); + assert!( + !remote + .objects + .lock() + .unwrap() + .contains_key(&format!("exports/clone/snapshots/{snapshot_id}/base.blob")) + ); assert!( !dir.path() .join("clone-cache") diff --git a/tests/intergration/run_test.sh b/tests/intergration/run_test.sh index c094f7a..8539922 100755 --- a/tests/intergration/run_test.sh +++ b/tests/intergration/run_test.sh @@ -17,6 +17,7 @@ required_tools=( curl jq fio + md5sum mkfs.ext4 mount umount @@ -46,18 +47,24 @@ REPO_ROOT="$(cd "${SCRIPT_DIR}/../.." && pwd)" : "${NBD_SERVER_VM01_SIZE:=1073741824}" : "${NBD_SERVER_VM01:=vm01}" : "${NBD_SERVER_VM02:=vm02}" +: "${NBD_SERVER_VM03:=vm03}" : "${NBD_SERVER_NBD0:=/dev/nbd0}" : "${NBD_SERVER_NBD1:=/dev/nbd1}" +: "${NBD_SERVER_NBD2:=/dev/nbd2}" : "${NBD_SERVER_MOUNT_ROOT:=/var/tmp/nbd-server-it-mnt}" SERVER_LOG="${NBD_SERVER_MOUNT_ROOT}/server.log" VM01_MOUNT="${NBD_SERVER_MOUNT_ROOT}/${NBD_SERVER_VM01}" VM02_MOUNT="${NBD_SERVER_MOUNT_ROOT}/${NBD_SERVER_VM02}" +VM03_MOUNT="${NBD_SERVER_MOUNT_ROOT}/${NBD_SERVER_VM03}" SERVER_PID="" VM01_SNAPSHOT_ID="" +VM02_SNAPSHOT_ID="" +VM02_FIXTURE_DIR="agent-fixture-tree" +VM02_FIXTURE_MD5_MANIFEST="${NBD_SERVER_MOUNT_ROOT}/vm02-fixture.md5" mkdir -p "${NBD_SERVER_MOUNT_ROOT}" -mkdir -p "${VM01_MOUNT}" "${VM02_MOUNT}" +mkdir -p "${VM01_MOUNT}" "${VM02_MOUNT}" "${VM03_MOUNT}" mkdir -p "${NBD_SERVER_CACHE_ROOT}" cleanup() { @@ -69,7 +76,11 @@ cleanup() { if mountpoint -q "${VM01_MOUNT}"; then umount "${VM01_MOUNT}" fi + if mountpoint -q "${VM03_MOUNT}"; then + umount "${VM03_MOUNT}" + fi + nbd-client -d "${NBD_SERVER_NBD2}" >/dev/null 2>&1 || true nbd-client -d "${NBD_SERVER_NBD1}" >/dev/null 2>&1 || true nbd-client -d "${NBD_SERVER_NBD0}" >/dev/null 2>&1 || true @@ -79,6 +90,11 @@ cleanup() { fi rm -f "${NBD_SERVER_ADMIN_SOCK}" + rm -f "${VM02_FIXTURE_MD5_MANIFEST}" "${NBD_SERVER_MOUNT_ROOT}/vm03-fixture.md5" + rm -rf \ + "${NBD_SERVER_CACHE_ROOT}/${NBD_SERVER_VM01}" \ + "${NBD_SERVER_CACHE_ROOT}/${NBD_SERVER_VM02}" \ + "${NBD_SERVER_CACHE_ROOT}/${NBD_SERVER_VM03}" } trap cleanup EXIT @@ -87,20 +103,41 @@ admin_curl() { local method="$1" local path="$2" local body="${3:-}" + local response_body + local http_code + + response_body="$(mktemp)" if [[ -n "${body}" ]]; then - curl --fail --silent --show-error \ - --unix-socket "${NBD_SERVER_ADMIN_SOCK}" \ - -X "${method}" \ - -H 'content-type: application/json' \ - -d "${body}" \ - "http://localhost${path}" + http_code="$( + curl --silent --show-error \ + --output "${response_body}" \ + --write-out '%{http_code}' \ + --unix-socket "${NBD_SERVER_ADMIN_SOCK}" \ + -X "${method}" \ + -H 'content-type: application/json' \ + -d "${body}" \ + "http://localhost${path}" + )" else - curl --fail --silent --show-error \ - --unix-socket "${NBD_SERVER_ADMIN_SOCK}" \ - -X "${method}" \ - "http://localhost${path}" + http_code="$( + curl --silent --show-error \ + --output "${response_body}" \ + --write-out '%{http_code}' \ + --unix-socket "${NBD_SERVER_ADMIN_SOCK}" \ + -X "${method}" \ + "http://localhost${path}" + )" fi + + if [[ "${http_code}" -ge 400 ]]; then + cat "${response_body}" >&2 + rm -f "${response_body}" + return 22 + fi + + cat "${response_body}" + rm -f "${response_body}" } wait_for_admin() { @@ -166,6 +203,93 @@ run_fio_job() { --time_based=0 } +write_fixture_tree() { + local mount_dir="$1" + local root="${mount_dir}/${VM02_FIXTURE_DIR}" + + rm -rf "${root}" + mkdir -p \ + "${root}/alpha" \ + "${root}/alpha/nested" \ + "${root}/beta" \ + "${root}/gamma/deep" + + for index in $(seq 1 24); do + { + printf 'fixture export=%s clone=%s file=%02d\n' \ + "${NBD_SERVER_VM02}" \ + "${NBD_SERVER_VM03}" \ + "${index}" + for line in $(seq 1 128); do + printf 'line=%03d value=%08d\n' "${line}" "$((index * 1000 + line))" + done + } >"${root}/alpha/file-${index}.txt" + done + + for index in $(seq 1 12); do + { + printf 'nested export=%s index=%02d\n' "${NBD_SERVER_VM02}" "${index}" + for line in $(seq 1 96); do + printf 'payload=%02d-%03d-%s\n' "${index}" "${line}" "${NBD_SERVER_VM01}" + done + } >"${root}/alpha/nested/blob-${index}.log" + done + + for index in $(seq 1 16); do + { + printf 'beta export=%s source=%s idx=%02d\n' \ + "${NBD_SERVER_VM02}" \ + "${NBD_SERVER_VM01}" \ + "${index}" + for line in $(seq 1 80); do + printf 'record=%02d/%03d checksum-seed=%08d\n' "${index}" "${line}" "$((index * line))" + done + } >"${root}/beta/data-${index}.cfg" + done + + for index in $(seq 1 8); do + { + printf 'gamma export=%s clone=%s idx=%02d\n' \ + "${NBD_SERVER_VM02}" \ + "${NBD_SERVER_VM03}" \ + "${index}" + for line in $(seq 1 160); do + printf 'entry=%02d.%03d token=%08x\n' "${index}" "${line}" "$((index * 4096 + line))" + done + } >"${root}/gamma/deep/object-${index}.dat" + done + + ( + cd "${root}" + find . -type f -print0 \ + | sort -z \ + | xargs -0 md5sum + ) >"${VM02_FIXTURE_MD5_MANIFEST}" + sync +} + +verify_fixture_tree() { + local mount_dir="$1" + local root="${mount_dir}/${VM02_FIXTURE_DIR}" + local actual_manifest="${NBD_SERVER_MOUNT_ROOT}/vm03-fixture.md5" + + [[ -d "${root}" ]] + find "${root}" -type f -exec cat {} + >/dev/null + + ( + cd "${root}" + find . -type f -print0 \ + | sort -z \ + | xargs -0 md5sum + ) >"${actual_manifest}" + + if ! diff -u "${VM02_FIXTURE_MD5_MANIFEST}" "${actual_manifest}"; then + echo "Fixture directory checksum manifest mismatch" >&2 + return 1 + fi + echo "Directory checksum verification succeeded" +} + modprobe nbd max_part=16 # cleanup any leftover state from previous runs to ensure a clean slate for the integration test cleanup @@ -219,11 +343,33 @@ echo "Attaching ${NBD_SERVER_VM02} to ${NBD_SERVER_NBD1}" attach_export "${NBD_SERVER_VM02}" "${NBD_SERVER_NBD1}" mount "${NBD_SERVER_NBD1}" "${VM02_MOUNT}" run_fio_job "${VM02_MOUNT}" "${NBD_SERVER_VM02}-clone" +write_fixture_tree "${VM02_MOUNT}" sync umount "${VM02_MOUNT}" nbd-client -d "${NBD_SERVER_NBD1}" echo "Snapshotting ${NBD_SERVER_VM02}" -admin_curl POST "/v1/exports/${NBD_SERVER_VM02}/snapshot" | jq . +VM02_SNAPSHOT_ID="$( + admin_curl POST "/v1/exports/${NBD_SERVER_VM02}/snapshot" \ + | tee /dev/stderr \ + | jq -r '.snapshot_id' +)" +if [[ -z "${VM02_SNAPSHOT_ID}" || "${VM02_SNAPSHOT_ID}" == "null" ]]; then + echo "Snapshot did not return a snapshot_id for ${NBD_SERVER_VM02}" >&2 + exit 1 +fi + +echo "Cloning ${NBD_SERVER_VM03} from ${NBD_SERVER_VM02}@${VM02_SNAPSHOT_ID}" +admin_curl POST /v1/exports/clone \ + "{\"export_id\":\"${NBD_SERVER_VM03}\",\"source_export_id\":\"${NBD_SERVER_VM02}\",\"source_snapshot_id\":\"${VM02_SNAPSHOT_ID}\"}" \ + | jq . + +echo "Attaching ${NBD_SERVER_VM03} to ${NBD_SERVER_NBD2}" +attach_export "${NBD_SERVER_VM03}" "${NBD_SERVER_NBD2}" +mount "${NBD_SERVER_NBD2}" "${VM03_MOUNT}" +verify_fixture_tree "${VM03_MOUNT}" +sync +umount "${VM03_MOUNT}" +nbd-client -d "${NBD_SERVER_NBD2}" echo "Integration flow completed successfully."