Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 3 additions & 2 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -895,7 +895,7 @@ k256 = { version = "0.13.4", default-features = false }
kitchensink-runtime = { path = "substrate/bin/node/runtime" }
kvdb = { version = "0.13.0" }
kvdb-memorydb = { version = "0.13.0" }
kvdb-rocksdb = { version = "0.20.1" }
kvdb-rocksdb = { version = "0.21.0" }
kvdb-shared-tests = { version = "0.11.0" }
landlock = { version = "0.3.0" }
libc = { version = "0.2.155" }
Expand Down
14 changes: 14 additions & 0 deletions prdoc/pr_10495.prdoc
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
title: 'Rocksdb: Force compact columns after warp sync'
doc:
- audience: Node Operator
description: |-
Recently we introduced a change that always force compacted the RocksDB database when starting a node after a lot of data had been written. We found out that force compacting a huge RocksDB of more than 600GB takes quite some time (more than one hour), and that this happened on every start.

So, this pull request changes the compaction to only happen after warp sync (and genesis) when we reset the state column to some given state. This way we don't run it anymore on startup of the node and it should fix the problems we have seen with archive nodes.
crates:
- name: sc-client-db
bump: patch
validate: false
- name: sp-database
bump: patch
validate: false
4 changes: 2 additions & 2 deletions substrate/client/db/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ codec = { features = ["derive"], workspace = true, default-features = true }
hash-db = { workspace = true, default-features = true }
kvdb = { workspace = true }
kvdb-memorydb = { workspace = true }
kvdb-rocksdb = { optional = true, workspace = true }
kvdb-rocksdb = { optional = true, workspace = true, features = ["jemalloc"] }
linked-hash-map = { workspace = true }
log = { workspace = true, default-features = true }
parity-db = { workspace = true }
Expand Down Expand Up @@ -62,4 +62,4 @@ runtime-benchmarks = [
"kitchensink-runtime/runtime-benchmarks",
"sp-runtime/runtime-benchmarks",
]
rocksdb = ["kvdb-rocksdb"]
rocksdb = ["kvdb-rocksdb", "sp-database/rocksdb"]
11 changes: 11 additions & 0 deletions substrate/client/db/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -839,6 +839,7 @@ pub struct BlockImportOperation<Block: BlockT> {
set_head: Option<Block::Hash>,
commit_state: bool,
create_gap: bool,
reset_storage: bool,
index_ops: Vec<IndexOperation>,
}

Expand Down Expand Up @@ -934,6 +935,7 @@ impl<Block: BlockT> sc_client_api::backend::BlockImportOperation<Block>
) -> ClientResult<Block::Hash> {
let root = self.apply_new_state(storage, state_version)?;
self.commit_state = true;
self.reset_storage = true;
Ok(root)
}

Expand Down Expand Up @@ -1841,6 +1843,14 @@ impl<Block: BlockT> Backend<Block> {

self.storage.db.commit(transaction)?;

// `reset_storage == true` means the entire state got replaced.
// In this case we optimize the `STATE` column to improve read performance.
if operation.reset_storage {
if let Err(e) = self.storage.db.optimize_db_col(columns::STATE) {
warn!(target: "db", "Failed to optimize database after state import: {e:?}");
}
}

// Apply all in-memory state changes.
// Code beyond this point can't fail.

Expand Down Expand Up @@ -2152,6 +2162,7 @@ impl<Block: BlockT> sc_client_api::backend::Backend<Block> for Backend<Block> {
set_head: None,
commit_state: false,
create_gap: true,
reset_storage: false,
index_ops: Default::default(),
})
}
Expand Down
2 changes: 1 addition & 1 deletion substrate/client/db/src/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -349,7 +349,7 @@ fn open_kvdb_rocksdb<Block: BlockT>(
let db = kvdb_rocksdb::Database::open(&db_config, path)?;
// write database version only after the database is successfully opened
crate::upgrade::update_version(path)?;
Ok(sp_database::as_database(db))
Ok(sp_database::as_rocksdb_database(db))
}

#[cfg(not(any(feature = "rocksdb", test)))]
Expand Down
5 changes: 5 additions & 0 deletions substrate/primitives/database/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -15,4 +15,9 @@ workspace = true

