Merged
Changes from all commits
77 commits
e38dad3
Moving merkle.rs to merkle directory
bernard-avalabs Aug 14, 2025
03d5ad1
chore: Moved merkle.rs to mod.rs in merkle directory
bernard-avalabs Aug 14, 2025
a0aeb19
Merge branch 'bernard/insert-worker-pool-rayon' of github.com:ava-lab…
bernard-avalabs Aug 14, 2025
afae01e
Simple version where all operations are offloaded to a single
bernard-avalabs Aug 19, 2025
c9e7651
Basic handoff to offload thread appears to be working.
bernard-avalabs Aug 19, 2025
bd877e6
More testing
bernard-avalabs Aug 19, 2025
7a22038
Working on prepare phase. For the case where the trie is empty,
bernard-avalabs Aug 20, 2025
c3d4100
Working on case for non-empty trie and a non-empty partial path in t…
bernard-avalabs Aug 20, 2025
c2445bc
Implemented all of the prepare phase cases. Appears to be working.
bernard-avalabs Aug 20, 2025
894033c
Added comments detailing steps to fix a temporarily malformed
bernard-avalabs Aug 20, 2025
8fb6da5
Initial code for undoing transform
bernard-avalabs Aug 28, 2025
bff76c1
Minor cleanup
bernard-avalabs Aug 29, 2025
7ef3fe2
Additional comments.
bernard-avalabs Aug 29, 2025
31129e6
Added some nodestore related changes. Fix possible error with the use of
bernard-avalabs Aug 29, 2025
efce980
Extra debug
bernard-avalabs Aug 29, 2025
6628617
Appears to work with an added insert path (needs to be refactored).
bernard-avalabs Sep 1, 2025
954a02d
Added Delete and DeleteRange
bernard-avalabs Sep 1, 2025
bd1e7c2
Cleanup
bernard-avalabs Sep 1, 2025
fcda015
Added code to handle most of the cases where the key is empty.
bernard-avalabs Sep 2, 2025
364f8d7
Improved comments around handling empty key.
bernard-avalabs Sep 2, 2025
b12ac8b
Added deleted from child nodestores to the proposal.
bernard-avalabs Sep 2, 2025
e2fde1a
Moved threadpool to DB.
bernard-avalabs Sep 2, 2025
2cbf542
Support case where trie only has a single leaf with an empty key.
bernard-avalabs Sep 2, 2025
9c4ac97
Added some error handling
bernard-avalabs Sep 2, 2025
660f0f8
Reorganizing code
bernard-avalabs Sep 2, 2025
eb2d0dd
Code cleanup
bernard-avalabs Sep 2, 2025
231c264
More error handling.
bernard-avalabs Sep 3, 2025
5fff34d
More cleanly handled the remove empty prefix case
bernard-avalabs Sep 3, 2025
ee9a488
Small fix to also delete the root value on a remove all.
bernard-avalabs Sep 3, 2025
e5bf520
Clean up. Fixed test.
bernard-avalabs Sep 3, 2025
e5a5757
Removed OnceLock since it is not necessary
bernard-avalabs Sep 3, 2025
b670403
Cleanup with additional comments.
bernard-avalabs Sep 3, 2025
e816dad
Added test cases
bernard-avalabs Sep 3, 2025
e6d953e
More cleanup
bernard-avalabs Sep 3, 2025
b3b8c3d
Merge remote-tracking branch 'origin/main' into bernard/insert-worker…
bernard-avalabs Sep 4, 2025
2bfaf34
Cleanup for PR
bernard-avalabs Sep 4, 2025
2c2f3b5
Replaced Option<ThreadPool> with OnceLock<ThreadPool> so propose_para…
bernard-avalabs Sep 4, 2025
57717d9
Addressing most of the PR comments.
bernard-avalabs Sep 10, 2025
320c178
Moved threadpool to RevisionManager.
bernard-avalabs Sep 10, 2025
cfe983e
Made the initial change to the insert/remove/remove_prefix interface …
bernard-avalabs Sep 11, 2025
301243c
Moved take child code from parallel.rs to branch.rs
bernard-avalabs Sep 11, 2025
5effd6e
Added a TODO to describe a possible optimization using scoped threads.
bernard-avalabs Sep 11, 2025
7dd784c
Merge remote-tracking branch 'origin/main' into bernard/insert-worker…
bernard-avalabs Sep 11, 2025
1850186
Merge branch 'main' into bernard/insert-worker-pool-rayon-part2-merged
bernard-avalabs Sep 25, 2025
24be026
Updated rayon
bernard-avalabs Sep 25, 2025
f2c464b
Updating CI
bernard-avalabs Sep 25, 2025
f11bd53
Merge branch 'main' into bernard/insert-worker-pool-rayon-part2-merged
bernard-avalabs Sep 26, 2025
21dec41
Removed extra "with" entries in ci.yaml
bernard-avalabs Sep 26, 2025
4cc16b1
Merge branch 'main' into bernard/insert-worker-pool-rayon-part2-merged
bernard-avalabs Sep 26, 2025
6645609
Merge branch 'main' into bernard/insert-worker-pool-rayon-part2-merged
bernard-avalabs Sep 26, 2025
30093de
Separate response channel into sender/receiver
bernard-avalabs Sep 29, 2025
1e98c9a
Merge branch 'main' into bernard/insert-worker-pool-rayon-part2-merged
bernard-avalabs Oct 6, 2025
d01ca7b
Updating cargo.lock
bernard-avalabs Oct 6, 2025
82876c2
Addressing review feedback.
bernard-avalabs Oct 13, 2025
ea9d40f
Undoing some testcode
bernard-avalabs Oct 13, 2025
93cb00c
Addressing feedback.
bernard-avalabs Oct 14, 2025
3729f9c
Merge branch 'main' into bernard/insert-worker-pool-rayon-part2-merged
bernard-avalabs Oct 14, 2025
4f2ac15
Merge fix
bernard-avalabs Oct 14, 2025
95e38db
Added comment
bernard-avalabs Oct 14, 2025
8a59fd1
Address review feedback
bernard-avalabs Oct 15, 2025
0a7e06f
Removed unneeded comment
bernard-avalabs Oct 15, 2025
bbb8158
Cleanup
bernard-avalabs Oct 15, 2025
09206b7
Addressing feedback
bernard-avalabs Oct 15, 2025
dc66b0c
More changes from feedback
bernard-avalabs Oct 16, 2025
1699e16
Merge remote-tracking branch 'origin/main' into bernard/insert-worker…
bernard-avalabs Oct 16, 2025
f79c86e
Changed workers to use Children instead of an array. This allowed the…
bernard-avalabs Oct 16, 2025
002b7aa
Formatting
bernard-avalabs Oct 16, 2025
a3ef44b
More error cleanup for index to path component conversion
bernard-avalabs Oct 16, 2025
d74a1ef
Small change from a if let Some() to an ok_or.
bernard-avalabs Oct 16, 2025
3d7515c
Fixing comment
bernard-avalabs Oct 16, 2025
d63c842
Slight refactor based on discussion with Ron
bernard-avalabs Oct 16, 2025
1fa52fa
Addressed feedback
bernard-avalabs Oct 16, 2025
d765f7a
Fixed formatting
bernard-avalabs Oct 16, 2025
0843e00
Added to the comment explaining why there may be successful responses
bernard-avalabs Oct 16, 2025
bb0f160
Address comments
bernard-avalabs Oct 17, 2025
50e26bf
Formatting
bernard-avalabs Oct 17, 2025
7c02dfe
an u8 -> a u8
bernard-avalabs Oct 17, 2025
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default.

1 change: 1 addition & 0 deletions firewood/Cargo.toml
@@ -35,6 +35,7 @@ metrics.workspace = true
thiserror.workspace = true
# Regular dependencies
typed-builder = "0.22.0"
rayon = "1.11.0"

[features]
default = []
180 changes: 180 additions & 0 deletions firewood/src/db.rs
@@ -27,6 +27,8 @@ use std::sync::Arc;
use thiserror::Error;
use typed_builder::TypedBuilder;

use crate::merkle::parallel::ParallelMerkle;

#[derive(Error, Debug)]
#[non_exhaustive]
/// Represents the different types of errors that can occur in the database.
@@ -200,6 +202,26 @@ impl Db {
self.manager.view(root_hash).map_err(Into::into)
}

/// Propose a new batch that is processed in parallel.
///
/// # Panics
///
/// Panics if the revision manager cannot create a thread pool.
pub fn propose_parallel(
&self,
batch: impl IntoIterator<IntoIter: KeyValuePairIter>,
) -> Result<Proposal<'_>, api::Error> {
let parent = self.manager.current_revision();
let mut parallel_merkle = ParallelMerkle::default();
let immutable =
parallel_merkle.create_proposal(&parent, batch, self.manager.threadpool())?;
self.manager.add_proposal(immutable.clone());
Ok(Proposal {
nodestore: immutable,
db: self,
})
}

/// Dump the Trie of the latest revision.
pub fn dump(&self, w: &mut dyn Write) -> Result<(), std::io::Error> {
let latest_rev_nodestore = self.manager.current_revision();
@@ -555,6 +577,164 @@ mod test {
assert_eq!(&*value, b"proposal_value");
}

#[test]
fn test_propose_parallel_reopen() {
fn insert_commit(db: &TestDb, kv: u8) {
let keys: Vec<[u8; 1]> = vec![[kv; 1]];
let vals: Vec<Box<[u8]>> = vec![Box::new([kv; 1])];
let kviter = keys.iter().zip(vals.iter()).map_into_batch();
let proposal = db.propose_parallel(kviter).unwrap();
proposal.commit().unwrap();
}

// Create, insert, close, open, insert
let db = TestDb::new();
insert_commit(&db, 1);
let db = db.reopen();
insert_commit(&db, 2);
// Check that the keys are still there after the commits
let committed = db.revision(db.root_hash().unwrap().unwrap()).unwrap();
let keys: Vec<[u8; 1]> = vec![[1; 1], [2; 1]];
let vals: Vec<Box<[u8]>> = vec![Box::new([1; 1]), Box::new([2; 1])];
let kviter = keys.iter().zip(vals.iter());
for (k, v) in kviter {
assert_eq!(&committed.val(k).unwrap().unwrap(), v);
}
drop(db);

// Open-db1, insert, open-db2, insert
let db1 = TestDb::new();
insert_commit(&db1, 1);
let db2 = TestDb::new();
insert_commit(&db2, 2);
let committed1 = db1.revision(db1.root_hash().unwrap().unwrap()).unwrap();
let committed2 = db2.revision(db2.root_hash().unwrap().unwrap()).unwrap();
let keys: Vec<[u8; 1]> = vec![[1; 1], [2; 1]];
let vals: Vec<Box<[u8]>> = vec![Box::new([1; 1]), Box::new([2; 1])];
let mut kviter = keys.iter().zip(vals.iter());
let (k, v) = kviter.next().unwrap();
assert_eq!(&committed1.val(k).unwrap().unwrap(), v);
let (k, v) = kviter.next().unwrap();
assert_eq!(&committed2.val(k).unwrap().unwrap(), v);
}

#[test]
fn test_propose_parallel() {
const N: usize = 100;
let db = TestDb::new();

// Test an empty proposal
let keys: Vec<[u8; 0]> = Vec::new();
let vals: Vec<Box<[u8]>> = Vec::new();

let kviter = keys.iter().zip(vals.iter()).map_into_batch();
let proposal = db.propose_parallel(kviter).unwrap();
proposal.commit().unwrap();

// Create a proposal consisting of a single entry and an empty key.
let keys: Vec<[u8; 0]> = vec![[0; 0]];

// Note that if the value is [], then it is interpreted as a DeleteRange.
// Instead, set value to [0]
let vals: Vec<Box<[u8]>> = vec![Box::new([0; 1])];

let kviter = keys.iter().zip(vals.iter()).map_into_batch();
let proposal = db.propose_parallel(kviter).unwrap();

let kviter = keys.iter().zip(vals.iter());
for (k, v) in kviter {
assert_eq!(&proposal.val(k).unwrap().unwrap(), v);
}
proposal.commit().unwrap();

// Check that the key is still there after the commit
let committed = db.revision(db.root_hash().unwrap().unwrap()).unwrap();
let kviter = keys.iter().zip(vals.iter());
for (k, v) in kviter {
assert_eq!(&committed.val(k).unwrap().unwrap(), v);
}

// Create a proposal that deletes the previous entry
let vals: Vec<Box<[u8]>> = vec![Box::new([0; 0])];
let kviter = keys.iter().zip(vals.iter()).map_into_batch();
let proposal = db.propose_parallel(kviter).unwrap();

let kviter = keys.iter().zip(vals.iter());
for (k, _v) in kviter {
assert_eq!(proposal.val(k).unwrap(), None);
}
proposal.commit().unwrap();

// Create a proposal that inserts 0 to 999
let (keys, vals): (Vec<_>, Vec<_>) = (0..1000)
.map(|i| {
(
format!("key{i}").into_bytes(),
Box::from(format!("value{i}").as_bytes()),
)
})
.unzip();

let kviter = keys.iter().zip(vals.iter()).map_into_batch();
let proposal = db.propose_parallel(kviter).unwrap();
let kviter = keys.iter().zip(vals.iter());
for (k, v) in kviter {
assert_eq!(&proposal.val(k).unwrap().unwrap(), v);
}
proposal.commit().unwrap();

// Create a proposal that deletes all of the odd-numbered entries
let (keys, vals): (Vec<_>, Vec<_>) = (0..1000)
.filter_map(|i| {
if i % 2 != 0 {
Some::<(Vec<u8>, Box<[u8]>)>((format!("key{i}").into_bytes(), Box::new([])))
} else {
None
}
})
.unzip();

let kviter = keys.iter().zip(vals.iter()).map_into_batch();
let proposal = db.propose_parallel(kviter).unwrap();
let kviter = keys.iter().zip(vals.iter());
for (k, _v) in kviter {
assert_eq!(proposal.val(k).unwrap(), None);
}
proposal.commit().unwrap();

// Create a proposal that deletes using empty prefix
let keys: Vec<[u8; 0]> = vec![[0; 0]];
let vals: Vec<Box<[u8]>> = vec![Box::new([0; 0])];
let kviter = keys.iter().zip(vals.iter()).map_into_batch();
let proposal = db.propose_parallel(kviter).unwrap();
proposal.commit().unwrap();

// Create N random 32-byte keys, each paired with a value like value0..value{N-1}
let rng = firewood_storage::SeededRng::from_env_or_random();
let (keys, vals): (Vec<_>, Vec<_>) = (0..N)
.map(|i| {
(
rng.random::<[u8; 32]>(),
Box::from(format!("value{i}").as_bytes()),
)
})
.unzip();

// Looping twice to test that we are reusing the thread pool.
for _ in 0..2 {
let kviter = keys.iter().zip(vals.iter()).map_into_batch();
let proposal = db.propose_parallel(kviter).unwrap();

// iterate over the keys and values again, checking that the values are in the correct proposal
let kviter = keys.iter().zip(vals.iter());

for (k, v) in kviter {
assert_eq!(&proposal.val(k).unwrap().unwrap(), v);
}
proposal.commit().unwrap();
}
}

/// Test that proposing on a proposal works as expected
///
/// Test creates two batches and proposes them, and verifies that the values are in the correct proposal.
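The tests above exercise the new entry point end to end. As a condensed reference, here is the intended call sequence: propose in parallel, read values back through the proposal, commit, then read from the committed revision. This is a sketch only, reusing the `TestDb` harness and `map_into_batch` helper from the test module above:

```rust
fn parallel_roundtrip_sketch() {
    let db = TestDb::new();

    // Build a small batch of key/value pairs.
    let keys: Vec<Vec<u8>> = (0..4u8).map(|i| vec![i]).collect();
    let vals: Vec<Box<[u8]>> = (0..4u8).map(|i| Box::from([i].as_slice())).collect();

    // Propose in parallel; values are readable from the proposal before commit.
    let proposal = db
        .propose_parallel(keys.iter().zip(vals.iter()).map_into_batch())
        .unwrap();
    for (k, v) in keys.iter().zip(vals.iter()) {
        assert_eq!(&proposal.val(k).unwrap().unwrap(), v);
    }

    // Committing makes the batch visible in the latest revision.
    proposal.commit().unwrap();
    let committed = db.revision(db.root_hash().unwrap().unwrap()).unwrap();
    for (k, v) in keys.iter().zip(vals.iter()) {
        assert_eq!(&committed.val(k).unwrap().unwrap(), v);
    }
}
```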
25 changes: 23 additions & 2 deletions firewood/src/manager.rs
@@ -13,18 +13,20 @@
use std::collections::{HashMap, VecDeque};
use std::num::NonZero;
use std::path::PathBuf;
use std::sync::{Arc, Mutex, RwLock};
use std::sync::{Arc, Mutex, OnceLock, RwLock};

use firewood_storage::logger::{trace, warn};
use metrics::gauge;
use rayon::{ThreadPool, ThreadPoolBuilder};
use typed_builder::TypedBuilder;

use crate::merkle::Merkle;
use crate::v2::api::{ArcDynDbView, HashKey, OptionalHashKeyExt};

pub use firewood_storage::CacheReadStrategy;
use firewood_storage::{
Committed, FileBacked, FileIoError, HashedNodeReader, ImmutableProposal, NodeStore, TrieHash,
BranchNode, Committed, FileBacked, FileIoError, HashedNodeReader, ImmutableProposal, NodeStore,
TrieHash,
};

#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash, TypedBuilder)]
@@ -75,6 +77,7 @@ pub(crate) struct RevisionManager {
proposals: Mutex<Vec<ProposedRevision>>,
// committing_proposals: VecDeque<Arc<ProposedImmutable>>,
by_hash: RwLock<HashMap<TrieHash, CommittedRevision>>,
threadpool: OnceLock<ThreadPool>,
}

#[derive(Debug, thiserror::Error)]
@@ -115,6 +118,7 @@ impl RevisionManager {
by_hash: RwLock::new(Default::default()),
proposals: Mutex::new(Default::default()),
// committing_proposals: Default::default(),
threadpool: OnceLock::new(),
};

if let Some(hash) = nodestore.root_hash().or_default_root_hash() {
@@ -309,6 +313,23 @@ impl RevisionManager {
.expect("there is always one revision")
.clone()
}

/// Gets or creates a threadpool associated with the revision manager.
///
/// # Panics
///
/// Panics if it cannot create a thread pool.
pub fn threadpool(&self) -> &ThreadPool {
// Note that OnceLock does not yet support get_or_try_init (it is only
// available on nightly). Once it stabilizes, this get_or_init should be
// replaced with get_or_try_init so the error can be passed back to the caller.
self.threadpool.get_or_init(|| {
ThreadPoolBuilder::new()
.num_threads(BranchNode::MAX_CHILDREN)
.build()
.expect("Error in creating threadpool")
})
}
}

#[cfg(test)]
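A side note on the `OnceLock` comment in `threadpool` above: once `get_or_try_init` stabilizes, the builder error could propagate to the caller instead of panicking. A hypothetical sketch, not part of this PR, requiring the nightly-only `once_cell_try` feature:

```rust
// Hypothetical replacement for threadpool(); compiles only on nightly with
// #![feature(once_cell_try)]. Returning Result lets the caller handle
// pool-creation failure instead of panicking.
pub fn threadpool(&self) -> Result<&ThreadPool, rayon::ThreadPoolBuildError> {
    self.threadpool.get_or_try_init(|| {
        ThreadPoolBuilder::new()
            .num_threads(BranchNode::MAX_CHILDREN)
            .build()
    })
}
```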
36 changes: 32 additions & 4 deletions firewood/src/merkle.rs → firewood/src/merkle/mod.rs
@@ -4,6 +4,9 @@
#[cfg(test)]
pub(crate) mod tests;

/// Parallel merkle
pub mod parallel;

use crate::iter::{MerkleKeyValueIter, PathIterator, TryExtend};
use crate::proof::{Proof, ProofCollection, ProofError, ProofNode};
use crate::range_proof::RangeProof;
@@ -593,10 +596,17 @@ impl<S: ReadableStorage> Merkle<NodeStore<MutableProposal, S>> {
/// Map `key` to `value` in the trie.
/// Each element of key is 2 nibbles.
pub fn insert(&mut self, key: &[u8], value: Value) -> Result<(), FileIoError> {
let key = Path::from_nibbles_iterator(NibblesIterator::new(key));
self.insert_from_iter(NibblesIterator::new(key), value)
}

/// Map `key` to `value` in the trie when `key` is a `NibblesIterator`.
pub fn insert_from_iter(
&mut self,
key: NibblesIterator<'_>,
value: Value,
) -> Result<(), FileIoError> {
let key = Path::from_nibbles_iterator(key);
let root = self.nodestore.root_mut();

let Some(root_node) = std::mem::take(root) else {
// The trie is empty. Create a new leaf node with `value` and set
// it as the root.
@@ -751,8 +761,18 @@ impl<S: ReadableStorage> Merkle<NodeStore<MutableProposal, S>> {
/// Otherwise returns `None`.
/// Each element of `key` is 2 nibbles.
pub fn remove(&mut self, key: &[u8]) -> Result<Option<Value>, FileIoError> {
let key = Path::from_nibbles_iterator(NibblesIterator::new(key));
self.remove_from_iter(NibblesIterator::new(key))
}

/// Removes the value associated with the given `key`, where `key` is a `NibblesIterator`.
/// Returns the value that was removed, if any.
/// Otherwise returns `None`.
/// Each element of `key` is 2 nibbles.
pub fn remove_from_iter(
&mut self,
key: NibblesIterator<'_>,
) -> Result<Option<Value>, FileIoError> {
let key = Path::from_nibbles_iterator(key);
let root = self.nodestore.root_mut();
let Some(root_node) = std::mem::take(root) else {
// The trie is empty. There is nothing to remove.
@@ -842,8 +862,16 @@ impl<S: ReadableStorage> Merkle<NodeStore<MutableProposal, S>> {
/// Removes any key-value pairs with keys that have the given `prefix`.
/// Returns the number of key-value pairs removed.
pub fn remove_prefix(&mut self, prefix: &[u8]) -> Result<usize, FileIoError> {
let prefix = Path::from_nibbles_iterator(NibblesIterator::new(prefix));
self.remove_prefix_from_iter(NibblesIterator::new(prefix))
}

/// Removes any key-value pairs with keys that have the given `prefix`, where `prefix` is a `NibblesIterator`.
/// Returns the number of key-value pairs removed.
pub fn remove_prefix_from_iter(
&mut self,
prefix: NibblesIterator<'_>,
) -> Result<usize, FileIoError> {
let prefix = Path::from_nibbles_iterator(prefix);
let root = self.nodestore.root_mut();
let Some(root_node) = std::mem::take(root) else {
// The trie is empty. There is nothing to remove.
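The `*_from_iter` variants added in this file make the byte-slice methods thin wrappers, so a caller that already holds a `NibblesIterator` (as the code in `parallel.rs` presumably does after splitting keys by their leading nibble) can skip re-deriving it from bytes. A minimal equivalence sketch, assuming `Value` is the boxed byte type used in this module and that `NibblesIterator` is in scope as it is above:

```rust
fn from_iter_sketch<S: ReadableStorage>(
    merkle: &mut Merkle<NodeStore<MutableProposal, S>>,
) -> Result<(), FileIoError> {
    // Equivalent by construction: `insert` now builds the NibblesIterator
    // itself and delegates to `insert_from_iter`.
    merkle.insert(b"key", Box::from(&b"value"[..]))?;
    merkle.insert_from_iter(NibblesIterator::new(b"key2"), Box::from(&b"value2"[..]))?;

    // The same wrapper pattern holds for removal...
    let _removed = merkle.remove(b"key")?;
    let _removed = merkle.remove_from_iter(NibblesIterator::new(b"key2"))?;

    // ...and for prefix removal, which reports how many pairs were deleted.
    let _count = merkle.remove_prefix_from_iter(NibblesIterator::new(b"key"))?;
    Ok(())
}
```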