Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
162 changes: 161 additions & 1 deletion crates/typed-store/src/rocks/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
// SPDX-License-Identifier: Apache-2.0

use std::{
sync::{Mutex, MutexGuard, OnceLock},
sync::{Arc, Barrier, Mutex, MutexGuard, OnceLock},
time::Duration,
};

Expand Down Expand Up @@ -1143,3 +1143,163 @@ async fn test_optimistic_transaction_conflict_and_retry() {

assert_eq!(db.get(&1).unwrap(), Some("final".to_string()));
}

/// Stress test for the WAL-1187 corruption shape.
///
/// The bad case is not merely "transaction commit succeeds". The bad case is:
/// 1. the transaction observes the pre-merge value and stages a delete,
/// 2. a concurrent merge is assigned an earlier sequence number,
/// 3. the delete is assigned a later sequence number and still commits,
/// 4. a later dependent merge then fails because the earlier merge is hidden by the delete.
///
/// This test models that by using a merge operator where `C` is only valid if a visible `R`
/// precedes it. If we ever reproduce a state where appending `C` fails after a concurrent `R`,
/// the delete has hidden the register merge, which is exactly the shape we care about.
#[tokio::test]
async fn test_optimistic_transaction_concurrent_write_batch_merge_must_not_hide_register() {
    // Serialize with other tests that share per-process test state.
    let _guard = global_test_lock();

    // Tiny state machine used to model the WAL-1187 failure:
    // - visible base value "0" means "the row exists and is currently empty/deletable"
    // - merge operand "R" means "Register(B)"
    // - merge operand "C" means "Certify(B)"
    //
    // `C` is only valid if a visible `R` came before it. If a committed delete tombstone hides
    // `R`, a later read/materialization fails exactly like the real blob-info merge operator.
    //
    // Full-merge operator. Returning `None` reports a merge failure; per the detection logic
    // at the bottom of the loop, that failure manifests as an error on a subsequent read.
    fn zero_register_certify_merge(
        _key: &[u8],
        existing_val: Option<&[u8]>,
        operands: &rocksdb::MergeOperands,
    ) -> Option<Vec<u8>> {
        // Decode the visible base value into the machine's starting state:
        // 0 = empty/deletable ("0"), 2 = already certified ("V"); anything else is invalid.
        let mut state = match existing_val {
            None | Some(b"0") => 0,
            Some(b"V") => 2,
            _ => return None,
        };
        // Fold operands in order; any transition outside 0 --R--> 1 --C--> 2 is invalid.
        for op in operands {
            match (state, op) {
                (0, b"R") => state = 1,
                (1, b"C") => state = 2,
                _ => return None,
            }
        }
        // Re-encode the final state as the stored value.
        Some(match state {
            0 => vec![b'0'],
            1 | 2 => vec![b'V'],
            _ => unreachable!(),
        })
    }

    let path = temp_dir();
    let cf_name = "merge_cf";

    let mut cf_opts = rocksdb::Options::default();
    cf_opts.set_merge_operator(
        "zero_register_certify_merge",
        zero_register_certify_merge,
        // No partial merge: operands must always be folded against the visible base value.
        |_, _, _| None,
    );

    let db = open_cf_opts_optimistic(&path, None, MetricConf::default(), &[(cf_name, cf_opts)])
        .expect("failed to open optimistic db");

    // Unwrap the optimistic-transaction variant; the raw handle is needed for
    // non-transactional writes (seed puts and the racing WriteBatch merge).
    let RocksDB::OptimisticTransactionDB(ref wrapper) = *db else {
        panic!("expected OptimisticTransactionDB");
    };
    let underlying = &wrapper.underlying;
    // Many iterations so the nondeterministic race gets a realistic chance to
    // interleave both ways.
    const ITERATIONS: usize = 2_000;
    let mut hidden_register_failures = 0usize;

    for i in 0..ITERATIONS {
        // Fresh key per iteration so earlier iterations cannot contaminate later ones.
        let key = format!("counter-{i}").into_bytes();
        let cf = underlying.cf_handle(cf_name).expect("cf handle");

        // Seed the key with a visible "empty" state. The transaction should only proceed with
        // the delete if it observes exactly this value.
        underlying.put_cf(&cf, &key, b"0").unwrap();

        // Phase 1: both worker threads reach the point where they are ready to race, but have not
        // yet attempted the final write.
        let ready_barrier = Arc::new(Barrier::new(3));
        // Phase 2: release both workers at the same time so the race is between `tx.commit()` and
        // the concurrent WriteBatch merge, not between thread startup and setup work.
        let release_barrier = Arc::new(Barrier::new(3));

        // Scoped threads let both workers borrow `underlying` without 'static bounds.
        let commit_result: Result<(), rocksdb::Error> = std::thread::scope(|scope| {
            let commit_ready_barrier = ready_barrier.clone();
            let commit_release_barrier = release_barrier.clone();
            let handle = db.as_optimistic().expect("optimistic db");
            let commit_key = key.clone();
            let commit_thread = scope.spawn(move || {
                let commit_cf = underlying.cf_handle(cf_name).expect("cf handle");
                let tx = handle.transaction();
                // Construct the transaction inside the racing thread so the snapshot/read/check
                // happen on the same path as the later commit.
                let read = tx
                    .get_for_update_cf_opt(
                        &commit_cf,
                        &commit_key,
                        false,
                        &rocksdb::ReadOptions::default(),
                    )
                    .unwrap();
                assert_eq!(
                    read.as_deref(),
                    Some(&b"0"[..]),
                    "transaction should only delete after observing 0"
                );
                tx.delete_cf(&commit_cf, &commit_key).unwrap();

                // Signal that the transaction has already observed `0` and staged the delete.
                commit_ready_barrier.wait();
                // Wait for the merge thread to reach the matching point, then race the actual
                // commit against the concurrent WriteBatch merge.
                commit_release_barrier.wait();
                tx.commit()
            });

            let merge_ready_barrier = ready_barrier.clone();
            let merge_release_barrier = release_barrier.clone();
            let merge_key = key.clone();
            let merge_thread = scope.spawn(move || {
                // Do not let the merge run early; first wait until the transaction thread has
                // already constructed the transaction and staged the delete.
                merge_ready_barrier.wait();
                // Race only the final write.
                merge_release_barrier.wait();
                let merge_cf = underlying.cf_handle(cf_name).expect("cf handle");
                let mut batch = rocksdb::WriteBatchWithTransaction::<true>::default();
                batch.merge_cf(&merge_cf, &merge_key, b"R");
                underlying.write(batch)
            });

            // Both workers have finished setup.
            ready_barrier.wait();
            // Start the actual race.
            release_barrier.wait();
            let merge_result = merge_thread.join().unwrap();
            assert!(
                merge_result.is_ok(),
                "merge batch should succeed on iteration {i}: {merge_result:?}"
            );
            commit_thread.join().unwrap()
        });

        // Append the dependent "certify" merge. We only count the bad case:
        // - the delete transaction committed, and
        // - reading the key now fails because the concurrent "register" merge was hidden behind
        //   the delete tombstone.
        //
        // A successful commit on its own is fine if the merge simply landed afterwards.
        underlying.merge_cf(&cf, &key, b"C").unwrap();
        if commit_result.is_ok() && underlying.get_cf(&cf, &key).is_err() {
            hidden_register_failures += 1;
        }
    }

    assert_eq!(
        hidden_register_failures, 0,
        "concurrent register merge was hidden behind a later committed delete {} times out of {}",
        hidden_register_failures, ITERATIONS
    );
}
109 changes: 109 additions & 0 deletions crates/walrus-service/src/node/storage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1605,6 +1605,115 @@ pub(crate) mod tests {
Ok(())
}

