Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
54 changes: 51 additions & 3 deletions crates/walrus-service/src/node/dbtool.rs
Original file line number Diff line number Diff line change
Expand Up @@ -50,14 +50,21 @@ use crate::{
PerObjectBlobInfo,
blob_info_cf_options,
per_object_blob_info_cf_options,
per_object_pooled_blob_info_cf_options,
},
constants::{
aggregate_blob_info_cf_name,
event_cursor_cf_name,
event_index_cf_name,
garbage_collector_table_cf_name,
metadata_cf_name,
node_status_cf_name,
per_object_blob_info_cf_name,
per_object_pooled_blob_info_cf_name,
primary_slivers_column_family_name,
secondary_slivers_column_family_name,
},
event_cursor_cf_options,
},
},
};
Expand Down Expand Up @@ -526,8 +533,9 @@ fn drop_column_families(db_path: PathBuf, column_family_names: Vec<String>) -> R

/// Drop column families from the storage database using the node's configuration file.
///
/// Opens the database with the same options as the storage node to avoid WAL/SST consistency
/// errors that occur when opening with default RocksDB options.
/// Opens the database with the same per-CF options as the storage node (including merge operators)
/// to avoid WAL/SST consistency errors that occur when opening with default RocksDB options.
/// This can only be called when the storage node is stopped.
fn drop_storage_column_families_from_config(
config_path: PathBuf,
column_family_names: Vec<String>,
Expand All @@ -542,11 +550,16 @@ fn drop_storage_column_families_from_config(
let mut db_opts = RocksdbOptions::from(&db_config.global());
db_opts.create_missing_column_families(true);

let factory = DatabaseTableOptionsFactory::new(db_config.clone(), true);

let existing_cfs = DB::list_cf(&db_opts, db_path)?;
println!("Existing column families: {existing_cfs:?}");
let cf_descriptors: Vec<_> = existing_cfs
.iter()
.map(|name| (name.as_str(), db_opts.clone()))
.map(|name| {
let opts = storage_cf_options(name, &factory);
(name.as_str(), opts)
})
.collect();

let no_op_sampling = SamplingInterval::new(Duration::ZERO, 0);
Expand Down Expand Up @@ -581,6 +594,41 @@ fn drop_storage_column_families_from_config(
Ok(())
}

/// Returns the correct RocksDB options for a storage DB column family, matching the options
/// used by `Storage::open()`. This is critical for CFs with merge operators
/// (`aggregate_blob_info`, `per_object_blob_info`, `event_cursor`, etc.) since RocksDB needs
/// them to replay WAL entries.
fn storage_cf_options(name: &str, factory: &DatabaseTableOptionsFactory) -> RocksdbOptions {
if name == aggregate_blob_info_cf_name() {
blob_info_cf_options(factory)
} else if name == per_object_blob_info_cf_name() {
per_object_blob_info_cf_options(factory)
} else if name == per_object_pooled_blob_info_cf_name() {
per_object_pooled_blob_info_cf_options(factory)
} else if name == event_cursor_cf_name() {
event_cursor_cf_options(factory)
} else if name == metadata_cf_name() {
factory.metadata()
} else if name == node_status_cf_name() {
factory.node_status()
} else if name == garbage_collector_table_cf_name() {
factory.garbage_collector()
} else if name == event_index_cf_name() {
factory.standard()
} else if name.ends_with("/primary-slivers") || name.ends_with("/secondary-slivers") {
factory.shard()
} else if name.ends_with("/status") {
factory.shard_status()
} else if name.ends_with("/sync-progress") {
factory.shard_sync_progress()
} else if name.ends_with("/pending-recover-slivers") {
factory.pending_recover_slivers()
} else {
// "default" and any unknown CFs.
factory.standard()
}
}

fn list_column_families(db_path: PathBuf) -> Result<()> {
let result = rocksdb::DB::list_cf(&RocksdbOptions::default(), db_path);
if let Ok(column_families) = result {
Expand Down
1 change: 1 addition & 0 deletions crates/walrus-service/src/node/storage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,7 @@ pub use database_config::{DatabaseConfig, DatabaseTableOptionsFactory};

mod event_cursor_table;
pub(super) use event_cursor_table::EventProgress;
pub(crate) use event_cursor_table::event_cursor_cf_options;

mod event_sequencer;
mod metrics;
Expand Down
22 changes: 15 additions & 7 deletions crates/walrus-service/src/node/storage/event_cursor_table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -109,13 +109,10 @@ impl EventCursorTable {
}

pub fn options(db_table_opts_factory: &DatabaseTableOptionsFactory) -> (&'static str, Options) {
let mut options = db_table_opts_factory.event_cursor();
options.set_merge_operator(
"update_cursor_and_progress",
update_cursor_and_progress,
|_, _, _| None,
);
(event_cursor_cf_name(), options)
(
event_cursor_cf_name(),
event_cursor_cf_options(db_table_opts_factory),
)
}

pub fn reposition_event_cursor(
Expand Down Expand Up @@ -211,6 +208,17 @@ impl EventCursorTable {
}
}

/// Returns RocksDB options for the event cursor column family, including the merge operator.
pub(crate) fn event_cursor_cf_options(factory: &DatabaseTableOptionsFactory) -> Options {
let mut options = factory.event_cursor();
options.set_merge_operator(
"update_cursor_and_progress",
update_cursor_and_progress,
|_, _, _| None,
);
options
}

#[tracing::instrument(level = Level::DEBUG, skip(operands))]
fn update_cursor_and_progress(
key: &[u8],
Expand Down
Loading