Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
136 changes: 116 additions & 20 deletions src/core/engine/export.rs
Original file line number Diff line number Diff line change
Expand Up @@ -496,19 +496,8 @@ impl Export {
.await
}
ReadSource::Manifest(_) => {
self.materialize_all_chunks().await?;
self.cache.sync_data()?;
let snapshot_id = new_snapshot_id();
self.publish_full_snapshot(
next_generation,
snapshot_id.clone(),
JournalOperation::Snapshot,
format!(
"{}/snapshots/{}/base.blob",
self.config.storage.prefix, snapshot_id
),
)
.await
self.publish_clone_sparse_snapshot(next_generation, new_snapshot_id())
.await
}
}
} else {
Expand Down Expand Up @@ -821,6 +810,98 @@ impl Export {
self.stage_manifest(snapshot_id, manifest).await
}

async fn publish_clone_sparse_snapshot(
&self,
generation: u64,
snapshot_id: String,
) -> Result<PreparedPublish> {
let delta_path = self
.config
.cache_dir
.join(format!("snapshot-{snapshot_id}.delta.blob"));
let delta_key = format!(
"{}/snapshots/{}/delta.blob",
self.config.storage.prefix, snapshot_id
);
let manifest_key = manifest_key(&self.config.storage.prefix, &snapshot_id);

let mut delta_file = OpenOptions::new()
.create(true)
.truncate(true)
.write(true)
.open(&delta_path)?;
let mut replacements = BTreeMap::new();
let mut cursor = 0_u64;

for index in 0..self.cache.chunk_count() {
let _chunk_guard = self.chunk_locks[index].lock().await;
let bytes = self.effective_chunk_bytes(index as u64).await?;
if bytes.iter().all(|byte| *byte == 0) {
continue;
}

let logical_len = bytes.len() as u64;
delta_file.write_all(&bytes)?;
replacements.insert(
index as u64,
ReplacementChunk {
object_offset: cursor,
logical_len,
checksum: blake3::hash(&bytes).to_hex().to_string(),
},
);
cursor += logical_len;
}
delta_file.sync_all()?;

if replacements.is_empty() {
remove_file(&delta_path).ok();
let manifest = Manifest::empty(
self.config.export_id.clone(),
generation,
self.cache.image_size(),
self.cache.chunk_size(),
)?;
return self.stage_manifest(snapshot_id, manifest).await;
}

JournalRecord {
version: 1,
operation: JournalOperation::Snapshot,
generation,
staging_path: Some(delta_path.display().to_string()),
object_key: delta_key.clone(),
manifest_key: manifest_key.clone(),
}
.persist(&self.journal_path)?;

tracing::info!(
generation,
packed_chunks = replacements.len(),
packed_bytes = cursor,
object_key = %delta_key,
"starting clone sparse snapshot upload"
);
self.remote.put_file(&delta_key, &delta_path).await?;
tracing::info!(
generation,
packed_chunks = replacements.len(),
packed_bytes = cursor,
object_key = %delta_key,
"finished clone sparse snapshot upload"
);

let manifest = Manifest::empty(
self.config.export_id.clone(),
generation,
self.cache.image_size(),
self.cache.chunk_size(),
)?
.with_new_ref(generation, delta_key, replacements)?;
remove_file(&delta_path).ok();
self.stage_manifest(snapshot_id, manifest).await
}

async fn stage_manifest(
&self,
snapshot_id: String,
Expand Down Expand Up @@ -939,6 +1020,17 @@ impl Export {
}
}

async fn effective_chunk_bytes(&self, index: u64) -> Result<Vec<u8>> {
let logical_len = chunk_len(self.cache.image_size(), self.cache.chunk_size(), index);
if self.cache.is_resident(index as usize) {
return self.cache.read_exact_at(
chunk_offset(self.cache.chunk_size(), index),
logical_len as usize,
);
}
self.fetch_chunk_bytes(index).await
}

fn validate_range(&self, offset: u64, len: u64) -> Result<()> {
let size = self.cache.image_size();
if offset.checked_add(len).is_none() || offset + len > size {
Expand Down Expand Up @@ -1394,7 +1486,7 @@ mod tests {
}

#[tokio::test]
async fn first_clone_snapshot_uploads_full_base_blob_and_cuts_over_to_target_lineage() {
async fn first_clone_snapshot_publishes_packed_sparse_data_and_cuts_over_to_target_lineage() {
let dir = tempdir().unwrap();
let remote = Arc::new(MemoryRemote::default());
remote
Expand Down Expand Up @@ -1446,18 +1538,22 @@ mod tests {
.unwrap();
let manifest: serde_json::Value = serde_json::from_slice(&manifest).unwrap();
assert_eq!(manifest["generation"], 1);
assert_eq!(manifest["base_ref"], 1);
assert_eq!(
manifest["refs"][0]["path"],
format!("exports/clone/snapshots/{snapshot_id}/base.blob")
);
assert_eq!(manifest["base_ref"], serde_json::Value::Null);
assert_eq!(manifest["entries"].as_array().unwrap().len(), 2);
assert_eq!(
remote
.get_object(&format!("exports/clone/snapshots/{snapshot_id}/base.blob"))
.get_object(&format!("exports/clone/snapshots/{snapshot_id}/delta.blob"))
.await
.unwrap(),
Bytes::from_static(b"WXYZefgh")
);
assert!(
!remote
.objects
.lock()
.unwrap()
.contains_key(&format!("exports/clone/snapshots/{snapshot_id}/base.blob"))
);
assert!(
!dir.path()
.join("clone-cache")
Expand Down
Loading
Loading