diff --git a/src/persist-client/src/internal/trace.rs b/src/persist-client/src/internal/trace.rs index 00c06b9c62871..20d47847d142b 100644 --- a/src/persist-client/src/internal/trace.rs +++ b/src/persist-client/src/internal/trace.rs @@ -1449,6 +1449,12 @@ impl SpineBatch { }); new_parts.extend_from_slice(&parts[range.end..]); + let res = if range.len() == parts.len() { + ApplyMergeResult::AppliedExact + } else { + ApplyMergeResult::AppliedSubset + }; + let new_spine_batch = SpineBatch { id: *id, desc: desc.to_owned(), @@ -1462,7 +1468,7 @@ impl SpineBatch { } *self = new_spine_batch; - ApplyMergeResult::AppliedSubset + res } } @@ -2574,4 +2580,57 @@ pub(crate) mod tests { prop_assert!(new_batch.run_meta.len() == batch.run_meta.len() - runs.len() + to_replace.run_meta.len()); }); } + + #[mz_ore::test] + fn test_perform_subset_replacement() { + let batch1 = crate::internal::state::tests::hollow::(0, 10, &["a"], 10); + let batch2 = crate::internal::state::tests::hollow::(10, 20, &["b"], 10); + let batch3 = crate::internal::state::tests::hollow::(20, 30, &["c"], 10); + + let id_batch1 = IdHollowBatch { + id: SpineId(0, 1), + batch: Arc::new(batch1.clone()), + }; + let id_batch2 = IdHollowBatch { + id: SpineId(1, 2), + batch: Arc::new(batch2.clone()), + }; + let id_batch3 = IdHollowBatch { + id: SpineId(2, 3), + batch: Arc::new(batch3.clone()), + }; + + let spine_batch = SpineBatch { + id: SpineId(0, 3), + desc: Description::new( + Antichain::from_elem(0), + Antichain::from_elem(30), + Antichain::from_elem(0), + ), + parts: vec![id_batch1, id_batch2, id_batch3], + active_compaction: None, + len: 30, + }; + + let res_exact = crate::internal::state::tests::hollow::(0, 30, &["d"], 30); + let mut sb_exact = spine_batch.clone(); + let result = sb_exact.perform_subset_replacement(&res_exact, SpineId(0, 3), 0..3, None); + assert!(matches!(result, ApplyMergeResult::AppliedExact)); + assert_eq!(sb_exact.parts.len(), 1); + assert_eq!(sb_exact.len(), 30); + + let res_subset = crate::internal::state::tests::hollow::(0, 20, &["e"], 20); + let mut sb_subset = spine_batch.clone(); + let result = sb_subset.perform_subset_replacement(&res_subset, SpineId(0, 2), 0..2, None); + assert!(matches!(result, ApplyMergeResult::AppliedSubset)); + assert_eq!(sb_subset.parts.len(), 2); // One new part + one old part + assert_eq!(sb_subset.len(), 30); + + let res_too_big = crate::internal::state::tests::hollow::(0, 30, &["f"], 31); + let mut sb_too_big = spine_batch.clone(); + let result = sb_too_big.perform_subset_replacement(&res_too_big, SpineId(0, 3), 0..3, None); + assert!(matches!(result, ApplyMergeResult::NotAppliedTooManyUpdates)); + assert_eq!(sb_too_big.parts.len(), 3); + assert_eq!(sb_too_big.len(), 30); + } }