Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
61 changes: 60 additions & 1 deletion src/persist-client/src/internal/trace.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1449,6 +1449,12 @@ impl<T: Timestamp + Lattice + Codec64> SpineBatch<T> {
});
new_parts.extend_from_slice(&parts[range.end..]);

let res = if range.len() == parts.len() {
ApplyMergeResult::AppliedExact
} else {
ApplyMergeResult::AppliedSubset
};

let new_spine_batch = SpineBatch {
id: *id,
desc: desc.to_owned(),
Expand All @@ -1462,7 +1468,7 @@ impl<T: Timestamp + Lattice + Codec64> SpineBatch<T> {
}

*self = new_spine_batch;
ApplyMergeResult::AppliedSubset
res
}
}

Expand Down Expand Up @@ -2574,4 +2580,57 @@ pub(crate) mod tests {
prop_assert!(new_batch.run_meta.len() == batch.run_meta.len() - runs.len() + to_replace.run_meta.len());
});
}

#[mz_ore::test]
fn test_perform_subset_replacement() {
let batch1 = crate::internal::state::tests::hollow::<u64>(0, 10, &["a"], 10);
let batch2 = crate::internal::state::tests::hollow::<u64>(10, 20, &["b"], 10);
let batch3 = crate::internal::state::tests::hollow::<u64>(20, 30, &["c"], 10);

let id_batch1 = IdHollowBatch {
id: SpineId(0, 1),
batch: Arc::new(batch1.clone()),
};
let id_batch2 = IdHollowBatch {
id: SpineId(1, 2),
batch: Arc::new(batch2.clone()),
};
let id_batch3 = IdHollowBatch {
id: SpineId(2, 3),
batch: Arc::new(batch3.clone()),
};

let spine_batch = SpineBatch {
id: SpineId(0, 3),
desc: Description::new(
Antichain::from_elem(0),
Antichain::from_elem(30),
Antichain::from_elem(0),
),
parts: vec![id_batch1, id_batch2, id_batch3],
active_compaction: None,
len: 30,
};

let res_exact = crate::internal::state::tests::hollow::<u64>(0, 30, &["d"], 30);
let mut sb_exact = spine_batch.clone();
let result = sb_exact.perform_subset_replacement(&res_exact, SpineId(0, 3), 0..3, None);
assert!(matches!(result, ApplyMergeResult::AppliedExact));
assert_eq!(sb_exact.parts.len(), 1);
assert_eq!(sb_exact.len(), 30);

let res_subset = crate::internal::state::tests::hollow::<u64>(0, 20, &["e"], 20);
let mut sb_subset = spine_batch.clone();
let result = sb_subset.perform_subset_replacement(&res_subset, SpineId(0, 2), 0..2, None);
assert!(matches!(result, ApplyMergeResult::AppliedSubset));
assert_eq!(sb_subset.parts.len(), 2); // One new part + one old part
assert_eq!(sb_subset.len(), 30);

let res_too_big = crate::internal::state::tests::hollow::<u64>(0, 30, &["f"], 31);
let mut sb_too_big = spine_batch.clone();
let result = sb_too_big.perform_subset_replacement(&res_too_big, SpineId(0, 3), 0..3, None);
assert!(matches!(result, ApplyMergeResult::NotAppliedTooManyUpdates));
assert_eq!(sb_too_big.parts.len(), 3);
assert_eq!(sb_too_big.len(), 30);
}
}