Add support for pessimistic transaction db in typed store#3153
Add support for pessimistic transaction db in typed store#3153sadhansood wants to merge 4 commits intomainfrom
Conversation
There was a problem hiding this comment.
Pull request overview
Adds support for running Walrus’ typed-store layer (and downstream Walrus Service / Indexer storage) on RocksDB’s lock-based TransactionDB (pessimistic) engine in addition to the existing optimistic transaction engine.
Changes:
- Introduces transaction engine selection (
None/Optimistic/Pessimistic) via storage DB config and updates Storage initialization accordingly. - Generalizes several transaction-scoped helpers to work with either RocksDB transaction engine.
- Adds concurrency/regression tests to validate delete-vs-merge behavior across both transaction engines.
Reviewed changes
Copilot reviewed 9 out of 9 changed files in this pull request and generated 2 comments.
Show a summary per file
| File | Description |
|---|---|
| crates/walrus-service/src/node/storage/shard.rs | Generalizes sliver-pair deletion helper to accept any transaction engine. |
| crates/walrus-service/src/node/storage/database_config.rs | Adds TransactionDbMode, a new use_transaction_db flag, and mode selection logic. |
| crates/walrus-service/src/node/storage/blob_info.rs | Generalizes blob-info transactional helpers to accept any transaction engine. |
| crates/walrus-service/src/node/storage.rs | Opens DB based on selected transaction engine; updates GC deletion paths to work with both engines; adds TransactionDB-mode test. |
| crates/walrus-service/src/node/config.rs | Validates mutually exclusive transaction flags and allows GC deletion when either transaction engine is enabled. |
| crates/walrus-indexer/src/storage.rs | Adds ability to open indexer DB with either optimistic or pessimistic transaction engine; updates tests accordingly. |
| crates/typed-store/src/rocks/tests.rs | Adds stress/regression tests for concurrent delete vs merge behavior (optimistic + TransactionDB). |
| crates/typed-store/src/rocks.rs | Adds TransactionDB engine support, transactional handles/traits, open helpers, and engine-specific API behavior. |
| crates/typed-store/src/column_family_handle.rs | Generalizes column-family transactional operations to accept any transaction engine; extends tests to cover TransactionDB. |
Comments suppressed due to low confidence (1)
crates/walrus-service/src/node/storage/blob_info.rs:451
get_for_update_cf_optis called withexclusive = false, but after this call the transaction can proceed to delete/update the same key. ForTransactionDB(pessimistic/lock-based), theexclusiveflag affects lock acquisition and can lead to lock upgrades/deadlocks or weaker isolation than intended. Consider usingexclusive = truehere (it’s already used that way in typed-store’sColumnFamilyHandle::get_for_update_cf) and update the comment which currently only references optimistic transactions.
// The value of the `exclusive` parameter does not matter for optimistic transactions.
Ok(transaction
.get_for_update_cf_opt(
&self.aggregate_cf(),
blob_id,
false,
&self.aggregate_blob_info.opts.readopts(),
)?
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
| ) | ||
| .unwrap(); | ||
| println!("read: {:?}", read); | ||
| assert_eq!( | ||
| read.as_deref(), | ||
| Some(&b"0"[..]), | ||
| "transaction should only delete after observing 0" | ||
| ); | ||
| tx.delete_cf(&commit_cf, &commit_key).unwrap(); | ||
|
|
||
| // Signal that the transaction has already observed `0` and staged the delete. | ||
| commit_ready_barrier.wait(); | ||
| // Wait for the merge thread to reach the matching point, then race the actual | ||
| // commit against the concurrent WriteBatch merge. | ||
| commit_release_barrier.wait(); | ||
| let commit_result = tx.commit(); | ||
| println!("commit_result: {:?}", commit_result); | ||
| commit_result |
crates/typed-store/src/rocks.rs
Outdated
| d.underlying.ingest_external_file_cf_opts(cf, opts, v) | ||
| } | ||
| Self::TransactionDB(_) => { | ||
| unimplemented!("ingest_external_file_cf_opts is not supported for TransactionDB") |
ea65f3a to
467756e
Compare
467756e to
6c403ed
Compare
There was a problem hiding this comment.
Pull request overview
This PR extends typed-store’s RocksDB abstraction to support the lock-based (pessimistic) TransactionDB engine alongside the existing standard DB and OptimisticTransactionDB engines, and adds tests to validate transaction semantics and known corruption/repro shapes.
Changes:
- Add a new
RocksDB::TransactionDBengine variant with open helpers and transactional handles (as_transaction,as_transactional). - Update various
RocksDBoperations/batch plumbing/snapshots to account for the new engine, including explicitly rejecting unsupported features (e.g., SST ingest). - Generalize
ColumnFamilyHandletransaction helpers to work with both transaction engines and add TransactionDB coverage tests, plus a new concurrent merge/delete stress test.
Reviewed changes
Copilot reviewed 3 out of 3 changed files in this pull request and generated 6 comments.
| File | Description |
|---|---|
crates/typed-store/src/rocks.rs |
Introduces TransactionDB engine support, handle APIs, and engine-specific behavior for unsupported operations. |
crates/typed-store/src/rocks/tests.rs |
Adds TransactionDB concurrency stress/regression test and an optimistic control test. |
crates/typed-store/src/column_family_handle.rs |
Makes CF transactional helpers engine-generic and tests both optimistic and TransactionDB paths. |
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
| Self::OptimisticTransactionDB(d) => d.underlying.delete_file_in_range_cf(cf, from, to), | ||
| Self::TransactionDB(_) => { | ||
| tracing::warn!("delete_file_in_range_cf is not supported for TransactionDB"); | ||
| Ok(()) |
| pub fn set_options(&self, opts: &[(&str, &str)]) -> Result<(), rocksdb::Error> { | ||
| delegate_call!(self.set_options(opts)) | ||
| match self { | ||
| Self::DB(d) => d.underlying.set_options(opts), | ||
| Self::OptimisticTransactionDB(d) => d.underlying.set_options(opts), | ||
| Self::TransactionDB(_) => { | ||
| tracing::warn!("set_options is not supported for TransactionDB"); | ||
| Ok(()) | ||
| } | ||
| } |
| pub fn property_int_value_cf( | ||
| &self, | ||
| cf: &impl AsColumnFamilyRef, | ||
| name: impl CStrLike, | ||
| ) -> Result<Option<u64>, rocksdb::Error> { | ||
| delegate_call!(self.property_int_value_cf(cf, name)) | ||
| match self { | ||
| Self::DB(d) => d.underlying.property_int_value_cf(cf, name), | ||
| Self::OptimisticTransactionDB(d) => d.underlying.property_int_value_cf(cf, name), | ||
| Self::TransactionDB(_) => Ok(None), | ||
| } |
| match self { | ||
| Self::DB(d) => d.underlying.try_catch_up_with_primary(), | ||
| Self::OptimisticTransactionDB(d) => d.underlying.try_catch_up_with_primary(), | ||
| Self::TransactionDB(_) => Ok(()), |
| pub fn ingest_external_file_cf<P: AsRef<Path>>( | ||
| &self, | ||
| cf: &impl AsColumnFamilyRef, | ||
| paths: &[P], | ||
| opts: &IngestExternalFileOptions, | ||
| ) -> Result<(), rocksdb::Error> { | ||
| ) -> Result<(), TypedStoreError> { | ||
| let v: Vec<&Path> = paths.iter().map(|p| p.as_ref()).collect(); |
| const ITERATIONS: usize = 500; | ||
| let mut hidden_register_failures = 0usize; | ||
| let mut successful_merges = 0usize; | ||
| let mut successful_commits = 0usize; | ||
|
|
||
| for i in 0..ITERATIONS { | ||
| let key = format!("counter-{i}").into_bytes(); | ||
| let cf = underlying.cf_handle(cf_name).expect("cf handle"); | ||
|
|
||
| underlying.put_cf(&cf, &key, b"0").unwrap(); | ||
|
|
||
| let ready_barrier = Arc::new(Barrier::new(3)); | ||
| let release_barrier = Arc::new(Barrier::new(3)); | ||
|
|
||
| let (merge_result, commit_result): ( |
Description
Describe the changes or additions included in this PR.
Test plan
How did you test the new or updated feature?
Release notes
Check each box that your changes affect. If none of the boxes relate to your changes, release notes aren't required.
For each box you select, include information after the relevant heading that describes the impact of your changes that
a user might notice and any actions they must take to implement updates. (Add release notes after the colon for each item)