[dependencies]
kvdb = { workspace = true }
kvdb-rocksdb = { optional = true, workspace = true }
parking_lot = { workspace = true, default-features = true }

[features]
default = []
rocksdb = ["kvdb-rocksdb"]
163 changes: 100 additions & 63 deletions substrate/primitives/database/src/kvdb.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,75 @@ fn handle_err<T>(result: std::io::Result<T>) -> T {
}
}

/// Wrap RocksDb database into a trait object that implements `sp_database::Database`
/// Read the reference counter for a key.
fn read_counter(
db: &dyn KeyValueDB,
col: ColumnId,
key: &[u8],
) -> error::Result<(Vec<u8>, Option<u32>)> {
let mut counter_key = key.to_vec();
counter_key.push(0);
Ok(match db.get(col, &counter_key).map_err(|e| error::DatabaseError(Box::new(e)))? {
Some(data) => {
let mut counter_data = [0; 4];
if data.len() != 4 {
return Err(error::DatabaseError(Box::new(std::io::Error::new(
std::io::ErrorKind::Other,
format!("Unexpected counter len {}", data.len()),
))))
}
counter_data.copy_from_slice(&data);
let counter = u32::from_le_bytes(counter_data);
(counter_key, Some(counter))
},
None => (counter_key, None),
})
}

/// Commit a transaction to a KeyValueDB.
///
/// Translates the `sp_database` change list into one `DBTransaction` and
/// writes it atomically at the end. `Store`/`Reference`/`Release` maintain a
/// little-endian `u32` reference counter kept under `key ++ [0]` (see
/// `read_counter`): the value itself is written only on the first `Store`
/// and deleted only when the counter drops back to zero.
fn commit_impl<H: Clone + AsRef<[u8]>>(
db: &dyn KeyValueDB,
transaction: Transaction<H>,
) -> error::Result<()> {
let mut tx = DBTransaction::new();
for change in transaction.0.into_iter() {
match change {
// Plain set/remove — no reference counting involved.
Change::Set(col, key, value) => tx.put_vec(col, &key, value),
Change::Remove(col, key) => tx.delete(col, &key),
Change::Store(col, key, value) => match read_counter(db, col, key.as_ref())? {
// Counter exists: the value is already stored, only bump the refcount.
(counter_key, Some(mut counter)) => {
counter += 1;
tx.put(col, &counter_key, &counter.to_le_bytes());
},
// First store: write the value and initialize the counter to 1.
(counter_key, None) => {
let d = 1u32.to_le_bytes();
tx.put(col, &counter_key, &d);
tx.put_vec(col, key.as_ref(), value);
},
},
Change::Reference(col, key) => {
// Referencing a key without a counter is silently ignored.
if let (counter_key, Some(mut counter)) = read_counter(db, col, key.as_ref())? {
counter += 1;
tx.put(col, &counter_key, &counter.to_le_bytes());
}
},
Change::Release(col, key) => {
// NOTE(review): assumes an existing counter is always >= 1 (a zero
// counter is deleted below), so this subtraction cannot underflow.
if let (counter_key, Some(mut counter)) = read_counter(db, col, key.as_ref())? {
counter -= 1;
if counter == 0 {
// Last reference dropped: remove both the value and its counter.
tx.delete(col, &counter_key);
tx.delete(col, key.as_ref());
} else {
tx.put(col, &counter_key, &counter.to_le_bytes());
}
}
},
}
}
// Single atomic write of the accumulated batch.
db.write(tx).map_err(|e| error::DatabaseError(Box::new(e)))
}

/// Wrap generic kvdb-based database into a trait object that implements [`Database`].
pub fn as_database<D, H>(db: D) -> std::sync::Arc<dyn Database<H>>
where
D: KeyValueDB + 'static,
Expand All @@ -40,72 +108,28 @@ where
std::sync::Arc::new(DbAdapter(db))
}

