diff --git a/crates/typed-store/src/column_family_handle.rs b/crates/typed-store/src/column_family_handle.rs index 5c51ec23df..5c4d0c94db 100644 --- a/crates/typed-store/src/column_family_handle.rs +++ b/crates/typed-store/src/column_family_handle.rs @@ -9,7 +9,7 @@ use std::{marker::PhantomData, sync::Arc}; -use rocksdb::{Direction, OptimisticTransactionDB}; +use rocksdb::Direction; use serde::{Deserialize, Serialize}; use crate::{TypedStoreError, rocks::RocksDB}; @@ -69,12 +69,15 @@ where } /// Puts a key-value pair within a transaction. - pub fn put_cf_with_txn( + pub fn put_cf_with_txn( &self, - txn: &rocksdb::Transaction<'_, OptimisticTransactionDB>, + txn: &rocksdb::Transaction<'_, DB>, key: &K, value: &V, - ) -> Result<(), TypedStoreError> { + ) -> Result<(), TypedStoreError> + where + DB: Sized, + { let key_bytes = key.serialize()?; let value_bytes = bcs::to_bytes(value).map_err(|e| TypedStoreError::SerializationError(e.to_string()))?; @@ -85,11 +88,14 @@ where } /// Deletes a key within a transaction. - pub fn delete_cf_with_txn( + pub fn delete_cf_with_txn( &self, - txn: &rocksdb::Transaction<'_, OptimisticTransactionDB>, + txn: &rocksdb::Transaction<'_, DB>, key: &K, - ) -> Result<(), TypedStoreError> { + ) -> Result<(), TypedStoreError> + where + DB: Sized, + { let key_bytes = key.serialize()?; let cf = self.cf()?; @@ -99,11 +105,14 @@ where /// Gets a value for update within a transaction, the commit will fail if the value is updated /// outside the transaction. - pub fn get_for_update_cf( + pub fn get_for_update_cf( &self, - txn: &rocksdb::Transaction<'_, OptimisticTransactionDB>, + txn: &rocksdb::Transaction<'_, DB>, key: &K, - ) -> Result, TypedStoreError> { + ) -> Result, TypedStoreError> + where + DB: Sized, + { let key_bytes = key.serialize()?; let cf = self.cf()?; @@ -122,11 +131,14 @@ where } /// Gets a value within a transaction. - pub fn get_cf_with_txn( + pub fn get_cf_with_txn( &self, - txn: &rocksdb::Transaction<'_, OptimisticTransactionDB>, + txn: &rocksdb::Transaction<'_, DB>, key: &K, - ) -> Result, TypedStoreError> { + ) -> Result, TypedStoreError> + where + DB: Sized, + { let key_bytes = key.serialize()?; let cf = self.cf()?; @@ -147,14 +159,17 @@ where /// Reads a range of key-value pairs within a transaction using range bounds. /// Returns key-value pairs in the range [begin, end). /// - pub fn read_range_with_txn( + pub fn read_range_with_txn( &self, - txn: &rocksdb::Transaction<'_, OptimisticTransactionDB>, + txn: &rocksdb::Transaction<'_, DB>, begin: Vec, end: Vec, row_limit: usize, reverse: bool, - ) -> Result, TypedStoreError> { + ) -> Result, TypedStoreError> + where + DB: Sized, + { let mut read_opts = rocksdb::ReadOptions::default(); read_opts.set_iterate_lower_bound(begin.clone()); read_opts.set_iterate_upper_bound(end.clone()); @@ -227,7 +242,13 @@ mod tests { use tempfile::TempDir; use super::*; - use crate::rocks::{MetricConf, ReadWriteOptions, RocksDB, open_cf_opts_optimistic}; + use crate::rocks::{ + MetricConf, + ReadWriteOptions, + RocksDB, + open_cf_opts_optimistic, + open_cf_opts_transaction, + }; /// Result type for opening a database with column family handles. /// Returns `(Arc, Vec>)` on success. @@ -272,6 +293,24 @@ mod tests { Ok((rocksdb, handles)) } + /// Opens a lock-based transaction database with column family handles. + fn open_cf_handle_opts_transaction( + path: P, + db_options: Option, + metric_conf: MetricConf, + cf_options: &[(&str, rocksdb::Options)], + ) -> OpenCFHandleResult + where + P: AsRef, + K: KeyCodec, + V: Serialize + for<'de> Deserialize<'de> + Clone, + { + let rocksdb = open_cf_opts_transaction(path, db_options, metric_conf, cf_options)?; + let cf_names: Vec<&str> = cf_options.iter().map(|(name, _)| *name).collect(); + let handles = create_cf_handles(&rocksdb, &cf_names, &ReadWriteOptions::default())?; + Ok((rocksdb, handles)) + } + /// Helper function to create ColumnFamilyHandles from an existing optimistic /// transaction database. /// @@ -368,7 +407,9 @@ mod tests { } } - fn create_test_db() -> ( + fn create_test_db( + use_transaction_db: bool, + ) -> ( TempDir, Arc, Vec>, @@ -376,7 +417,12 @@ mod tests { let temp_dir = TempDir::new().unwrap(); let db_path = temp_dir.path().join("test_db"); - let (db, maps) = open_cf_handle_opts_optimistic::<_, TestKey, TestValue>( + let open_fn = if use_transaction_db { + open_cf_handle_opts_transaction::<_, TestKey, TestValue> + } else { + open_cf_handle_opts_optimistic::<_, TestKey, TestValue> + }; + let (db, maps) = open_fn( db_path, None, MetricConf::default(), @@ -390,149 +436,196 @@ mod tests { #[tokio::test] async fn test_transaction_operations() { crate::metrics::DBMetrics::init(&Registry::default()); - let (_temp_dir, db, maps) = create_test_db(); - - let map = &maps[0]; - - // Get transaction handle - let handle = db.as_optimistic().expect("should be optimistic DB"); - - // Test transaction insert and get - let key = TestKey { - prefix: "txn".to_string(), - id: 1, - }; - let value = TestValue { - data: "transaction test".to_string(), - count: 99, - }; - - // Successful transaction - { - let txn = handle.transaction(); - - map.put_cf_with_txn(&txn, &key, &value) - .expect("failed to put in transaction"); - txn.commit().expect("failed to commit transaction"); - } - - { - let txn = handle.transaction(); - - let retrieved = map - .get_cf_with_txn(&txn, &key) - .expect("failed to get in transaction"); - assert_eq!(retrieved, Some(value.clone())); - txn.commit().expect("failed to commit transaction"); + for use_transaction_db in [false, true] { + let (_temp_dir, db, maps) = create_test_db(use_transaction_db); + let map = &maps[0]; + let key = TestKey { + prefix: "txn".to_string(), + id: 1, + }; + let value = TestValue { + data: "transaction test".to_string(), + count: 99, + }; + + match db.as_transactional().expect("should support transactions") { + crate::rocks::TransactionalHandle::Optimistic(handle) => { + let txn = handle.transaction(); + map.put_cf_with_txn(&txn, &key, &value) + .expect("failed to put in transaction"); + txn.commit().expect("failed to commit transaction"); + + let txn = handle.transaction(); + let retrieved = map + .get_cf_with_txn(&txn, &key) + .expect("failed to get in transaction"); + assert_eq!(retrieved, Some(value.clone())); + txn.commit().expect("failed to commit transaction"); + } + crate::rocks::TransactionalHandle::Transaction(handle) => { + let txn = handle.transaction(); + map.put_cf_with_txn(&txn, &key, &value) + .expect("failed to put in transaction"); + txn.commit().expect("failed to commit transaction"); + + let txn = handle.transaction(); + let retrieved = map + .get_cf_with_txn(&txn, &key) + .expect("failed to get in transaction"); + assert_eq!(retrieved, Some(value.clone())); + txn.commit().expect("failed to commit transaction"); + } + } } } #[tokio::test] async fn test_transaction_range_operations() { crate::metrics::DBMetrics::init(&Registry::default()); - let (_temp_dir, db, maps) = create_test_db(); - - let map = &maps[0]; - let handle = db.as_optimistic().expect("should be optimistic DB"); - - let prefix = "range_test"; - let mut kvs = Vec::new(); - for i in 1..=5 { - let key = TestKey::new(prefix.to_string(), i); - let value = TestValue::new(format!("value_{}", i), i as u32 * 10); - kvs.push((key, value)); - } - - { - let txn = handle.transaction(); - - for (key, value) in kvs.iter() { - map.put_cf_with_txn(&txn, key, value) - .expect("failed to put in transaction"); - } - - let prefix2 = "range_tesu"; - for i in 1..=3 { - let key = TestKey::new(prefix2.to_string(), i); - let value = TestValue::new(format!("other_{}", i), i as u32 * 100); - map.put_cf_with_txn(&txn, &key, &value) - .expect("failed to put in transaction"); - } - - let prefix3 = "range_tess"; - for i in 1..=3 { - let key = TestKey::new(prefix3.to_string(), i); - let value = TestValue::new(format!("other_{}", i), i as u32 * 100); - map.put_cf_with_txn(&txn, &key, &value) - .expect("failed to put in transaction"); - } - - txn.commit().expect("failed to commit transaction"); - } - - // Create prefix bytes for "range_test/" to match our key serialization format - let mut begin = prefix.as_bytes().to_vec(); - begin.push(b'/'); - let mut end = prefix.as_bytes().to_vec(); - end.push(b'0'); - - { - let txn = handle.transaction(); - - let results_with_limit: Vec<(TestKey, TestValue)> = map - .read_range_with_txn(&txn, begin.clone(), end.clone(), 3, false) - .expect("failed to read range"); - - assert_eq!(results_with_limit.len(), 3); - - let results_with_limit_reverse: Vec<(TestKey, TestValue)> = map - .read_range_with_txn(&txn, begin.clone(), end.clone(), 3, true) - .expect("failed to read range"); - - assert_eq!(results_with_limit_reverse.len(), 3); - - for (i, (key, value)) in results_with_limit_reverse.iter().enumerate() { - assert_eq!(key, &kvs[kvs.len() - i - 1].0); - assert_eq!(value, &kvs[kvs.len() - i - 1].1); - } - - let results_without_limit: Vec<(TestKey, TestValue)> = map - .read_range_with_txn(&txn, begin.clone(), end.clone(), 100, false) - .expect("failed to read range"); - - assert_eq!(results_without_limit.len(), kvs.len()); - for (i, (key, value)) in results_without_limit.iter().enumerate() { - assert_eq!(key, &kvs[i].0); - assert_eq!(value, &kvs[i].1); + for use_transaction_db in [false, true] { + let (_temp_dir, db, maps) = create_test_db(use_transaction_db); + let map = &maps[0]; + let prefix = "range_test"; + let mut kvs = Vec::new(); + for i in 1..=5 { + let key = TestKey::new(prefix.to_string(), i); + let value = TestValue::new(format!("value_{}", i), i as u32 * 10); + kvs.push((key, value)); } - let results_without_limit_reverse: Vec<(TestKey, TestValue)> = map - .read_range_with_txn(&txn, begin.clone(), end.clone(), 100, true) - .expect("failed to read range"); - - assert_eq!(results_without_limit_reverse.len(), kvs.len()); + match db.as_transactional().expect("should support transactions") { + crate::rocks::TransactionalHandle::Optimistic(handle) => { + let txn = handle.transaction(); + for (key, value) in kvs.iter() { + map.put_cf_with_txn(&txn, key, value) + .expect("failed to put in transaction"); + } + for prefix in ["range_tesu", "range_tess"] { + for i in 1..=3 { + let key = TestKey::new(prefix.to_string(), i); + let value = TestValue::new(format!("other_{}", i), i as u32 * 100); + map.put_cf_with_txn(&txn, &key, &value) + .expect("failed to put in transaction"); + } + } + txn.commit().expect("failed to commit transaction"); + + let mut begin = prefix.as_bytes().to_vec(); + begin.push(b'/'); + let mut end = prefix.as_bytes().to_vec(); + end.push(b'0'); + + let txn = handle.transaction(); + let results_with_limit: Vec<(TestKey, TestValue)> = map + .read_range_with_txn(&txn, begin.clone(), end.clone(), 3, false) + .expect("failed to read range"); + assert_eq!(results_with_limit.len(), 3); + let results_with_limit_reverse: Vec<(TestKey, TestValue)> = map + .read_range_with_txn(&txn, begin.clone(), end.clone(), 3, true) + .expect("failed to read range"); + assert_eq!(results_with_limit_reverse.len(), 3); + for (i, (key, value)) in results_with_limit_reverse.iter().enumerate() { + assert_eq!(key, &kvs[kvs.len() - i - 1].0); + assert_eq!(value, &kvs[kvs.len() - i - 1].1); + } + let results_without_limit: Vec<(TestKey, TestValue)> = map + .read_range_with_txn(&txn, begin.clone(), end.clone(), 100, false) + .expect("failed to read range"); + assert_eq!(results_without_limit.len(), kvs.len()); + for (i, (key, value)) in results_without_limit.iter().enumerate() { + assert_eq!(key, &kvs[i].0); + assert_eq!(value, &kvs[i].1); + } + let results_without_limit_reverse: Vec<(TestKey, TestValue)> = map + .read_range_with_txn(&txn, begin.clone(), end.clone(), 100, true) + .expect("failed to read range"); + assert_eq!(results_without_limit_reverse.len(), kvs.len()); + for (i, (key, value)) in results_without_limit_reverse.iter().enumerate() { + assert_eq!(key, &kvs[kvs.len() - i - 1].0); + assert_eq!(value, &kvs[kvs.len() - i - 1].1); + } - for (i, (key, value)) in results_without_limit_reverse.iter().enumerate() { - assert_eq!(key, &kvs[kvs.len() - i - 1].0); - assert_eq!(value, &kvs[kvs.len() - i - 1].1); - } - } + let txn = handle.transaction(); + assert!(map.delete_cf_with_txn(&txn, &kvs[2].0).is_ok()); + txn.commit().expect("failed to commit transaction"); + + let txn = handle.transaction(); + let results_without_limit: Vec<(TestKey, TestValue)> = map + .read_range_with_txn(&txn, begin, end, 100, false) + .expect("failed to read range"); + kvs.remove(2); + assert_eq!(results_without_limit.len(), kvs.len()); + for (i, (key, value)) in results_without_limit.iter().enumerate() { + assert_eq!(key, &kvs[i].0); + assert_eq!(value, &kvs[i].1); + } + } + crate::rocks::TransactionalHandle::Transaction(handle) => { + let txn = handle.transaction(); + for (key, value) in kvs.iter() { + map.put_cf_with_txn(&txn, key, value) + .expect("failed to put in transaction"); + } + for prefix in ["range_tesu", "range_tess"] { + for i in 1..=3 { + let key = TestKey::new(prefix.to_string(), i); + let value = TestValue::new(format!("other_{}", i), i as u32 * 100); + map.put_cf_with_txn(&txn, &key, &value) + .expect("failed to put in transaction"); + } + } + txn.commit().expect("failed to commit transaction"); + + let mut begin = prefix.as_bytes().to_vec(); + begin.push(b'/'); + let mut end = prefix.as_bytes().to_vec(); + end.push(b'0'); + + let txn = handle.transaction(); + let results_with_limit: Vec<(TestKey, TestValue)> = map + .read_range_with_txn(&txn, begin.clone(), end.clone(), 3, false) + .expect("failed to read range"); + assert_eq!(results_with_limit.len(), 3); + let results_with_limit_reverse: Vec<(TestKey, TestValue)> = map + .read_range_with_txn(&txn, begin.clone(), end.clone(), 3, true) + .expect("failed to read range"); + assert_eq!(results_with_limit_reverse.len(), 3); + for (i, (key, value)) in results_with_limit_reverse.iter().enumerate() { + assert_eq!(key, &kvs[kvs.len() - i - 1].0); + assert_eq!(value, &kvs[kvs.len() - i - 1].1); + } + let results_without_limit: Vec<(TestKey, TestValue)> = map + .read_range_with_txn(&txn, begin.clone(), end.clone(), 100, false) + .expect("failed to read range"); + assert_eq!(results_without_limit.len(), kvs.len()); + for (i, (key, value)) in results_without_limit.iter().enumerate() { + assert_eq!(key, &kvs[i].0); + assert_eq!(value, &kvs[i].1); + } + let results_without_limit_reverse: Vec<(TestKey, TestValue)> = map + .read_range_with_txn(&txn, begin.clone(), end.clone(), 100, true) + .expect("failed to read range"); + assert_eq!(results_without_limit_reverse.len(), kvs.len()); + for (i, (key, value)) in results_without_limit_reverse.iter().enumerate() { + assert_eq!(key, &kvs[kvs.len() - i - 1].0); + assert_eq!(value, &kvs[kvs.len() - i - 1].1); + } - { - let txn = handle.transaction(); - assert!(map.delete_cf_with_txn(&txn, &kvs[2].0).is_ok()); - txn.commit().expect("failed to commit transaction"); - - let txn = handle.transaction(); - let results_without_limit: Vec<(TestKey, TestValue)> = map - .read_range_with_txn(&txn, begin, end, 100, false) - .expect("failed to read range"); - - kvs.remove(2); - assert_eq!(results_without_limit.len(), kvs.len()); - for (i, (key, value)) in results_without_limit.iter().enumerate() { - assert_eq!(key, &kvs[i].0); - assert_eq!(value, &kvs[i].1); + let txn = handle.transaction(); + assert!(map.delete_cf_with_txn(&txn, &kvs[2].0).is_ok()); + txn.commit().expect("failed to commit transaction"); + + let txn = handle.transaction(); + let results_without_limit: Vec<(TestKey, TestValue)> = map + .read_range_with_txn(&txn, begin, end, 100, false) + .expect("failed to read range"); + kvs.remove(2); + assert_eq!(results_without_limit.len(), kvs.len()); + for (i, (key, value)) in results_without_limit.iter().enumerate() { + assert_eq!(key, &kvs[i].0); + assert_eq!(value, &kvs[i].1); + } + } } } } diff --git a/crates/typed-store/src/rocks.rs b/crates/typed-store/src/rocks.rs index 5fd3bebef8..2aa60fff03 100644 --- a/crates/typed-store/src/rocks.rs +++ b/crates/typed-store/src/rocks.rs @@ -41,6 +41,9 @@ use rocksdb::{ SnapshotWithThreadMode, SstFileWriter, Transaction, + TransactionDB, + TransactionDBOptions, + TransactionOptions, WriteBatch, WriteBatchWithTransaction, WriteOptions, @@ -179,6 +182,23 @@ impl DbBehavior for rocksdb::OptimisticTransactionDB { } } +impl DbBehavior for rocksdb::TransactionDB { + const ENGINE_LABEL: &'static str = "TransactionDB"; + fn cancel_on_drop(_db: &Self) { + // The rust-rocksdb TransactionDB wrapper does not expose the same management APIs as + // DB/OptimisticTransactionDB, so drop falls back to the engine's own cleanup path. + } +} + +/// Transaction engine variants exposed by typed-store. +#[derive(Debug, Copy, Clone, Eq, PartialEq)] +pub enum TransactionKind { + /// Optimistic conflict detection at commit time. + Optimistic, + /// Lock-based TransactionDB semantics. + Pessimistic, +} + /// A generic wrapper around RocksDB engines with common metadata and behavior. pub struct DBWrapper { /// The underlying rocksdb database. @@ -226,6 +246,8 @@ pub type DBWithThreadModeWrapper = DBWrapper>; +/// Backwards-compatible alias for the `rocksdb::TransactionDB` wrapper. +pub type TransactionDBWrapper = DBWrapper>; #[derive(Debug)] /// Wrapper around RocksDB engines used by Walrus. @@ -234,6 +256,8 @@ pub enum RocksDB { DB(DBWithThreadModeWrapper), /// RocksDB optimistic transaction engine. OptimisticTransactionDB(OptimisticTransactionDBWrapper), + /// RocksDB lock-based transaction engine. + TransactionDB(TransactionDBWrapper), } /// A deliberate handle for optimistic-transaction-only APIs. @@ -261,6 +285,97 @@ impl<'a> OptimisticHandle<'a> { } } +/// A deliberate handle for lock-based transaction-only APIs. +#[derive(Debug, Copy, Clone)] +pub struct TransactionHandle<'a> { + pub(crate) inner: &'a TransactionDBWrapper, +} + +impl<'a> TransactionHandle<'a> { + /// Create a new transaction without a snapshot. + /// Consistency: No snapshot; guarantees keys haven't changed since first write/get_for_update. + pub fn transaction_without_snapshot( + &self, + ) -> Transaction<'a, rocksdb::TransactionDB> { + self.inner.underlying.transaction() + } + + /// Create a new transaction with a snapshot for repeatable reads. + pub fn transaction(&self) -> Transaction<'a, rocksdb::TransactionDB> { + let mut tx_opts = TransactionOptions::new(); + tx_opts.set_snapshot(true); + self.inner + .underlying + .transaction_opt(&WriteOptions::default(), &tx_opts) + } +} + +/// Shared behavior for typed-store transaction-capable handles. +pub trait TransactionHandleOps<'a>: Copy { + /// The RocksDB transaction engine backing this handle. + type Engine; + + /// The transaction engine kind. + fn kind(&self) -> TransactionKind; + + /// Create a new transaction without a snapshot. + fn transaction_without_snapshot(&self) -> Transaction<'a, Self::Engine>; + + /// Create a new snapshot-enabled transaction. + fn transaction(&self) -> Transaction<'a, Self::Engine>; +} + +impl<'a> TransactionHandleOps<'a> for OptimisticHandle<'a> { + type Engine = rocksdb::OptimisticTransactionDB; + + fn kind(&self) -> TransactionKind { + TransactionKind::Optimistic + } + + fn transaction_without_snapshot(&self) -> Transaction<'a, Self::Engine> { + Self::transaction_without_snapshot(self) + } + + fn transaction(&self) -> Transaction<'a, Self::Engine> { + Self::transaction(self) + } +} + +impl<'a> TransactionHandleOps<'a> for TransactionHandle<'a> { + type Engine = rocksdb::TransactionDB; + + fn kind(&self) -> TransactionKind { + TransactionKind::Pessimistic + } + + fn transaction_without_snapshot(&self) -> Transaction<'a, Self::Engine> { + Self::transaction_without_snapshot(self) + } + + fn transaction(&self) -> Transaction<'a, Self::Engine> { + Self::transaction(self) + } +} + +/// A runtime-selected transactional handle for RocksDB engines that support transactions. +#[derive(Debug, Copy, Clone)] +pub enum TransactionalHandle<'a> { + /// Optimistic transaction engine handle. + Optimistic(OptimisticHandle<'a>), + /// Lock-based transaction engine handle. + Transaction(TransactionHandle<'a>), +} + +impl<'a> TransactionalHandle<'a> { + /// Returns the underlying transaction engine kind. + pub fn kind(&self) -> TransactionKind { + match self { + Self::Optimistic(handle) => handle.kind(), + Self::Transaction(handle) => handle.kind(), + } + } +} + /// Handle for range-delete operations only valid for standard RocksDB engine. #[derive(Debug, Copy, Clone)] pub struct RangeDeleteHandle<'a> { @@ -284,12 +399,14 @@ macro_rules! delegate_call { match $self { Self::DB(d) => d.underlying.$method($($args),*), Self::OptimisticTransactionDB(d) => d.underlying.$method($($args),*), + Self::TransactionDB(d) => d.underlying.$method($($args),*), } }; ($self:ident.$field:ident) => { match $self { Self::DB(d) => &d.$field, Self::OptimisticTransactionDB(d) => &d.$field, + Self::TransactionDB(d) => &d.$field, } } @@ -303,6 +420,7 @@ macro_rules! delegate_pair { (RocksDB::OptimisticTransactionDB($td), RocksDBBatch::OptimisticTransactionDB($tb)) => { $txn_expr } + (RocksDB::TransactionDB($td), RocksDBBatch::TransactionDB($tb)) => $txn_expr, _ => Err(TypedStoreError::RocksDBError( "using invalid batch type for the database".into(), )), @@ -316,15 +434,47 @@ impl RocksDB { pub fn as_optimistic(&self) -> Option> { match self { RocksDB::OptimisticTransactionDB(db) => Some(OptimisticHandle { inner: db }), + RocksDB::TransactionDB(_) | RocksDB::DB(_) => None, + } + } + + /// Returns a lock-based transaction handle if the DB is using the TransactionDB engine. + /// This allows invoking transaction APIs only when supported. + pub fn as_transaction(&self) -> Option> { + match self { + RocksDB::TransactionDB(db) => Some(TransactionHandle { inner: db }), RocksDB::DB(_) => None, + RocksDB::OptimisticTransactionDB(_) => None, } } + /// Returns a transactional handle for any transaction-capable RocksDB engine. + pub fn as_transactional(&self) -> Option> { + match self { + RocksDB::OptimisticTransactionDB(db) => { + Some(TransactionalHandle::Optimistic(OptimisticHandle { + inner: db, + })) + } + RocksDB::TransactionDB(db) => { + Some(TransactionalHandle::Transaction(TransactionHandle { + inner: db, + })) + } + RocksDB::DB(_) => None, + } + } + + /// Returns whether this engine supports RocksDB transactions. + pub fn supports_transactions(&self) -> bool { + self.as_transactional().is_some() + } + /// Returns a handle that allows range-delete operations (only for standard RocksDB engine). pub fn as_range_delete(&self) -> Option> { match self { RocksDB::DB(db) => Some(RangeDeleteHandle { inner: db }), - RocksDB::OptimisticTransactionDB(_) => None, + RocksDB::OptimisticTransactionDB(_) | RocksDB::TransactionDB(_) => None, } } @@ -338,6 +488,9 @@ impl RocksDB { RocksDB::OptimisticTransactionDB(db) => RocksDBSnapshot::OptimisticTransactionDB( RocksDBSnapshotHandle::new(db.underlying.snapshot(), Arc::clone(self)), ), + RocksDB::TransactionDB(db) => RocksDBSnapshot::TransactionDB( + RocksDBSnapshotHandle::new(db.underlying.snapshot(), Arc::clone(self)), + ), } } @@ -377,7 +530,19 @@ impl RocksDB { K: AsRef<[u8]> + 'a + ?Sized, I: IntoIterator, { - delegate_call!(self.batched_multi_get_cf_opt(cf, keys, sorted_input, readopts)) + match self { + Self::DB(d) => d + .underlying + .batched_multi_get_cf_opt(cf, keys, sorted_input, readopts), + Self::OptimisticTransactionDB(d) => { + d.underlying + .batched_multi_get_cf_opt(cf, keys, sorted_input, readopts) + } + Self::TransactionDB(d) => keys + .into_iter() + .map(|key| d.underlying.get_pinned_cf_opt(cf, key, readopts)) + .collect(), + } } /// Get a property value from a specific column family. @@ -386,7 +551,11 @@ impl RocksDB { cf: &impl AsColumnFamilyRef, name: impl CStrLike, ) -> Result, rocksdb::Error> { - delegate_call!(self.property_int_value_cf(cf, name)) + match self { + Self::DB(d) => d.underlying.property_int_value_cf(cf, name), + Self::OptimisticTransactionDB(d) => d.underlying.property_int_value_cf(cf, name), + Self::TransactionDB(_) => Ok(None), + } } /// Get a pinned value from a specific column family. @@ -426,7 +595,14 @@ impl RocksDB { from: K, to: K, ) -> Result<(), rocksdb::Error> { - delegate_call!(self.delete_file_in_range_cf(cf, from, to)) + match self { + Self::DB(d) => d.underlying.delete_file_in_range_cf(cf, from, to), + Self::OptimisticTransactionDB(d) => d.underlying.delete_file_in_range_cf(cf, from, to), + Self::TransactionDB(_) => { + tracing::warn!("delete_file_in_range_cf is not supported for TransactionDB"); + Ok(()) + } + } } /// Delete a value from a specific column family. @@ -474,12 +650,26 @@ impl RocksDB { key: K, readopts: &ReadOptions, ) -> bool { - delegate_call!(self.key_may_exist_cf_opt(cf, key, readopts)) + match self { + Self::DB(d) => d.underlying.key_may_exist_cf_opt(cf, key, readopts), + Self::OptimisticTransactionDB(d) => { + d.underlying.key_may_exist_cf_opt(cf, key, readopts) + } + Self::TransactionDB(d) => d + .underlying + .get_pinned_cf_opt(cf, key, readopts) + .map(|value| value.is_some()) + .unwrap_or(true), + } } /// Try to catch up with the primary. pub fn try_catch_up_with_primary(&self) -> Result<(), rocksdb::Error> { - delegate_call!(self.try_catch_up_with_primary()) + match self { + Self::DB(d) => d.underlying.try_catch_up_with_primary(), + Self::OptimisticTransactionDB(d) => d.underlying.try_catch_up_with_primary(), + Self::TransactionDB(_) => Ok(()), + } } /// Write a batch of operations to the database. @@ -518,6 +708,9 @@ impl RocksDB { Self::OptimisticTransactionDB(d) => RocksDBRawIter::OptimisticTransactionDB( d.underlying.raw_iterator_cf_opt(cf_handle, readopts), ), + Self::TransactionDB(d) => { + RocksDBRawIter::TransactionDB(d.underlying.raw_iterator_cf_opt(cf_handle, readopts)) + } } } @@ -528,7 +721,13 @@ impl RocksDB { start: Option, end: Option, ) { - delegate_call!(self.compact_range_cf(cf, start, end)) + match self { + Self::DB(d) => d.underlying.compact_range_cf(cf, start, end), + Self::OptimisticTransactionDB(d) => d.underlying.compact_range_cf(cf, start, end), + Self::TransactionDB(_) => { + tracing::warn!("compact_range_cf is not supported for TransactionDB"); + } + } } /// Compact a range of values in a specific column family to the bottom. @@ -541,7 +740,15 @@ impl RocksDB { ) { let opt = &mut CompactOptions::default(); opt.set_bottommost_level_compaction(BottommostLevelCompaction::ForceOptimized); - delegate_call!(self.compact_range_cf_opt(cf, start, end, opt)) + match self { + Self::DB(d) => d.underlying.compact_range_cf_opt(cf, start, end, opt), + Self::OptimisticTransactionDB(d) => { + d.underlying.compact_range_cf_opt(cf, start, end, opt) + } + Self::TransactionDB(_) => { + tracing::warn!("compact_range_cf_opt is not supported for TransactionDB"); + } + } } /// Ingest external SST files into a specific column family. Ingesting into an existing @@ -556,15 +763,39 @@ impl RocksDB { cf: &impl AsColumnFamilyRef, paths: &[P], opts: &IngestExternalFileOptions, - ) -> Result<(), rocksdb::Error> { + ) -> Result<(), TypedStoreError> { let v: Vec<&Path> = paths.iter().map(|p| p.as_ref()).collect(); - delegate_call!(self.ingest_external_file_cf_opts(cf, opts, v)) + match self { + Self::DB(d) => d + .underlying + .ingest_external_file_cf_opts(cf, opts, v) + .map_err(typed_store_err_from_rocks_err), + Self::OptimisticTransactionDB(d) => d + .underlying + .ingest_external_file_cf_opts(cf, opts, v) + .map_err(typed_store_err_from_rocks_err), + Self::TransactionDB(_) => Err(TypedStoreError::RocksDBError( + "external SST ingestion is not supported for TransactionDB".into(), + )), + } } /// Flush the database #[allow(dead_code)] pub fn flush(&self) -> Result<(), TypedStoreError> { - delegate_call!(self.flush()).map_err(|e| TypedStoreError::RocksDBError(e.into_string())) + match self { + Self::DB(d) => d + .underlying + .flush() + .map_err(|e| TypedStoreError::RocksDBError(e.into_string())), + Self::OptimisticTransactionDB(d) => d + .underlying + .flush() + .map_err(|e| TypedStoreError::RocksDBError(e.into_string())), + Self::TransactionDB(_) => Err(TypedStoreError::RocksDBError( + "flush is not supported for TransactionDB".into(), + )), + } } /// Create a checkpoint of the database. @@ -588,12 +819,22 @@ impl RocksDB { Self::OptimisticTransactionDB(d) => engine .create_new_backup_flush(&d.underlying, flush_before_backup) .map_err(typed_store_err_from_rocks_err), + Self::TransactionDB(_) => Err(TypedStoreError::RocksDBError( + "backup is not supported for TransactionDB".into(), + )), } } /// Flush a specific column family. pub fn flush_cf(&self, cf: &impl AsColumnFamilyRef) -> Result<(), rocksdb::Error> { - delegate_call!(self.flush_cf(cf)) + match self { + Self::DB(d) => d.underlying.flush_cf(cf), + Self::OptimisticTransactionDB(d) => d.underlying.flush_cf(cf), + Self::TransactionDB(_) => { + tracing::warn!("flush_cf is not supported for TransactionDB"); + Ok(()) + } + } } /// Set options for a specific column family. @@ -603,12 +844,26 @@ impl RocksDB { cf: &impl AsColumnFamilyRef, opts: &[(&str, &str)], ) -> Result<(), rocksdb::Error> { - delegate_call!(self.set_options_cf(cf, opts)) + match self { + Self::DB(d) => d.underlying.set_options_cf(cf, opts), + Self::OptimisticTransactionDB(d) => d.underlying.set_options_cf(cf, opts), + Self::TransactionDB(_) => { + tracing::warn!("set_options_cf is not supported for TransactionDB"); + Ok(()) + } + } } /// Set options for the database. pub fn set_options(&self, opts: &[(&str, &str)]) -> Result<(), rocksdb::Error> { - delegate_call!(self.set_options(opts)) + match self { + Self::DB(d) => d.underlying.set_options(opts), + Self::OptimisticTransactionDB(d) => d.underlying.set_options(opts), + Self::TransactionDB(_) => { + tracing::warn!("set_options is not supported for TransactionDB"); + Ok(()) + } + } } /// Get the sampling interval for the database. @@ -661,7 +916,11 @@ impl RocksDB { /// Get the live files in the database. #[allow(dead_code)] pub fn live_files(&self) -> Result, Error> { - delegate_call!(self.live_files()) + match self { + Self::DB(d) => d.underlying.live_files(), + Self::OptimisticTransactionDB(d) => d.underlying.live_files(), + Self::TransactionDB(_) => Ok(Vec::new()), + } } /// Create a new batch for the database. @@ -671,6 +930,9 @@ impl RocksDB { RocksDB::OptimisticTransactionDB(_) => { RocksDBBatch::OptimisticTransactionDB(WriteBatchWithTransaction::::default()) } + RocksDB::TransactionDB(_) => { + RocksDBBatch::TransactionDB(WriteBatchWithTransaction::::default()) + } } } @@ -682,16 +944,21 @@ impl RocksDB { RocksDB::OptimisticTransactionDB(d) => { Checkpoint::new(&d.underlying).map_err(typed_store_err_from_rocks_err) } + RocksDB::TransactionDB(_) => Err(TypedStoreError::RocksDBError( + "checkpoint is not supported for TransactionDB".into(), + )), } } } -/// A batch of write operations for RocksDB, covering both standard and optimistic transaction DBs. +/// A batch of write operations for RocksDB, covering standard and transactional engines. pub enum RocksDBBatch { /// A write batch for a standard `rocksdb::DB`. DB(rocksdb::WriteBatch), /// A write batch for an `rocksdb::OptimisticTransactionDB`. OptimisticTransactionDB(rocksdb::WriteBatchWithTransaction), + /// A write batch for a `rocksdb::TransactionDB`. + TransactionDB(rocksdb::WriteBatchWithTransaction), } impl fmt::Debug for RocksDBBatch { @@ -699,6 +966,7 @@ impl fmt::Debug for RocksDBBatch { match self { Self::DB(_) => write!(f, "RocksDBBatch::DB"), Self::OptimisticTransactionDB(_) => write!(f, "RocksDBBatch::OptimisticTransactionDB"), + Self::TransactionDB(_) => write!(f, "RocksDBBatch::TransactionDB"), } } } @@ -708,6 +976,7 @@ macro_rules! delegate_batch_call { match $self { Self::DB(b) => b.$method($($args),*), Self::OptimisticTransactionDB(b) => b.$method($($args),*), + Self::TransactionDB(b) => b.$method($($args),*), } } } @@ -753,10 +1022,12 @@ impl RocksDBBatch { batch.delete_range_cf(cf, from, to); Ok(()) } - Self::OptimisticTransactionDB(_) => { - tracing::warn!("delete_range_cf is not supported for OptimisticTransactionDB"); + Self::OptimisticTransactionDB(_) | Self::TransactionDB(_) => { + tracing::warn!( + "delete_range_cf is not supported for transactional RocksDB engines" + ); Err(TypedStoreError::RocksDBError( - "delete_range_cf is not supported for OptimisticTransactionDB".into(), + "delete_range_cf is not supported for transactional RocksDB engines".into(), )) } } @@ -1012,6 +1283,9 @@ impl DBMap { RocksDB::OptimisticTransactionDB(_) => { RocksDBBatch::OptimisticTransactionDB(WriteBatchWithTransaction::::default()) } + RocksDB::TransactionDB(_) => { + RocksDBBatch::TransactionDB(WriteBatchWithTransaction::::default()) + } }; DBBatch::new( &self.rocksdb, @@ -1024,6 +1298,11 @@ impl DBMap { /// Compact a range of keys in a specific column family. pub fn compact_range(&self, start: &J, end: &J) -> Result<(), TypedStoreError> { + if matches!(self.rocksdb.as_ref(), RocksDB::TransactionDB(_)) { + return Err(TypedStoreError::RocksDBError( + "manual compaction is not supported for TransactionDB".into(), + )); + } let from_buf = be_fix_int_ser(start)?; let to_buf = be_fix_int_ser(end)?; self.rocksdb @@ -1037,6 +1316,11 @@ impl DBMap { start: &J, end: &J, ) -> Result<(), TypedStoreError> { + if matches!(self.rocksdb.as_ref(), RocksDB::TransactionDB(_)) { + return Err(TypedStoreError::RocksDBError( + "manual compaction is not supported for TransactionDB".into(), + )); + } let from_buf = be_fix_int_ser(start)?; let to_buf = be_fix_int_ser(end)?; self.rocksdb @@ -1101,6 +1385,11 @@ impl DBMap { /// Flush the column family. pub fn flush(&self) -> Result<(), TypedStoreError> { + if matches!(self.rocksdb.as_ref(), RocksDB::TransactionDB(_)) { + return Err(TypedStoreError::RocksDBError( + "flush is not supported for TransactionDB".into(), + )); + } self.rocksdb .flush_cf(&self.cf()?) .map_err(|e| TypedStoreError::RocksDBError(e.into_string())) @@ -1120,7 +1409,6 @@ impl DBMap { ) -> Result<(), TypedStoreError> { self.rocksdb .ingest_external_file_cf(&self.cf()?, paths, opts) - .map_err(typed_store_err_from_rocks_err) } /// Returns a vector of raw values corresponding to the keys provided. @@ -1338,6 +1626,9 @@ impl DBMap { handle.snapshot.raw_iterator_cf_opt(&cf_handle, readopts), ) } + RocksDBSnapshot::TransactionDB(handle) => RocksDBRawIter::TransactionDB( + handle.snapshot.raw_iterator_cf_opt(&cf_handle, readopts), + ), }; let iter_context = self.create_iter_context(); @@ -1672,6 +1963,7 @@ macro_rules! delegate_iter_call { match $self { Self::DB(db) => db.$method($($args),*), Self::OptimisticTransactionDB(db) => db.$method($($args),*), + Self::TransactionDB(db) => db.$method($($args),*), } } } @@ -1684,6 +1976,8 @@ pub enum RocksDBRawIter<'a> { OptimisticTransactionDB( rocksdb::DBRawIteratorWithThreadMode<'a, OptimisticTransactionDB>, ), + /// Raw iterator variant for a `rocksdb::TransactionDB`. + TransactionDB(rocksdb::DBRawIteratorWithThreadMode<'a, TransactionDB>), } impl<'a> fmt::Debug for RocksDBRawIter<'a> { @@ -1693,6 +1987,7 @@ impl<'a> fmt::Debug for RocksDBRawIter<'a> { Self::OptimisticTransactionDB(_) => { write!(f, "RocksDBRawIter::OptimisticTransactionDB(..)") } + Self::TransactionDB(_) => write!(f, "RocksDBRawIter::TransactionDB(..)"), } } } @@ -1769,13 +2064,15 @@ where } } -/// Snapshot handle covering both RocksDB engine variants. +/// Snapshot handle covering all RocksDB engine variants. #[derive(Debug)] pub enum RocksDBSnapshot<'a> { /// Snapshot for the standard RocksDB engine. DB(RocksDBSnapshotHandle<'a, DBWithThreadMode>), /// Snapshot for the optimistic transaction engine. OptimisticTransactionDB(RocksDBSnapshotHandle<'a, OptimisticTransactionDB>), + /// Snapshot for the lock-based transaction engine. + TransactionDB(RocksDBSnapshotHandle<'a, TransactionDB>), } impl<'a> RocksDBSnapshot<'a> { @@ -1783,6 +2080,7 @@ impl<'a> RocksDBSnapshot<'a> { match self { Self::DB(handle) => handle.db.clone(), Self::OptimisticTransactionDB(handle) => handle.db.clone(), + Self::TransactionDB(handle) => handle.db.clone(), } } } @@ -1891,6 +2189,11 @@ where RocksDBSnapshot::OptimisticTransactionDB(handle) => handle .snapshot .get_pinned_cf_opt(&cf_handle, &key_buf, self.opts.readopts()), + RocksDBSnapshot::TransactionDB(handle) => { + handle + .snapshot + .get_pinned_cf_opt(&cf_handle, &key_buf, self.opts.readopts()) + } } .map_err(typed_store_err_from_rocks_err)?; @@ -2039,7 +2342,7 @@ where } /// Deletes all entries by iterating keys and issuing per-key deletes (batched), for engines - /// that do not support range deletes (e.g., OptimisticTransactionDB). + /// that do not support range deletes (e.g., OptimisticTransactionDB and TransactionDB). /// This is less efficient than range deletes but safe and explicit. fn delete_all_individually(&self) -> Result<(), TypedStoreError> where @@ -2453,6 +2756,57 @@ pub fn open_cf_optimistic>( Ok(db) } +/// Opens a TransactionDB with options, and a number of column families with +/// individual options that are created if they do not exist. +#[tracing::instrument(level="debug", skip_all, fields(path = ?path.as_ref()), err)] +pub fn open_cf_opts_transaction>( + path: P, + db_options: Option, + metric_conf: MetricConf, + opt_cfs: &[(&str, rocksdb::Options)], +) -> Result, TypedStoreError> { + let path = path.as_ref(); + let cfs = populate_missing_cfs(opt_cfs, path).map_err(typed_store_err_from_rocks_err)?; + sui_macros::nondeterministic!({ + let options = prepare_db_options(db_options); + let txn_db_options = TransactionDBOptions::default(); + rocksdb::TransactionDB::open_cf_descriptors( + &options, + &txn_db_options, + path, + cfs.into_iter() + .map(|(name, opts)| ColumnFamilyDescriptor::new(name, opts)), + ) + .map(|db| { + Arc::new(RocksDB::TransactionDB(TransactionDBWrapper::new( + db, + metric_conf, + PathBuf::from(path), + options, + ))) + }) + .map_err(typed_store_err_from_rocks_err) + }) +} + +/// Opens a TransactionDB with options, and a number of column families that are created +/// if they do not exist. Uses the same options for each provided column family name. +#[tracing::instrument(level="debug", skip_all, fields(path = ?path.as_ref(), cf = ?opt_cfs), err)] +pub fn open_cf_transaction>( + path: P, + db_options: Option, + metric_conf: MetricConf, + opt_cfs: &[&str], +) -> Result, TypedStoreError> { + let options = db_options.unwrap_or_else(|| default_db_options().options); + let column_descriptors: Vec<_> = opt_cfs + .iter() + .map(|name| (*name, options.clone())) + .collect(); + let db = open_cf_opts_transaction(path, Some(options), metric_conf, &column_descriptors[..])?; + Ok(db) +} + /// TODO: Good description of why we're doing this : /// RocksDB stores keys in BE and has a seek operator. /// on iterators, see `https://github.com/facebook/rocksdb/wiki/Iterator#introduction` @@ -2697,7 +3051,6 @@ where .ok_or_else(|| TypedStoreError::UnregisteredColumn(self.cf_name.clone()))?; self.rocksdb .ingest_external_file_cf(&cf, &[sst_path], &ingest_opts) - .map_err(|e| TypedStoreError::RocksDBError(e.into_string())) } /// Flush remaining entries and consume the buffer. @@ -2970,6 +3323,47 @@ mod sst_tests { assert_eq!(dbmap.get(&5).unwrap(), Some(50)); } + #[tokio::test] + async fn sst_ingest_buffer_flush_rejects_transaction_db() { + crate::metrics::DBMetrics::init(&Registry::default()); + + let dir = tempdir().unwrap(); + let mut db_options = rocksdb::Options::default(); + db_options.create_missing_column_families(true); + db_options.create_if_missing(true); + let rocks = open_cf_opts_transaction( + dir.path(), + Some(db_options), + MetricConf::default(), + &[("cf1", rocksdb::Options::default())], + ) + .unwrap(); + + let dbmap = + DBMap::::reopen(&rocks, Some("cf1"), &ReadWriteOptions::default(), false) + .expect("open cf1"); + + let options = SstIngestOptions { + max_entries: 10, + dir_name: None, + assume_sorted: true, + }; + + let mut buf = SstIngestBuffer::new(&dbmap, options).expect("create buffer"); + buf.push(1u32, 10u32).unwrap(); + let err = buf + .flush() + .expect_err("transaction db should reject SST ingestion"); + assert!( + matches!( + err, + TypedStoreError::RocksDBError(ref message) + if message.contains("external SST ingestion is not supported") + ), + "unexpected error: {err:?}" + ); + } + #[tokio::test] async fn sst_ingest_two_flushes_with_overlapping_keys() { crate::metrics::DBMetrics::init(&Registry::default()); diff --git a/crates/typed-store/src/rocks/tests.rs b/crates/typed-store/src/rocks/tests.rs index 796c0f8def..1885289037 100644 --- a/crates/typed-store/src/rocks/tests.rs +++ b/crates/typed-store/src/rocks/tests.rs @@ -2,7 +2,7 @@ // SPDX-License-Identifier: Apache-2.0 use std::{ - sync::{Mutex, MutexGuard, OnceLock}, + sync::{Arc, Barrier, Mutex, MutexGuard, OnceLock}, time::Duration, }; @@ -1143,3 +1143,259 @@ async fn test_optimistic_transaction_conflict_and_retry() { assert_eq!(db.get(&1).unwrap(), Some("final".to_string())); } + +/// Stress test for the WAL-1187 corruption shape. +/// Lock-based TransactionDB version of the WAL-1187 reproducer above. +/// +/// TransactionDB may reject or delay the outside merge because it uses key locks instead of +/// optimistic commit-time validation. The property we care about is narrower: even when both +/// sides make progress, the transaction must not commit a delete that hides a successful `R` +/// merge. +#[tokio::test] +async fn test_transaction_db_concurrent_write_batch_merge_must_not_hide_register() { + let _guard = global_test_lock(); + + fn zero_register_certify_merge( + _key: &[u8], + existing_val: Option<&[u8]>, + operands: &rocksdb::MergeOperands, + ) -> Option> { + let mut state = match existing_val { + None | Some(b"0") => 0, + Some(b"V") => 2, + _ => return None, + }; + for op in operands { + match (state, op) { + (0, b"R") => state = 1, + (1, b"C") => state = 2, + _ => return None, + } + } + Some(match state { + 0 => vec![b'0'], + 1 | 2 => vec![b'V'], + _ => unreachable!(), + }) + } + + let path = temp_dir(); + let cf_name = "merge_cf"; + + let mut cf_opts = rocksdb::Options::default(); + cf_opts.set_merge_operator( + "zero_register_certify_merge", + zero_register_certify_merge, + |_, _, _| None, + ); + + let db = open_cf_opts_transaction(&path, None, MetricConf::default(), &[(cf_name, cf_opts)]) + .expect("failed to open transaction db"); + + let RocksDB::TransactionDB(ref wrapper) = *db else { + panic!("expected TransactionDB"); + }; + let underlying = &wrapper.underlying; + const ITERATIONS: usize = 500; + let mut hidden_register_failures = 0usize; + let mut successful_merges = 0usize; + let mut successful_commits = 0usize; + + for i in 0..ITERATIONS { + let key = format!("counter-{i}").into_bytes(); + let cf = underlying.cf_handle(cf_name).expect("cf handle"); + + underlying.put_cf(&cf, &key, b"0").unwrap(); + + let ready_barrier = Arc::new(Barrier::new(3)); + let release_barrier = Arc::new(Barrier::new(3)); + + let (merge_result, commit_result): ( + Result<(), rocksdb::Error>, + Result<(), rocksdb::Error>, + ) = std::thread::scope(|scope| { + let commit_ready_barrier = ready_barrier.clone(); + let commit_release_barrier = release_barrier.clone(); + let handle = db.as_transaction().expect("transaction db"); + let commit_key = key.clone(); + let commit_thread = scope.spawn(move || { + let commit_cf = underlying.cf_handle(cf_name).expect("cf handle"); + let tx = handle.transaction(); + let read = tx + .get_for_update_cf_opt( + &commit_cf, + &commit_key, + false, + &rocksdb::ReadOptions::default(), + ) + .unwrap(); + assert_eq!( + read.as_deref(), + Some(&b"0"[..]), + "transaction should only delete after observing 0" + ); + tx.delete_cf(&commit_cf, &commit_key).unwrap(); + + commit_ready_barrier.wait(); + commit_release_barrier.wait(); + tx.commit() + }); + + let merge_ready_barrier = ready_barrier.clone(); + let merge_release_barrier = release_barrier.clone(); + let merge_key = key.clone(); + let merge_thread = scope.spawn(move || { + merge_ready_barrier.wait(); + merge_release_barrier.wait(); + let merge_cf = underlying.cf_handle(cf_name).expect("cf handle"); + let mut batch = rocksdb::WriteBatchWithTransaction::::default(); + batch.merge_cf(&merge_cf, &merge_key, b"R"); + underlying.write(batch) + }); + + ready_barrier.wait(); + release_barrier.wait(); + let merge_result = merge_thread.join().unwrap(); + let commit_result = commit_thread.join().unwrap(); + (merge_result, commit_result) + }); + + match &merge_result { + Ok(()) => successful_merges += 1, + Err(err) => assert!( + matches!( + err.kind(), + rocksdb::ErrorKind::Busy | rocksdb::ErrorKind::TimedOut + ), + concat!( + "merge batch should either succeed or fail with a lock conflict on ", + "iteration {}: {:?}" + ), + i, + err + ), + } + + match &commit_result { + Ok(()) => successful_commits += 1, + Err(err) => assert!( + matches!( + err.kind(), + rocksdb::ErrorKind::Busy | rocksdb::ErrorKind::TimedOut + ), + concat!( + "transaction commit should either succeed or fail with a lock conflict on ", + "iteration {}: {:?}" + ), + i, + err + ), + } + + underlying.merge_cf(&cf, &key, b"C").unwrap(); + if commit_result.is_ok() && merge_result.is_ok() && underlying.get_cf(&cf, &key).is_err() { + hidden_register_failures += 1; + } + } + + assert_eq!( + hidden_register_failures, 0, + "concurrent register merge was hidden behind a later committed delete {} times out of {}", + hidden_register_failures, ITERATIONS + ); + assert!( + successful_merges > 0, + "expected at least one concurrent merge write to succeed" + ); + assert!( + successful_commits > 0, + "expected at least one transaction commit to succeed" + ); +} + +/// Control test for the WAL-1187 reproducer above. +/// +/// This uses the same merge operator shape, but instead of staging a delete inside the +/// transaction it stages a normal put. The outside merge is fully written before commit is +/// attempted, so the optimistic transaction is expected to fail with a write-conflict error. +#[tokio::test] +async fn test_optimistic_transaction_put_conflicts_with_outside_merge_after_get_for_update() { + let _guard = global_test_lock(); + + fn zero_register_certify_merge( + _key: &[u8], + existing_val: Option<&[u8]>, + operands: &rocksdb::MergeOperands, + ) -> Option> { + let mut state = match existing_val { + None | Some(b"0") => 0, + Some(b"V") => 2, + _ => return None, + }; + for op in operands { + match (state, op) { + (0, b"R") => state = 1, + (1, b"C") => state = 2, + _ => return None, + } + } + Some(match state { + 0 => vec![b'0'], + 1 | 2 => vec![b'V'], + _ => unreachable!(), + }) + } + + let path = temp_dir(); + let cf_name = "merge_cf"; + + let mut cf_opts = rocksdb::Options::default(); + cf_opts.set_merge_operator( + "zero_register_certify_merge", + zero_register_certify_merge, + |_, _, _| None, + ); + + let db = open_cf_opts_optimistic(&path, None, MetricConf::default(), &[(cf_name, cf_opts)]) + .expect("failed to open optimistic db"); + + let RocksDB::OptimisticTransactionDB(ref wrapper) = *db else { + panic!("expected OptimisticTransactionDB"); + }; + let underlying = &wrapper.underlying; + let cf = underlying.cf_handle(cf_name).expect("cf handle"); + let key = b"counter".to_vec(); + + underlying.put_cf(&cf, &key, b"0").unwrap(); + + let handle = db.as_optimistic().expect("optimistic db"); + let tx = handle.transaction(); + let read = tx + .get_for_update_cf_opt(&cf, &key, false, &rocksdb::ReadOptions::default()) + .unwrap(); + assert_eq!( + read.as_deref(), + Some(&b"0"[..]), + "transaction should only update after observing 0" + ); + + // Stage a normal update in the transaction, then write the merge outside the transaction + // before commit. This mirrors RocksDB's own optimistic conflict tests and should surface as + // a write conflict rather than silently succeeding. + tx.put_cf(&cf, &key, b"0").unwrap(); + + let mut batch = rocksdb::WriteBatchWithTransaction::::default(); + batch.merge_cf(&cf, &key, b"R"); + underlying.write(batch).unwrap(); + + let commit_err = tx + .commit() + .expect_err("expected optimistic transaction commit to fail after outside merge"); + assert_eq!( + commit_err.kind(), + rocksdb::ErrorKind::Busy, + "expected optimistic transaction commit to fail with Busy, got {commit_err:?}" + ); + + assert_eq!(underlying.get_cf(&cf, &key).unwrap(), Some(b"V".to_vec())); +}