diff --git a/crates/walrus-service/src/node/dbtool.rs b/crates/walrus-service/src/node/dbtool.rs index 7d7e58c45b..d5c41080e6 100644 --- a/crates/walrus-service/src/node/dbtool.rs +++ b/crates/walrus-service/src/node/dbtool.rs @@ -3,12 +3,12 @@ //! Tools for inspecting and maintaining the RocksDB database. -use std::path::PathBuf; +use std::{collections::BTreeMap, path::PathBuf}; use anyhow::Result; use bincode::Options; use clap::Subcommand; -use rocksdb::{DB, Options as RocksdbOptions, ReadOptions}; +use rocksdb::{DB, DBRecoveryMode, Options as RocksdbOptions, ReadOptions}; use serde::{Deserialize, Serialize}; use serde_with::serde_as; use sui_types::base_types::ObjectID; @@ -47,11 +47,17 @@ use crate::{ PerObjectBlobInfo, blob_info_cf_options, per_object_blob_info_cf_options, + per_object_pooled_blob_info_cf_options, }, constants::{ aggregate_blob_info_cf_name, + event_cursor_cf_name, + event_index_cf_name, + garbage_collector_table_cf_name, metadata_cf_name, + node_status_cf_name, per_object_blob_info_cf_name, + per_object_pooled_blob_info_cf_name, primary_slivers_column_family_name, secondary_slivers_column_family_name, }, @@ -59,6 +65,43 @@ use crate::{ }, }; +#[derive(Debug, Clone, Copy, Serialize, Deserialize, clap::ValueEnum)] +#[serde(rename_all = "kebab-case")] +/// WAL recovery policy used by the read-only recovery probe. +pub enum ProbeWalRecoveryMode { + /// Tolerate incomplete records only at the end of the WAL. + TolerateCorruptedTailRecords, + /// Require strict consistency and fail on any corruption. + AbsoluteConsistency, + /// Recover to the last consistent point before corruption. + PointInTime, + /// Skip corrupted records and continue replaying later valid records. + SkipAnyCorruptedRecord, +} + +impl ProbeWalRecoveryMode { + fn as_rocksdb(self) -> DBRecoveryMode { + match self { + Self::TolerateCorruptedTailRecords => DBRecoveryMode::TolerateCorruptedTailRecords, + Self::AbsoluteConsistency => DBRecoveryMode::AbsoluteConsistency, + Self::PointInTime => DBRecoveryMode::PointInTime, + Self::SkipAnyCorruptedRecord => DBRecoveryMode::SkipAnyCorruptedRecord, + } + } +} + +impl std::fmt::Display for ProbeWalRecoveryMode { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + let name = match self { + Self::TolerateCorruptedTailRecords => "tolerate-corrupted-tail-records", + Self::AbsoluteConsistency => "absolute-consistency", + Self::PointInTime => "point-in-time", + Self::SkipAnyCorruptedRecord => "skip-any-corrupted-record", + }; + f.write_str(name) + } +} + /// Database inspection and maintenance tools. #[derive(Subcommand, Debug, Clone, Serialize, Deserialize)] #[serde_as] @@ -140,6 +183,36 @@ pub enum DbToolCommands { db_path: PathBuf, }, + /// Open the RocksDB database read-only with a selectable WAL recovery mode and report what + /// data is visible without persisting any recovery result. + ProbeRecovery { + /// Path to the RocksDB database directory. + #[arg(long)] + db_path: PathBuf, + /// WAL recovery mode to use for the read-only probe. + #[arg(long, value_enum, default_value_t = ProbeWalRecoveryMode::PointInTime)] + wal_recovery_mode: ProbeWalRecoveryMode, + /// Count entries by iterating each reported column family instead of using + /// `rocksdb.estimate-num-keys`. + #[arg(long, default_value_t = false)] + exact_counts: bool, + }, + + /// Open the RocksDB database read-write with a selectable WAL recovery mode and persist the + /// recovery result without starting the full node. + RecoverDb { + /// Path to the RocksDB database directory. + #[arg(long)] + db_path: PathBuf, + /// WAL recovery mode to use for the writable recovery. + #[arg(long, value_enum, default_value_t = ProbeWalRecoveryMode::PointInTime)] + wal_recovery_mode: ProbeWalRecoveryMode, + /// Count entries by iterating each reported column family instead of using + /// `rocksdb.estimate-num-keys`. + #[arg(long, default_value_t = false)] + exact_counts: bool, + }, + /// Scan blob metadata from the RocksDB database. ReadBlobMetadata { /// Path to the RocksDB database directory. @@ -272,6 +345,16 @@ impl DbToolCommands { column_family_names, } => drop_column_families(db_path, column_family_names), Self::ListColumnFamilies { db_path } => list_column_families(db_path), + Self::ProbeRecovery { + db_path, + wal_recovery_mode, + exact_counts, + } => probe_recovery(db_path, wal_recovery_mode, exact_counts), + Self::RecoverDb { + db_path, + wal_recovery_mode, + exact_counts, + } => recover_db(db_path, wal_recovery_mode, exact_counts), Self::ReadBlobMetadata { db_path, start_blob_id, @@ -316,11 +399,13 @@ fn repair_db(db_path: PathBuf) -> Result<()> { fn scan_events(db_path: PathBuf, start_event_index: u64, count: usize) -> Result<()> { println!("Scanning events from event index {start_event_index}"); - let opts = RocksdbOptions::default(); - let db = DB::open_cf_for_read_only( - &opts, + let db = DB::open_cf_with_opts_for_read_only( + &RocksdbOptions::default(), db_path, - [event_processor_constants::EVENT_STORE], + [( + event_processor_constants::EVENT_STORE, + DatabaseTableOptionsFactory::new(DatabaseConfig::default(), false).event_store(), + )], false, )?; let cf = db @@ -481,10 +566,16 @@ fn count_certified_blobs(db_path: PathBuf, epoch: Epoch) -> Result<()> { /// Drop a column family from the RocksDB database. fn drop_column_families(db_path: PathBuf, column_family_names: Vec) -> Result<()> { - let db = DB::open_cf( - &RocksdbOptions::default(), + let column_families = DB::list_cf(&RocksdbOptions::default(), &db_path)?; + let db_kind = detect_probe_db_kind(&column_families); + + let mut db_opts = RocksdbOptions::default(); + db_opts.set_max_open_files(512_000); + + let db = DB::open_cf_with_opts( + &db_opts, &db_path, - &DB::list_cf(&RocksdbOptions::default(), &db_path)?, + probe_cf_options(&column_families, db_kind), ) .inspect_err(|_| { println!( @@ -515,6 +606,352 @@ fn list_column_families(db_path: PathBuf) -> Result<()> { Ok(()) } +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +enum ProbeDbKind { + Storage, + EventProcessor, + Unknown, +} + +#[derive(Debug, Default)] +struct StorageShardProbeStats { + primary: Option, + secondary: Option, + pending_recover: Option, + status: Option, + sync_progress: Option, +} + +fn probe_recovery( + db_path: PathBuf, + wal_recovery_mode: ProbeWalRecoveryMode, + exact_counts: bool, +) -> Result<()> { + let (column_families, db_kind, db_opts) = prepare_recovery_open(&db_path, wal_recovery_mode)?; + let cf_options = probe_cf_options(&column_families, db_kind); + let db = DB::open_cf_with_opts_for_read_only(&db_opts, &db_path, cf_options, false)?; + + println!("Opened DB read-only at {}", db_path.display()); + println!("WAL recovery mode: {wal_recovery_mode}"); + println!("Probe is non-mutating: read-only open does not persist recovery state."); + println!( + "Count mode: {}", + if exact_counts { + "exact iteration" + } else { + "rocksdb.estimate-num-keys" + } + ); + println!("Detected DB kind: {}", probe_db_kind_name(db_kind)); + println!("Column families discovered: {}", column_families.len()); + + match db_kind { + ProbeDbKind::Storage => report_storage_probe(&db, &column_families, exact_counts)?, + ProbeDbKind::EventProcessor => report_generic_probe( + &db, + &column_families, + exact_counts, + Some(&event_processor_headings()), + )?, + ProbeDbKind::Unknown => report_generic_probe(&db, &column_families, exact_counts, None)?, + } + + Ok(()) +} + +fn recover_db( + db_path: PathBuf, + wal_recovery_mode: ProbeWalRecoveryMode, + exact_counts: bool, +) -> Result<()> { + let (column_families, db_kind, db_opts) = prepare_recovery_open(&db_path, wal_recovery_mode)?; + let cf_options = probe_cf_options(&column_families, db_kind); + let db = DB::open_cf_with_opts(&db_opts, &db_path, cf_options).inspect_err(|_| { + println!(concat!( + "failed to open database for recovery; ", + "make sure the storage node is stopped before running recover-db", + )) + })?; + + println!("Opened DB read-write at {}", db_path.display()); + println!("WAL recovery mode: {wal_recovery_mode}"); + println!("Recovery is mutating: successful open persists the recovered DB state."); + println!( + "Count mode: {}", + if exact_counts { + "exact iteration" + } else { + "rocksdb.estimate-num-keys" + } + ); + println!("Detected DB kind: {}", probe_db_kind_name(db_kind)); + println!("Column families discovered: {}", column_families.len()); + + match db_kind { + ProbeDbKind::Storage => report_storage_probe(&db, &column_families, exact_counts)?, + ProbeDbKind::EventProcessor => report_generic_probe( + &db, + &column_families, + exact_counts, + Some(&event_processor_headings()), + )?, + ProbeDbKind::Unknown => report_generic_probe(&db, &column_families, exact_counts, None)?, + } + + drop(db); + println!(); + println!("Recovery committed successfully."); + + Ok(()) +} + +fn prepare_recovery_open( + db_path: &std::path::Path, + wal_recovery_mode: ProbeWalRecoveryMode, +) -> Result<(Vec, ProbeDbKind, RocksdbOptions)> { + let column_families = DB::list_cf(&RocksdbOptions::default(), db_path)?; + let db_kind = detect_probe_db_kind(&column_families); + + let mut db_opts = RocksdbOptions::default(); + db_opts.set_max_open_files(512_000); + db_opts.set_wal_recovery_mode(wal_recovery_mode.as_rocksdb()); + + Ok((column_families, db_kind, db_opts)) +} + +fn detect_probe_db_kind(column_families: &[String]) -> ProbeDbKind { + if column_families.iter().any(|cf| { + cf == aggregate_blob_info_cf_name() + || cf == per_object_blob_info_cf_name() + || cf == metadata_cf_name() + || cf.starts_with("shard-") + }) { + ProbeDbKind::Storage + } else if column_families.iter().any(|cf| { + cf == event_processor_constants::CHECKPOINT_STORE + || cf == event_processor_constants::WALRUS_PACKAGE_STORE + || cf == event_processor_constants::COMMITTEE_STORE + || cf == event_processor_constants::EVENT_STORE + || cf == event_processor_constants::INIT_STATE + }) { + ProbeDbKind::EventProcessor + } else { + ProbeDbKind::Unknown + } +} + +fn probe_db_kind_name(db_kind: ProbeDbKind) -> &'static str { + match db_kind { + ProbeDbKind::Storage => "storage", + ProbeDbKind::EventProcessor => "event-processor", + ProbeDbKind::Unknown => "unknown", + } +} + +fn probe_cf_options( + column_families: &[String], + db_kind: ProbeDbKind, +) -> Vec<(&str, RocksdbOptions)> { + let db_config = DatabaseConfig::default(); + let factory = DatabaseTableOptionsFactory::new(db_config, db_kind == ProbeDbKind::Storage); + column_families + .iter() + .map(|cf_name| { + ( + cf_name.as_str(), + probe_cf_options_for_name(cf_name, db_kind, &factory), + ) + }) + .collect() +} + +fn probe_cf_options_for_name( + cf_name: &str, + db_kind: ProbeDbKind, + factory: &DatabaseTableOptionsFactory, +) -> RocksdbOptions { + match db_kind { + ProbeDbKind::Storage => { + if let Some((_, suffix)) = parse_shard_cf_name(cf_name) { + return match suffix { + "primary-slivers" | "secondary-slivers" => factory.shard(), + "status" => factory.shard_status(), + "sync-progress" => factory.shard_sync_progress(), + "pending-recover-slivers" => factory.pending_recover_slivers(), + _ => RocksdbOptions::default(), + }; + } + + match cf_name { + name if name == aggregate_blob_info_cf_name() => blob_info_cf_options(factory), + name if name == per_object_blob_info_cf_name() => { + per_object_blob_info_cf_options(factory) + } + name if name == per_object_pooled_blob_info_cf_name() => { + per_object_pooled_blob_info_cf_options(factory) + } + name if name == event_index_cf_name() => factory.standard(), + name if name == metadata_cf_name() => factory.metadata(), + name if name == node_status_cf_name() => factory.node_status(), + name if name == event_cursor_cf_name() => factory.event_cursor(), + name if name == garbage_collector_table_cf_name() => factory.garbage_collector(), + _ => RocksdbOptions::default(), + } + } + ProbeDbKind::EventProcessor => match cf_name { + name if name == event_processor_constants::CHECKPOINT_STORE => { + factory.checkpoint_store() + } + name if name == event_processor_constants::WALRUS_PACKAGE_STORE => { + factory.walrus_package_store() + } + name if name == event_processor_constants::COMMITTEE_STORE => factory.committee_store(), + name if name == event_processor_constants::EVENT_STORE => factory.event_store(), + name if name == event_processor_constants::INIT_STATE => factory.init_state(), + _ => RocksdbOptions::default(), + }, + ProbeDbKind::Unknown => RocksdbOptions::default(), + } +} + +fn report_storage_probe(db: &DB, column_families: &[String], exact_counts: bool) -> Result<()> { + println!(); + println!("Top-level column families:"); + for cf_name in [ + aggregate_blob_info_cf_name(), + per_object_blob_info_cf_name(), + per_object_pooled_blob_info_cf_name(), + metadata_cf_name(), + node_status_cf_name(), + event_cursor_cf_name(), + event_index_cf_name(), + garbage_collector_table_cf_name(), + ] { + if column_families.iter().any(|name| name == cf_name) { + let count = count_cf_entries(db, cf_name, exact_counts)?; + println!(" {cf_name}: {}", format_probe_count(count)); + } + } + + let mut shard_stats = BTreeMap::::new(); + for cf_name in column_families { + let Some((shard_index, suffix)) = parse_shard_cf_name(cf_name) else { + continue; + }; + let count = count_cf_entries(db, cf_name, exact_counts)?; + let entry = shard_stats.entry(shard_index).or_default(); + match suffix { + "primary-slivers" => entry.primary = count, + "secondary-slivers" => entry.secondary = count, + "pending-recover-slivers" => entry.pending_recover = count, + "status" => entry.status = count, + "sync-progress" => entry.sync_progress = count, + _ => {} + } + } + + println!(); + println!("Per-shard sliver visibility:"); + let mut total_primary = 0_u64; + let mut total_secondary = 0_u64; + let mut total_pending = 0_u64; + for (shard_index, stats) in &shard_stats { + total_primary += stats.primary.unwrap_or(0); + total_secondary += stats.secondary.unwrap_or(0); + total_pending += stats.pending_recover.unwrap_or(0); + println!( + concat!( + " shard-{}: primary={}, secondary={}, ", + "pending-recover={}, status={}, sync-progress={}", + ), + shard_index, + format_probe_count(stats.primary), + format_probe_count(stats.secondary), + format_probe_count(stats.pending_recover), + format_probe_count(stats.status), + format_probe_count(stats.sync_progress), + ); + } + + println!(); + println!( + "Totals: primary={}, secondary={}, pending-recover={}", + total_primary, total_secondary, total_pending + ); + + Ok(()) +} + +fn event_processor_headings() -> Vec<&'static str> { + vec![ + event_processor_constants::CHECKPOINT_STORE, + event_processor_constants::WALRUS_PACKAGE_STORE, + event_processor_constants::COMMITTEE_STORE, + event_processor_constants::EVENT_STORE, + event_processor_constants::INIT_STATE, + ] +} + +fn report_generic_probe( + db: &DB, + column_families: &[String], + exact_counts: bool, + preferred_order: Option<&[&str]>, +) -> Result<()> { + println!(); + println!("Column family visibility:"); + + if let Some(preferred_order) = preferred_order { + for cf_name in preferred_order { + if column_families.iter().any(|name| name == cf_name) { + let count = count_cf_entries(db, cf_name, exact_counts)?; + println!(" {cf_name}: {}", format_probe_count(count)); + } + } + } + + for cf_name in column_families { + if preferred_order.is_some_and(|ordered| ordered.iter().any(|name| *name == cf_name)) { + continue; + } + let count = count_cf_entries(db, cf_name, exact_counts)?; + println!(" {cf_name}: {}", format_probe_count(count)); + } + + Ok(()) +} + +fn parse_shard_cf_name(cf_name: &str) -> Option<(u16, &str)> { + let (shard_name, suffix) = cf_name.split_once('/')?; + let shard_index = shard_name.strip_prefix("shard-")?.parse().ok()?; + Some((shard_index, suffix)) +} + +fn count_cf_entries(db: &DB, cf_name: &str, exact_counts: bool) -> Result> { + let Some(cf) = db.cf_handle(cf_name) else { + return Ok(None); + }; + + if !exact_counts { + return db + .property_int_value_cf(&cf, "rocksdb.estimate-num-keys") + .map_err(Into::into); + } + + let mut count = 0_u64; + for entry in db.iterator_cf(&cf, rocksdb::IteratorMode::Start) { + let _ = entry?; + count += 1; + } + Ok(Some(count)) +} + +fn format_probe_count(count: Option) -> String { + count + .map(|value| value.to_string()) + .unwrap_or_else(|| "n/a".to_string()) +} + fn read_blob_metadata( db_path: PathBuf, start_blob_id: Option, @@ -667,10 +1104,13 @@ fn read_secondary_slivers( } fn read_event_processor_init_state(db_path: PathBuf) -> Result<()> { - let db = DB::open_cf_for_read_only( + let db = DB::open_cf_with_opts_for_read_only( &RocksdbOptions::default(), db_path, - [event_processor_constants::INIT_STATE], + [( + event_processor_constants::INIT_STATE, + DatabaseTableOptionsFactory::new(DatabaseConfig::default(), false).init_state(), + )], false, )?; @@ -694,10 +1134,13 @@ fn read_event_processor_init_state(db_path: PathBuf) -> Result<()> { } fn read_certified_event_blobs(db_path: PathBuf) -> Result<()> { - let db = DB::open_cf_for_read_only( + let db = DB::open_cf_with_opts_for_read_only( &RocksdbOptions::default(), db_path, - [certified_cf_name()], + [( + certified_cf_name(), + DatabaseTableOptionsFactory::new(DatabaseConfig::default(), false).certified(), + )], false, )?; @@ -719,10 +1162,13 @@ fn read_certified_event_blobs(db_path: PathBuf) -> Result<()> { } fn read_attested_event_blobs(db_path: PathBuf) -> Result<()> { - let db = DB::open_cf_for_read_only( + let db = DB::open_cf_with_opts_for_read_only( &RocksdbOptions::default(), db_path, - [attested_cf_name()], + [( + attested_cf_name(), + DatabaseTableOptionsFactory::new(DatabaseConfig::default(), false).attested(), + )], false, )?; @@ -745,10 +1191,13 @@ fn read_attested_event_blobs(db_path: PathBuf) -> Result<()> { } fn read_pending_event_blobs(db_path: PathBuf, start_seq: Option, count: usize) -> Result<()> { - let db = DB::open_cf_for_read_only( + let db = DB::open_cf_with_opts_for_read_only( &RocksdbOptions::default(), db_path, - [pending_cf_name()], + [( + pending_cf_name(), + DatabaseTableOptionsFactory::new(DatabaseConfig::default(), false).pending(), + )], false, )?; @@ -784,10 +1233,13 @@ fn read_pending_event_blobs(db_path: PathBuf, start_seq: Option, count: usi } fn read_failed_to_attest_event_blobs(db_path: PathBuf) -> Result<()> { - let db = DB::open_cf_for_read_only( + let db = DB::open_cf_with_opts_for_read_only( &RocksdbOptions::default(), db_path, - [failed_to_attest_cf_name()], + [( + failed_to_attest_cf_name(), + DatabaseTableOptionsFactory::new(DatabaseConfig::default(), false).failed_to_attest(), + )], false, )?; @@ -808,3 +1260,50 @@ fn read_failed_to_attest_event_blobs(db_path: PathBuf) -> Result<()> { Ok(()) } + +#[cfg(test)] +mod tests { + use super::{ + ProbeDbKind, + detect_probe_db_kind, + event_processor_constants, + parse_shard_cf_name, + }; + use crate::node::storage::constants::{aggregate_blob_info_cf_name, metadata_cf_name}; + + #[test] + fn detect_storage_probe_db() { + let column_families = vec![ + "default".to_string(), + aggregate_blob_info_cf_name().to_string(), + metadata_cf_name().to_string(), + "shard-7/primary-slivers".to_string(), + ]; + + assert_eq!(detect_probe_db_kind(&column_families), ProbeDbKind::Storage); + } + + #[test] + fn detect_event_processor_probe_db() { + let column_families = vec![ + "default".to_string(), + event_processor_constants::CHECKPOINT_STORE.to_string(), + event_processor_constants::EVENT_STORE.to_string(), + ]; + + assert_eq!( + detect_probe_db_kind(&column_families), + ProbeDbKind::EventProcessor + ); + } + + #[test] + fn parse_storage_shard_column_family_name() { + assert_eq!( + parse_shard_cf_name("shard-12/pending-recover-slivers"), + Some((12, "pending-recover-slivers")) + ); + assert_eq!(parse_shard_cf_name("metadata"), None); + assert_eq!(parse_shard_cf_name("shard-x/primary-slivers"), None); + } +}