Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
The table of contents is too big for display.
Diff view
Diff view
  •  
  •  
  •  
31 changes: 27 additions & 4 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

6 changes: 5 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,7 @@ rcgen = "0.14.7"
reed-solomon-simd = "3.1.0"
regex = "1"
reqwest = { version = "0.12.12", default-features = false, features = ["http2", "json", "rustls-tls"] }
rocksdb = "0.22.0"
rocksdb = { path = "third_party/rocksdb" }
rstest = "0.26.1"
rustls = { version = "0.23.37", default-features = false, features = ["logging", "ring", "tls12"] }
rustls-native-certs = "0.8.3"
Expand Down Expand Up @@ -215,3 +215,7 @@ panic = 'abort'
debug = 1 # Keep debug symbols
inherits = "release"
strip = 'debuginfo' # remove extensive debug info to keep the binary size smaller

[patch.crates-io]
librocksdb-sys = { path = "third_party/librocksdb-sys" }
rocksdb = { path = "third_party/rocksdb" }
4 changes: 3 additions & 1 deletion crates/typed-store/src/rocks.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ use rocksdb::{
IngestExternalFileOptions,
LiveFile,
MultiThreaded,
OccValidationPolicy,
OptimisticTransactionDB,
OptimisticTransactionOptions,
ReadOptions,
Expand Down Expand Up @@ -2420,11 +2421,12 @@ pub fn open_cf_opts_optimistic<P: AsRef<Path>>(
let cfs = populate_missing_cfs(opt_cfs, path).map_err(typed_store_err_from_rocks_err)?;
sui_macros::nondeterministic!({
let options = prepare_db_options(db_options);
rocksdb::OptimisticTransactionDB::open_cf_descriptors(
rocksdb::OptimisticTransactionDB::open_cf_descriptors_with_validation(
&options,
path,
cfs.into_iter()
.map(|(name, opts)| ColumnFamilyDescriptor::new(name, opts)),
OccValidationPolicy::ValidateSerial,
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If we change this to ValidateParallel, the test will fail as before

)
.map(|db| {
Arc::new(RocksDB::OptimisticTransactionDB(
Expand Down
187 changes: 186 additions & 1 deletion crates/typed-store/src/rocks/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
// SPDX-License-Identifier: Apache-2.0

use std::{
sync::{Mutex, MutexGuard, OnceLock},
sync::{Arc, Barrier, Mutex, MutexGuard, OnceLock},
time::Duration,
};

Expand Down Expand Up @@ -1143,3 +1143,188 @@ async fn test_optimistic_transaction_conflict_and_retry() {

assert_eq!(db.get(&1).unwrap(), Some("final".to_string()));
}

/// Stress test for the WAL-1187 corruption shape.
///
/// The bad case is not merely "transaction commit succeeds". The bad case is:
/// 1. the transaction observes the pre-merge value and stages a delete,
/// 2. a concurrent merge is assigned an earlier sequence number,
/// 3. the delete is assigned a later sequence number and still commits,
/// 4. a later dependent merge then fails because the earlier merge is hidden by the delete.
///
/// This test models that by using a merge operator where `C` is only valid if a visible `R`
/// precedes it. If we ever reproduce a state where appending `C` fails after a concurrent `R`,
/// the delete has hidden the register merge, which is exactly the shape we care about.
#[tokio::test]
async fn test_optimistic_transaction_concurrent_write_batch_merge_must_not_hide_register() {
    let _guard = global_test_lock();

    // Tiny state machine used to model the WAL-1187 failure:
    // - visible base value "0" means "the row exists and is currently empty/deletable"
    // - merge operand "R" means "Register(B)"; a materialized register-only value is "RR"
    // - merge operand "C" means "Certify(B)"; a materialized certified value is "V"
    //
    // `C` is only valid if a visible `R` came before it. If a committed delete tombstone hides
    // `R`, a later read/materialization fails exactly like the real blob-info merge operator.
    fn zero_register_certify_merge(
        _key: &[u8],
        existing_val: Option<&[u8]>,
        operands: &rocksdb::MergeOperands,
    ) -> Option<Vec<u8>> {
        // Parse the materialized value back into a state. State 1 must round-trip as
        // "RR": if it serialized as "V" (same as state 2), a flush or compaction that
        // materializes a register-only value between the `R` and `C` merges would be
        // mis-parsed as state 2 and make the dependent `C` merge fail spuriously.
        let mut state = match existing_val {
            None | Some(b"0") => 0,
            Some(b"RR") => 1,
            Some(b"V") => 2,
            _ => return None,
        };
        for op in operands {
            match (state, op) {
                (0, b"R") => state = 1,
                (1, b"C") => state = 2,
                // Any other transition — in particular `C` without a visible `R` —
                // is invalid; returning None makes the read/materialization fail.
                _ => return None,
            }
        }
        Some(match state {
            0 => vec![b'0'],
            1 => vec![b'R', b'R'],
            2 => vec![b'V'],
            _ => unreachable!(),
        })
    }

    let path = temp_dir();
    let cf_name = "merge_cf";

    let mut cf_opts = rocksdb::Options::default();
    cf_opts.set_merge_operator(
        "zero_register_certify_merge",
        zero_register_certify_merge,
        // No partial-merge support: operands must be combined via the full merge above.
        |_, _, _| None,
    );

    let db = open_cf_opts_optimistic(&path, None, MetricConf::default(), &[(cf_name, cf_opts)])
        .expect("failed to open optimistic db");

    let RocksDB::OptimisticTransactionDB(ref wrapper) = *db else {
        panic!("expected OptimisticTransactionDB");
    };
    let underlying = &wrapper.underlying;
    const ITERATIONS: usize = 1000;
    let mut hidden_register_failures = 0usize;

    for i in 0..ITERATIONS {
        let key = format!("counter-{i}").into_bytes();
        let cf = underlying.cf_handle(cf_name).expect("cf handle");

        // Seed the key with a visible "empty" state. The transaction should only proceed with
        // the delete if it observes exactly this value.
        underlying.put_cf(&cf, &key, b"0").unwrap();

        // Phase 1: both worker threads reach the point where they are ready to race, but have
        // not yet attempted the final write.
        let ready_barrier = Arc::new(Barrier::new(3));
        // Phase 2: release both workers at the same time so the race is between `tx.commit()`
        // and the concurrent WriteBatch merge, not between thread startup and setup work.
        let release_barrier = Arc::new(Barrier::new(3));

        let commit_result: Result<(), rocksdb::Error> = std::thread::scope(|scope| {
            let commit_ready_barrier = ready_barrier.clone();
            let commit_release_barrier = release_barrier.clone();
            let handle = db.as_optimistic().expect("optimistic db");
            let commit_key = key.clone();
            let commit_thread = scope.spawn(move || {
                let commit_cf = underlying.cf_handle(cf_name).expect("cf handle");
                let tx = handle.transaction();
                // Construct the transaction inside the racing thread so the snapshot/read/check
                // happen on the same path as the later commit.
                let read = tx
                    .get_for_update_cf_opt(
                        &commit_cf,
                        &commit_key,
                        false,
                        &rocksdb::ReadOptions::default(),
                    )
                    .unwrap();
                assert_eq!(
                    read.as_deref(),
                    Some(&b"0"[..]),
                    "transaction should only delete after observing 0"
                );
                tx.delete_cf(&commit_cf, &commit_key).unwrap();

                // Signal that the transaction has already observed `0` and staged the delete.
                commit_ready_barrier.wait();
                // Wait for the merge thread to reach the matching point, then race the actual
                // commit against the concurrent WriteBatch merge.
                commit_release_barrier.wait();
                tx.commit()
            });

            let merge_ready_barrier = ready_barrier.clone();
            let merge_release_barrier = release_barrier.clone();
            let merge_key = key.clone();
            let merge_thread = scope.spawn(move || {
                // Do not let the merge run early; first wait until the transaction thread has
                // already constructed the transaction and staged the delete.
                merge_ready_barrier.wait();
                // Race only the final write.
                merge_release_barrier.wait();
                let merge_cf = underlying.cf_handle(cf_name).expect("cf handle");
                let mut batch = rocksdb::WriteBatchWithTransaction::<true>::default();
                batch.merge_cf(&merge_cf, &merge_key, b"R");
                underlying.write(batch)
            });

            // Both workers have finished setup.
            ready_barrier.wait();
            // Start the actual race.
            release_barrier.wait();
            let merge_result = merge_thread.join().unwrap();
            assert!(
                merge_result.is_ok(),
                "merge batch should succeed on iteration {i}: {merge_result:?}"
            );
            commit_thread.join().unwrap()
        });

        // Append the dependent "certify" merge. We only count the bad case:
        // - the delete transaction committed, and
        // - reading the key now fails because the concurrent "register" merge was hidden behind
        //   the delete tombstone.
        //
        // A successful commit on its own is fine if the merge simply landed afterwards.
        underlying.merge_cf(&cf, &key, b"C").unwrap();
        if commit_result.is_ok() && underlying.get_cf(&cf, &key).is_err() {
            hidden_register_failures += 1;
        }
    }

    assert_eq!(
        hidden_register_failures, 0,
        "concurrent register merge was hidden behind a later committed delete {} times out of {}",
        hidden_register_failures, ITERATIONS
    );
}
Loading
Loading