From 9672308991ea75acaeb5f6586efd94eb149365fb Mon Sep 17 00:00:00 2001 From: Markus Legner Date: Fri, 20 Mar 2026 15:04:54 +0100 Subject: [PATCH] fix: use per-CF RocksDB options in drop-column-families dbtool command The `DropStorageColumnFamiliesFromConfig` command was opening the DB with identical global options for every column family. CFs with custom merge operators (aggregate_blob_info, per_object_blob_info, event_cursor, etc.) need those operators registered to replay WAL entries on open. Co-Authored-By: Claude Opus 4.6 --- crates/walrus-service/src/node/dbtool.rs | 54 +++++++++++++++++-- crates/walrus-service/src/node/storage.rs | 1 + .../src/node/storage/event_cursor_table.rs | 22 +++++--- 3 files changed, 67 insertions(+), 10 deletions(-) diff --git a/crates/walrus-service/src/node/dbtool.rs b/crates/walrus-service/src/node/dbtool.rs index 254b22cdbd..df8b3c1975 100644 --- a/crates/walrus-service/src/node/dbtool.rs +++ b/crates/walrus-service/src/node/dbtool.rs @@ -50,14 +50,21 @@ use crate::{ PerObjectBlobInfo, blob_info_cf_options, per_object_blob_info_cf_options, + per_object_pooled_blob_info_cf_options, }, constants::{ aggregate_blob_info_cf_name, + event_cursor_cf_name, + event_index_cf_name, + garbage_collector_table_cf_name, metadata_cf_name, + node_status_cf_name, per_object_blob_info_cf_name, + per_object_pooled_blob_info_cf_name, primary_slivers_column_family_name, secondary_slivers_column_family_name, }, + event_cursor_cf_options, }, }, }; @@ -526,8 +533,9 @@ fn drop_column_families(db_path: PathBuf, column_family_names: Vec) -> R /// Drop column families from the storage database using the node's configuration file. /// -/// Opens the database with the same options as the storage node to avoid WAL/SST consistency -/// errors that occur when opening with default RocksDB options. +/// Opens the database with the same per-CF options as the storage node (including merge operators) +/// to avoid WAL/SST consistency errors that occur when opening with default RocksDB options. +/// This can only be called when the storage node is stopped. fn drop_storage_column_families_from_config( config_path: PathBuf, column_family_names: Vec, @@ -542,11 +550,16 @@ fn drop_storage_column_families_from_config( let mut db_opts = RocksdbOptions::from(&db_config.global()); db_opts.create_missing_column_families(true); + let factory = DatabaseTableOptionsFactory::new(db_config.clone(), true); + let existing_cfs = DB::list_cf(&db_opts, db_path)?; println!("Existing column families: {existing_cfs:?}"); let cf_descriptors: Vec<_> = existing_cfs .iter() - .map(|name| (name.as_str(), db_opts.clone())) + .map(|name| { + let opts = storage_cf_options(name, &factory); + (name.as_str(), opts) + }) .collect(); let no_op_sampling = SamplingInterval::new(Duration::ZERO, 0); @@ -581,6 +594,41 @@ fn drop_storage_column_families_from_config( Ok(()) } +/// Returns the correct RocksDB options for a storage DB column family, matching the options +/// used by `Storage::open()`. This is critical for CFs with merge operators +/// (`aggregate_blob_info`, `per_object_blob_info`, `event_cursor`, etc.) since RocksDB needs +/// them to replay WAL entries. +fn storage_cf_options(name: &str, factory: &DatabaseTableOptionsFactory) -> RocksdbOptions { + if name == aggregate_blob_info_cf_name() { + blob_info_cf_options(factory) + } else if name == per_object_blob_info_cf_name() { + per_object_blob_info_cf_options(factory) + } else if name == per_object_pooled_blob_info_cf_name() { + per_object_pooled_blob_info_cf_options(factory) + } else if name == event_cursor_cf_name() { + event_cursor_cf_options(factory) + } else if name == metadata_cf_name() { + factory.metadata() + } else if name == node_status_cf_name() { + factory.node_status() + } else if name == garbage_collector_table_cf_name() { + factory.garbage_collector() + } else if name == event_index_cf_name() { + factory.standard() + } else if name.ends_with("/primary-slivers") || name.ends_with("/secondary-slivers") { + factory.shard() + } else if name.ends_with("/status") { + factory.shard_status() + } else if name.ends_with("/sync-progress") { + factory.shard_sync_progress() + } else if name.ends_with("/pending-recover-slivers") { + factory.pending_recover_slivers() + } else { + // "default" and any unknown CFs. + factory.standard() + } +} + fn list_column_families(db_path: PathBuf) -> Result<()> { let result = rocksdb::DB::list_cf(&RocksdbOptions::default(), db_path); if let Ok(column_families) = result { diff --git a/crates/walrus-service/src/node/storage.rs b/crates/walrus-service/src/node/storage.rs index c53f82af34..d133a4bace 100644 --- a/crates/walrus-service/src/node/storage.rs +++ b/crates/walrus-service/src/node/storage.rs @@ -73,6 +73,7 @@ pub use database_config::{DatabaseConfig, DatabaseTableOptionsFactory}; mod event_cursor_table; pub(super) use event_cursor_table::EventProgress; +pub(crate) use event_cursor_table::event_cursor_cf_options; mod event_sequencer; mod metrics; diff --git a/crates/walrus-service/src/node/storage/event_cursor_table.rs b/crates/walrus-service/src/node/storage/event_cursor_table.rs index f469ca2ad0..8ae17e1f3a 100644 --- a/crates/walrus-service/src/node/storage/event_cursor_table.rs +++ b/crates/walrus-service/src/node/storage/event_cursor_table.rs @@ -109,13 +109,10 @@ impl EventCursorTable { } pub fn options(db_table_opts_factory: &DatabaseTableOptionsFactory) -> (&'static str, Options) { - let mut options = db_table_opts_factory.event_cursor(); - options.set_merge_operator( - "update_cursor_and_progress", - update_cursor_and_progress, - |_, _, _| None, - ); - (event_cursor_cf_name(), options) + ( + event_cursor_cf_name(), + event_cursor_cf_options(db_table_opts_factory), + ) } pub fn reposition_event_cursor( @@ -211,6 +208,17 @@ impl EventCursorTable { } } +/// Returns RocksDB options for the event cursor column family, including the merge operator. +pub(crate) fn event_cursor_cf_options(factory: &DatabaseTableOptionsFactory) -> Options { + let mut options = factory.event_cursor(); + options.set_merge_operator( + "update_cursor_and_progress", + update_cursor_and_progress, + |_, _, _| None, + ); + options +} + #[tracing::instrument(level = Level::DEBUG, skip(operands))] fn update_cursor_and_progress( key: &[u8],