/// End-to-end variant of the hidden-register race against the real aggregate
/// blob-info store: a cleanup transaction that deletes a blob's aggregate info
/// races a concurrent register event for the same blob. The bad outcome is a
/// committed delete that hides the concurrent register, leaving no blob info
/// behind even though a register landed.
#[tokio::test]
async fn concurrent_aggregate_blob_info_delete_must_not_hide_next_register() -> TestResult {
    let storage = empty_storage().await;
    let storage = storage.as_ref();
    let blob_id = BLOB_ID;
    let current_epoch = 1;
    // Monotonically increasing index passed to `update_blob_info` for event ordering.
    let mut event_index = 0u64;
    // Many iterations so the nondeterministic race can interleave both ways.
    const ITERATIONS: usize = 2_000;
    let mut hidden_register_failures = 0usize;

    for i in 0..ITERATIONS {
        // NOTE(review): `as u8` truncates for i >= 128, so object ids repeat across
        // iterations. Presumably harmless because each object is deleted before its id
        // recurs — confirm against the blob-info merge semantics.
        let object_a = SuiObjectId([(2 * i + 1) as u8; SuiObjectId::LENGTH]);
        let register_a = BlobRegistered {
            deletable: true,
            ..BlobRegistered::for_testing_with_object_id(blob_id, object_a.into())
        };
        let delete_a = register_a
            .clone()
            .into_corresponding_deleted_event_for_testing(false);

        // Register then delete object A so the aggregate blob info reaches the
        // "exists but deletable" state the cleanup transaction expects to observe.
        storage.update_blob_info(event_index, &register_a.clone().into())?;
        event_index += 1;
        storage.update_blob_info(event_index, &delete_a.into())?;
        event_index += 1;

        let blob_info = storage
            .get_blob_info(&blob_id)?
            .expect("aggregate blob info should still exist after delete");
        assert!(blob_info.can_blob_info_be_deleted(current_epoch));

        // Object B is the concurrent register that must not be hidden by the delete.
        let object_b = SuiObjectId([(2 * i + 2) as u8; SuiObjectId::LENGTH]);
        let register_b = BlobRegistered {
            deletable: true,
            ..BlobRegistered::for_testing_with_object_id(blob_id, object_b.into())
        };

        // Phase 1: both workers finish setup; Phase 2: released together so the
        // race is strictly commit-vs-merge, not setup-vs-setup.
        let ready_barrier = Arc::new(std::sync::Barrier::new(3));
        let release_barrier = Arc::new(std::sync::Barrier::new(3));

        // Scoped threads let both workers borrow `storage` without 'static bounds.
        let commit_result: Result<(), rocksdb::Error> = std::thread::scope(|scope| {
            let commit_ready_barrier = ready_barrier.clone();
            let commit_release_barrier = release_barrier.clone();
            let commit_thread = scope.spawn(move || {
                let optimistic_handle = storage
                    .database
                    .as_optimistic()
                    .expect("storage test DB should be optimistic");
                let transaction = optimistic_handle.transaction();
                // Read-for-update inside the transaction so the snapshot/check/delete
                // all happen on the same path that later commits.
                let blob_info = storage
                    .blob_info
                    .get_for_update_in_transaction(&transaction, &blob_id)?
                    .expect("aggregate blob info should exist");
                assert!(
                    blob_info.can_blob_info_be_deleted(current_epoch),
                    "cleanup transaction should only proceed after observing deletable state"
                );
                storage
                    .blob_info
                    .delete_in_transaction(&transaction, &blob_id)?;

                // Delete is staged; rendezvous, then race the commit.
                commit_ready_barrier.wait();
                commit_release_barrier.wait();
                transaction.commit()
            });

            let merge_ready_barrier = ready_barrier.clone();
            let merge_release_barrier = release_barrier.clone();
            let merge_event = register_b.clone();
            let merge_index = event_index;
            let merge_thread = scope.spawn(move || {
                // Hold the register event back until the cleanup transaction has
                // already observed the deletable state and staged its delete.
                merge_ready_barrier.wait();
                merge_release_barrier.wait();
                storage.update_blob_info(merge_index, &merge_event.into())
            });

            // Both workers have finished setup; start the actual race.
            ready_barrier.wait();
            release_barrier.wait();

            let merge_result = merge_thread.join().expect("merge thread should not panic");
            assert!(
                merge_result.is_ok(),
                "register merge should succeed on iteration {i}: {merge_result:?}"
            );
            commit_thread
                .join()
                .expect("commit thread should not panic")
        });

        // Account for register B's event regardless of interleaving.
        event_index += 1;

        // The bad shape: the delete committed AND no blob info remains, i.e. the
        // concurrent register was hidden behind the committed delete tombstone.
        // (A successful commit alone is fine if the register simply landed after it.)
        if commit_result.is_ok() && storage.get_blob_info(&blob_id)?.is_none() {
            hidden_register_failures += 1;
            break;
        }

        // Clean up object B so the next iteration starts from the deletable state again.
        let delete_b = register_b.into_corresponding_deleted_event_for_testing(false);
        storage.update_blob_info(event_index, &delete_b.into())?;
        event_index += 1;
    }

    assert_eq!(
        hidden_register_failures, 0,
        "aggregate register was hidden by a committed delete {} times out of {}",
        hidden_register_failures, ITERATIONS
    );

    Ok(())
}

async_param_test! {
maybe_advance_event_cursor_order -> TestResult: [
in_order: (&[0, 1, 2], &[0, 1, 2]),
Expand Down
Loading