impl<D: KeyValueDB> DbAdapter<D> {
// Returns counter key and counter value if it exists.
fn read_counter(&self, col: ColumnId, key: &[u8]) -> error::Result<(Vec<u8>, Option<u32>)> {
// Add a key suffix for the counter
let mut counter_key = key.to_vec();
counter_key.push(0);
Ok(match self.0.get(col, &counter_key).map_err(|e| error::DatabaseError(Box::new(e)))? {
Some(data) => {
let mut counter_data = [0; 4];
if data.len() != 4 {
return Err(error::DatabaseError(Box::new(std::io::Error::new(
std::io::ErrorKind::Other,
format!("Unexpected counter len {}", data.len()),
))))
}
counter_data.copy_from_slice(&data);
let counter = u32::from_le_bytes(counter_data);
(counter_key, Some(counter))
},
None => (counter_key, None),
})
impl<D: KeyValueDB, H: Clone + AsRef<[u8]>> Database<H> for DbAdapter<D> {
// Delegate to the shared kvdb commit path, which handles the
// refcounted `Store`/`Reference`/`Release` changes (see `commit_impl`).
fn commit(&self, transaction: Transaction<H>) -> error::Result<()> {
commit_impl(&self.0, transaction)
}

// Plain point lookup; I/O errors are treated as fatal by `handle_err`.
fn get(&self, col: ColumnId, key: &[u8]) -> Option<Vec<u8>> {
handle_err(self.0.get(col, key))
}

// Existence check without fetching the value.
fn contains(&self, col: ColumnId, key: &[u8]) -> bool {
handle_err(self.0.has_key(col, key))
}
}

impl<D: KeyValueDB, H: Clone + AsRef<[u8]>> Database<H> for DbAdapter<D> {
/// RocksDB-specific [`Database`] adapter.
///
/// Unlike the generic `DbAdapter`, it implements `optimize_db_col` by
/// triggering a RocksDB `force_compact` on the requested column.
#[cfg(feature = "rocksdb")]
pub struct RocksDbAdapter(kvdb_rocksdb::Database);

/// [`Database`] implementation for the RocksDB adapter.
///
/// `commit`/`get`/`contains` delegate to the same shared kvdb paths as the
/// generic `DbAdapter`; additionally `optimize_db_col` is implemented via
/// RocksDB column compaction.
#[cfg(feature = "rocksdb")]
impl<H: Clone + AsRef<[u8]>> Database<H> for RocksDbAdapter {
	// Shared commit path handling refcounted changes (see `commit_impl`).
	fn commit(&self, transaction: Transaction<H>) -> error::Result<()> {
		commit_impl(&self.0, transaction)
	}

	fn get(&self, col: ColumnId, key: &[u8]) -> Option<Vec<u8>> {
		handle_err(self.0.get(col, key))
	}

	fn contains(&self, col: ColumnId, key: &[u8]) -> bool {
		handle_err(self.0.has_key(col, key))
	}

	// Force-compact the column; used e.g. after warp sync replaced the
	// entire state to improve subsequent read performance.
	fn optimize_db_col(&self, col: ColumnId) -> error::Result<()> {
		self.0.force_compact(col).map_err(|e| error::DatabaseError(Box::new(e)))
	}
}

/// Wrap a [`kvdb_rocksdb::Database`] into a [`Database`] trait object whose
/// `optimize_db_col` triggers RocksDB column compaction.
#[cfg(feature = "rocksdb")]
pub fn as_rocksdb_database<H: Clone + AsRef<[u8]>>(
	db: kvdb_rocksdb::Database,
) -> std::sync::Arc<dyn Database<H>> {
	std::sync::Arc::new(RocksDbAdapter(db))
}
7 changes: 7 additions & 0 deletions substrate/primitives/database/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@ mod kvdb;
mod mem;

pub use crate::kvdb::as_database;
#[cfg(feature = "rocksdb")]
pub use crate::kvdb::as_rocksdb_database;
pub use mem::MemDb;

/// An identifier for a column.
Expand Down Expand Up @@ -117,6 +119,11 @@ pub trait Database<H: Clone + AsRef<[u8]>>: Send + Sync {
///
/// Not all database implementations use a prefix for keys, so this function may be a noop.
fn sanitize_key(&self, _key: &mut Vec<u8>) {}

/// Optimize a database column.
///
/// The default implementation is a no-op; backends that support it (e.g. the
/// RocksDB adapter, which force-compacts the column) may override this to
/// improve read performance after bulk writes.
fn optimize_db_col(&self, _col: ColumnId) -> error::Result<()> {
Ok(())
}
}

impl<H> std::fmt::Debug for dyn Database<H> {
Expand Down
Loading