diff --git a/bin/ream/src/main.rs b/bin/ream/src/main.rs index 22e72f79c..1b265f147 100644 --- a/bin/ream/src/main.rs +++ b/bin/ream/src/main.rs @@ -66,6 +66,7 @@ use ream_post_quantum_crypto::leansig::{ }; use ream_rpc_common::config::RpcServerConfig; use ream_storage::{ + cache::{BeaconCacheDB, LeanCacheDB}, db::{ReamDB, reset_db}, dir::setup_data_dir, tables::table::REDBTable, @@ -190,9 +191,11 @@ pub async fn run_lean_node(config: LeanNodeConfig, executor: ReamExecutor, ream_ set_lean_network_spec(Arc::new(network)); // Initialize the lean database + let cache = Arc::new(LeanCacheDB::new()); let lean_db = ream_db .init_lean_db() - .expect("unable to init Ream Lean Database"); + .expect("unable to init Ream Lean Database") + .with_cache(cache); info!("ream lean database has been initialized"); @@ -320,9 +323,11 @@ pub async fn run_beacon_node(config: BeaconNodeConfig, executor: ReamExecutor, r set_beacon_network_spec(config.network.clone()); // Initialize the beacon database + let cache = Arc::new(BeaconCacheDB::new()); let beacon_db = ream_db .init_beacon_db() - .expect("unable to init Ream Beacon Database"); + .expect("unable to init Ream Beacon Database") + .with_cache(cache.clone()); info!("ream beacon database has been initialized"); @@ -396,7 +401,6 @@ pub async fn run_beacon_node(config: BeaconNodeConfig, executor: ReamExecutor, r execution_engine.clone(), Some(event_sender.clone()), )); - let cached_db = Arc::new(ream_storage::cache::CachedDB::new()); // Create network manager let network_manager = NetworkManagerService::new( @@ -405,7 +409,7 @@ pub async fn run_beacon_node(config: BeaconNodeConfig, executor: ReamExecutor, r beacon_db.clone(), beacon_db.data_dir.clone(), beacon_chain.clone(), - cached_db.clone(), + cache.clone(), ) .await .expect("Failed to create manager service"); @@ -428,7 +432,7 @@ pub async fn run_beacon_node(config: BeaconNodeConfig, executor: ReamExecutor, r event_sender, beacon_chain, p2p_sender, - cached_db, + cache, ) .await }); diff --git a/crates/networking/manager/src/gossipsub/handle.rs b/crates/networking/manager/src/gossipsub/handle.rs index 7ef851717..0f5ca8b5c 100644 --- a/crates/networking/manager/src/gossipsub/handle.rs +++ b/crates/networking/manager/src/gossipsub/handle.rs @@ -15,7 +15,7 @@ use ream_p2p::{ }, network::beacon::channel::GossipMessage, }; -use ream_storage::{cache::CachedDB, tables::table::CustomTable}; +use ream_storage::{cache::BeaconCacheDB, tables::table::CustomTable}; use ream_validator_beacon::{ blob_sidecars::compute_subnet_for_blob_sidecar, constants::SYNC_COMMITTEE_SUBNET_COUNT, }; @@ -122,7 +122,7 @@ pub fn init_gossipsub_config_with_topics() -> GossipsubConfig { pub async fn handle_gossipsub_message( message: Message, beacon_chain: &BeaconChain, - cached_db: &CachedDB, + cached_db: &BeaconCacheDB, p2p_sender: &P2PSender, ) { match GossipsubMessage::decode(&message.topic, &message.data) { diff --git a/crates/networking/manager/src/gossipsub/validate/aggregate_and_proof.rs b/crates/networking/manager/src/gossipsub/validate/aggregate_and_proof.rs index 679b7339c..b2368c39a 100644 --- a/crates/networking/manager/src/gossipsub/validate/aggregate_and_proof.rs +++ b/crates/networking/manager/src/gossipsub/validate/aggregate_and_proof.rs @@ -10,7 +10,7 @@ use ream_consensus_misc::{ }; use ream_network_spec::networks::beacon_network_spec; use ream_storage::{ - cache::{AggregateAndProofKey, CachedDB}, + cache::{AggregateAndProofKey, BeaconCacheDB}, tables::table::REDBTable, }; use ream_validator_beacon::{ @@ -23,7 +23,7 @@ use super::result::ValidationResult; pub async fn validate_aggregate_and_proof( signed_aggregate_and_proof: &SignedAggregateAndProof, beacon_chain: &BeaconChain, - cached_db: &CachedDB, + cached_db: &BeaconCacheDB, ) -> anyhow::Result { let store = beacon_chain.store.lock().await; diff --git a/crates/networking/manager/src/gossipsub/validate/attester_slashing.rs b/crates/networking/manager/src/gossipsub/validate/attester_slashing.rs index d01276fcd..25c115adf 100644 --- a/crates/networking/manager/src/gossipsub/validate/attester_slashing.rs +++ b/crates/networking/manager/src/gossipsub/validate/attester_slashing.rs @@ -5,14 +5,14 @@ use ream_chain_beacon::beacon_chain::BeaconChain; use ream_consensus_beacon::{ attester_slashing::AttesterSlashing, electra::beacon_state::BeaconState, }; -use ream_storage::{cache::CachedDB, tables::table::REDBTable}; +use ream_storage::{cache::BeaconCacheDB, tables::table::REDBTable}; use super::result::ValidationResult; pub async fn validate_attester_slashing( attester_slashing: &AttesterSlashing, beacon_chain: &BeaconChain, - cached_db: &CachedDB, + cached_db: &BeaconCacheDB, ) -> anyhow::Result { let store = beacon_chain.store.lock().await; let head_root = store.get_head()?; diff --git a/crates/networking/manager/src/gossipsub/validate/beacon_attestation.rs b/crates/networking/manager/src/gossipsub/validate/beacon_attestation.rs index bbd0efb93..e03a093dc 100644 --- a/crates/networking/manager/src/gossipsub/validate/beacon_attestation.rs +++ b/crates/networking/manager/src/gossipsub/validate/beacon_attestation.rs @@ -9,7 +9,7 @@ use ream_consensus_misc::{ misc::{compute_epoch_at_slot, compute_signing_root}, }; use ream_storage::{ - cache::{AtestationKey, CachedDB}, + cache::{AtestationKey, BeaconCacheDB}, tables::{field::REDBField, table::REDBTable}, }; use ream_validator_beacon::attestation::compute_subnet_for_attestation; @@ -20,7 +20,7 @@ pub async fn validate_beacon_attestation( attestation: &SingleAttestation, beacon_chain: &BeaconChain, attestation_subnet_id: u64, - cached_db: &CachedDB, + cached_db: &BeaconCacheDB, ) -> anyhow::Result { let store = beacon_chain.store.lock().await; diff --git a/crates/networking/manager/src/gossipsub/validate/beacon_block.rs b/crates/networking/manager/src/gossipsub/validate/beacon_block.rs index 03fc2fd40..5d3e1c901 100644 --- a/crates/networking/manager/src/gossipsub/validate/beacon_block.rs +++ b/crates/networking/manager/src/gossipsub/validate/beacon_block.rs @@ -7,7 +7,7 @@ use ream_consensus_misc::{ use ream_execution_engine::new_payload_request::NewPayloadRequest; use ream_execution_rpc_types::payload_status::PayloadStatus; use ream_storage::{ - cache::{AddressSlotIdentifier, CachedDB}, + cache::{AddressSlotIdentifier, BeaconCacheDB}, tables::{field::REDBField, table::REDBTable}, }; @@ -15,7 +15,7 @@ use super::result::ValidationResult; pub async fn validate_gossip_beacon_block( beacon_chain: &BeaconChain, - cached_db: &CachedDB, + cached_db: &BeaconCacheDB, block: &SignedBeaconBlock, ) -> anyhow::Result { let latest_state = beacon_chain.store.lock().await.db.get_latest_state()?; @@ -89,7 +89,7 @@ pub async fn validate_gossip_beacon_block( pub async fn validate_beacon_block( beacon_chain: &BeaconChain, - cached_db: &CachedDB, + cached_db: &BeaconCacheDB, block: &SignedBeaconBlock, state: &BeaconState, is_parent: bool, diff --git a/crates/networking/manager/src/gossipsub/validate/blob_sidecar.rs b/crates/networking/manager/src/gossipsub/validate/blob_sidecar.rs index 251d0228f..b870e6774 100644 --- a/crates/networking/manager/src/gossipsub/validate/blob_sidecar.rs +++ b/crates/networking/manager/src/gossipsub/validate/blob_sidecar.rs @@ -6,7 +6,7 @@ use ream_consensus_misc::{ }; use ream_polynomial_commitments::handlers::verify_blob_kzg_proof_batch; use ream_storage::{ - cache::CachedDB, + cache::BeaconCacheDB, tables::{field::REDBField, table::REDBTable}, }; use ream_validator_beacon::blob_sidecars::compute_subnet_for_blob_sidecar; @@ -17,7 +17,7 @@ pub async fn validate_blob_sidecar( beacon_chain: &BeaconChain, blob_sidecar: &BlobSidecar, subnet_id: u64, - cached_db: &CachedDB, + cached_db: &BeaconCacheDB, ) -> anyhow::Result { // [REJECT] The sidecar's index is consistent with MAX_BLOBS_PER_BLOCK if blob_sidecar.index >= MAX_BLOBS_PER_BLOCK_ELECTRA { diff --git a/crates/networking/manager/src/gossipsub/validate/bls_to_execution_change.rs b/crates/networking/manager/src/gossipsub/validate/bls_to_execution_change.rs index 77d3bac46..addee1979 100644 --- a/crates/networking/manager/src/gossipsub/validate/bls_to_execution_change.rs +++ b/crates/networking/manager/src/gossipsub/validate/bls_to_execution_change.rs @@ -5,7 +5,7 @@ use ream_consensus_beacon::{ }; use ream_network_spec::networks::beacon_network_spec; use ream_storage::{ - cache::{AddressValidaterIndexIdentifier, CachedDB}, + cache::{AddressValidaterIndexIdentifier, BeaconCacheDB}, tables::table::REDBTable, }; @@ -14,7 +14,7 @@ use super::result::ValidationResult; pub async fn validate_bls_to_execution_change( signed: &SignedBLSToExecutionChange, beacon_chain: &BeaconChain, - cached_db: &CachedDB, + cached_db: &BeaconCacheDB, ) -> anyhow::Result { let store = beacon_chain.store.lock().await; diff --git a/crates/networking/manager/src/gossipsub/validate/data_column_sidecar.rs b/crates/networking/manager/src/gossipsub/validate/data_column_sidecar.rs index d97b5ac53..893fa8ba9 100644 --- a/crates/networking/manager/src/gossipsub/validate/data_column_sidecar.rs +++ b/crates/networking/manager/src/gossipsub/validate/data_column_sidecar.rs @@ -4,7 +4,7 @@ use ream_consensus_beacon::data_column_sidecar::DataColumnSidecar; use ream_consensus_misc::misc::compute_start_slot_at_epoch; use ream_polynomial_commitments::handlers::verify_cell_kzg_proof_batch; use ream_storage::{ - cache::CachedDB, + cache::BeaconCacheDB, tables::{field::REDBField, table::REDBTable}, }; @@ -14,7 +14,7 @@ pub async fn validate_data_column_sidecar_full( data_column_sidecar: &DataColumnSidecar, beacon_chain: &BeaconChain, subnet_id: u64, - cached_db: &CachedDB, + cached_db: &BeaconCacheDB, ) -> anyhow::Result { if !data_column_sidecar.verify() { return Ok(ValidationResult::Reject( diff --git a/crates/networking/manager/src/gossipsub/validate/light_client_finality_update.rs b/crates/networking/manager/src/gossipsub/validate/light_client_finality_update.rs index 0eedaf590..d2053e69a 100644 --- a/crates/networking/manager/src/gossipsub/validate/light_client_finality_update.rs +++ b/crates/networking/manager/src/gossipsub/validate/light_client_finality_update.rs @@ -4,13 +4,13 @@ use anyhow::Ok; use ream_consensus_misc::constants::beacon::SYNC_COMMITTEE_SIZE; use ream_light_client::finality_update::LightClientFinalityUpdate; use ream_network_spec::networks::{beacon_network_spec, lean_network_spec}; -use ream_storage::cache::CachedDB; +use ream_storage::cache::BeaconCacheDB; use crate::gossipsub::validate::result::ValidationResult; pub async fn validate_light_client_finality_update( update: &LightClientFinalityUpdate, - cached_db: &CachedDB, + cached_db: &BeaconCacheDB, ) -> anyhow::Result { // [IGNORE] The finalized header is greater than that of all previously forwarded finality // updates or it matches the highest previously forwarded slot and also has a supermajority diff --git a/crates/networking/manager/src/gossipsub/validate/light_client_optimistic_update.rs b/crates/networking/manager/src/gossipsub/validate/light_client_optimistic_update.rs index d930aead8..0d1441080 100644 --- a/crates/networking/manager/src/gossipsub/validate/light_client_optimistic_update.rs +++ b/crates/networking/manager/src/gossipsub/validate/light_client_optimistic_update.rs @@ -4,14 +4,14 @@ use anyhow::anyhow; use ream_chain_beacon::beacon_chain::BeaconChain; use ream_light_client::optimistic_update::LightClientOptimisticUpdate; use ream_network_spec::networks::{beacon_network_spec, lean_network_spec}; -use ream_storage::{cache::CachedDB, tables::table::REDBTable}; +use ream_storage::{cache::BeaconCacheDB, tables::table::REDBTable}; use crate::gossipsub::validate::result::ValidationResult; pub async fn validate_light_client_optimistic_update( light_client_optimistic_update: &LightClientOptimisticUpdate, beacon_chain: &BeaconChain, - cached_db: &CachedDB, + cached_db: &BeaconCacheDB, ) -> anyhow::Result { let store = beacon_chain.store.lock().await; let head_root = store.get_head()?; diff --git a/crates/networking/manager/src/gossipsub/validate/proposer_slashing.rs b/crates/networking/manager/src/gossipsub/validate/proposer_slashing.rs index 0178435ea..258c44f6c 100644 --- a/crates/networking/manager/src/gossipsub/validate/proposer_slashing.rs +++ b/crates/networking/manager/src/gossipsub/validate/proposer_slashing.rs @@ -3,14 +3,14 @@ use ream_chain_beacon::beacon_chain::BeaconChain; use ream_consensus_beacon::{ electra::beacon_state::BeaconState, proposer_slashing::ProposerSlashing, }; -use ream_storage::{cache::CachedDB, tables::table::REDBTable}; +use ream_storage::{cache::BeaconCacheDB, tables::table::REDBTable}; use super::result::ValidationResult; pub async fn validate_proposer_slashing( proposer_slashing: &ProposerSlashing, beacon_chain: &BeaconChain, - cached_db: &CachedDB, + cached_db: &BeaconCacheDB, ) -> anyhow::Result { let proposer_index = proposer_slashing.signed_header_1.message.proposer_index; diff --git a/crates/networking/manager/src/gossipsub/validate/sync_committee.rs b/crates/networking/manager/src/gossipsub/validate/sync_committee.rs index f217acc27..0d7332ff2 100644 --- a/crates/networking/manager/src/gossipsub/validate/sync_committee.rs +++ b/crates/networking/manager/src/gossipsub/validate/sync_committee.rs @@ -7,7 +7,7 @@ use ream_consensus_misc::{ misc::{compute_epoch_at_slot, compute_signing_root}, }; use ream_storage::{ - cache::{CachedDB, SyncCommitteeKey}, + cache::{BeaconCacheDB, SyncCommitteeKey}, tables::table::REDBTable, }; use ream_validator_beacon::sync_committee::{ @@ -20,7 +20,7 @@ pub async fn validate_sync_committee( message: &SyncCommitteeMessage, beacon_chain: &BeaconChain, subnet_id: u64, - cached_db: &CachedDB, + cached_db: &BeaconCacheDB, ) -> anyhow::Result { let store = beacon_chain.store.lock().await; diff --git a/crates/networking/manager/src/gossipsub/validate/sync_committee_contribution_and_proof.rs b/crates/networking/manager/src/gossipsub/validate/sync_committee_contribution_and_proof.rs index 740b5df8d..779197ab7 100644 --- a/crates/networking/manager/src/gossipsub/validate/sync_committee_contribution_and_proof.rs +++ b/crates/networking/manager/src/gossipsub/validate/sync_committee_contribution_and_proof.rs @@ -8,7 +8,7 @@ use ream_consensus_misc::{ }; use ream_events_beacon::contribution_and_proof::SignedContributionAndProof; use ream_storage::{ - cache::{CacheSyncCommitteeContribution, CachedDB, SyncCommitteeKey}, + cache::{BeaconCacheDB, CacheSyncCommitteeContribution, SyncCommitteeKey}, tables::table::REDBTable, }; use ream_validator_beacon::{ @@ -23,7 +23,7 @@ use super::result::ValidationResult; pub async fn validate_sync_committee_contribution_and_proof( beacon_chain: &BeaconChain, - cached_db: &CachedDB, + cached_db: &BeaconCacheDB, signed_contribution_and_proof: &SignedContributionAndProof, ) -> anyhow::Result { let contribution_and_proof = &signed_contribution_and_proof.message; diff --git a/crates/networking/manager/src/gossipsub/validate/voluntary_exit.rs b/crates/networking/manager/src/gossipsub/validate/voluntary_exit.rs index 6f03c16c1..b4929814b 100644 --- a/crates/networking/manager/src/gossipsub/validate/voluntary_exit.rs +++ b/crates/networking/manager/src/gossipsub/validate/voluntary_exit.rs @@ -3,14 +3,14 @@ use ream_chain_beacon::beacon_chain::BeaconChain; use ream_consensus_beacon::{ electra::beacon_state::BeaconState, voluntary_exit::SignedVoluntaryExit, }; -use ream_storage::{cache::CachedDB, tables::table::REDBTable}; +use ream_storage::{cache::BeaconCacheDB, tables::table::REDBTable}; use super::result::ValidationResult; pub async fn validate_voluntary_exit( voluntary_exit: &SignedVoluntaryExit, beacon_chain: &BeaconChain, - cached_db: &CachedDB, + cached_db: &BeaconCacheDB, ) -> anyhow::Result { let store = beacon_chain.store.lock().await; diff --git a/crates/networking/manager/src/service.rs b/crates/networking/manager/src/service.rs index 0a69e3b9e..c3045fbc6 100644 --- a/crates/networking/manager/src/service.rs +++ b/crates/networking/manager/src/service.rs @@ -15,7 +15,7 @@ use ream_p2p::{ config::NetworkConfig, network::beacon::{Network, ReamNetworkEvent, network_state::NetworkState}, }; -use ream_storage::{cache::CachedDB, db::beacon::BeaconDB}; +use ream_storage::{cache::BeaconCacheDB, db::beacon::BeaconDB}; use ream_syncer::block_range::BlockRangeSyncer; use tokio::{sync::mpsc, time::interval}; use tracing::{error, info}; @@ -34,7 +34,7 @@ pub struct NetworkManagerService { pub network_state: Arc, pub block_range_syncer: BlockRangeSyncer, pub ream_db: BeaconDB, - pub cached_db: Arc, + pub cached_db: Arc, } /// The `NetworkManagerService` acts as the manager for all networking activities in Ream. @@ -57,7 +57,7 @@ impl NetworkManagerService { ream_db: BeaconDB, ream_dir: PathBuf, beacon_chain: Arc, - cached_db: Arc, + cached_db: Arc, ) -> anyhow::Result { let discv5_config = discv5::ConfigBuilder::new(discv5::ListenConfig::from_ip( config.socket_address, diff --git a/crates/rpc/beacon/src/handlers/block.rs b/crates/rpc/beacon/src/handlers/block.rs index 1752b559c..54c0f48c6 100644 --- a/crates/rpc/beacon/src/handlers/block.rs +++ b/crates/rpc/beacon/src/handlers/block.rs @@ -35,7 +35,7 @@ use ream_p2p::{ network::beacon::channel::GossipMessage, }; use ream_storage::{ - cache::{AddressSlotIdentifier, CachedDB}, + cache::{AddressSlotIdentifier, BeaconCacheDB}, db::beacon::BeaconDB, tables::{field::REDBField, table::REDBTable}, }; @@ -345,7 +345,7 @@ async fn validate_block_for_broadcast( beacon_chain: &BeaconChain, block: &SignedBeaconBlock, validation_level: &BroadcastValidation, - cached_db: &CachedDB, + cached_db: &BeaconCacheDB, ) -> Result<(), String> { match validation_level { BroadcastValidation::Gossip => { @@ -367,7 +367,7 @@ async fn validate_block_for_broadcast( async fn validate_gossip_level( beacon_chain: &BeaconChain, block: &SignedBeaconBlock, - cached_db: &CachedDB, + cached_db: &BeaconCacheDB, ) -> Result<(), String> { let store = beacon_chain.store.lock().await; @@ -395,7 +395,7 @@ async fn validate_gossip_level( return Err("Invalid block signature".to_string()); } - // Check for duplicate (using CachedDB) + // Check for duplicate (using BeaconCacheDB) let validator = parent_state .validators .get(block.message.proposer_index as usize) @@ -419,7 +419,7 @@ async fn validate_gossip_level( async fn validate_consensus_level( beacon_chain: &BeaconChain, block: &SignedBeaconBlock, - cached_db: &CachedDB, + cached_db: &BeaconCacheDB, ) -> Result<(), String> { // First do gossip checks validate_gossip_level(beacon_chain, block, cached_db).await?; @@ -567,7 +567,7 @@ pub async fn post_beacon_block( query: Query, beacon_chain: Data>, p2p_sender: Data>, - cached_db: Data>, + cached_db: Data>, ) -> Result { validate_consensus_version_header(&http_request)?; @@ -598,7 +598,7 @@ pub async fn post_blinded_beacon_block( query: Query, beacon_chain: Data>, p2p_sender: Data>, - cached_db: Data>, + cached_db: Data>, builder_client: Option>>, ) -> Result { validate_consensus_version_header(&http_request)?; diff --git a/crates/rpc/beacon/src/server.rs b/crates/rpc/beacon/src/server.rs index dbe48ee86..90aac256f 100644 --- a/crates/rpc/beacon/src/server.rs +++ b/crates/rpc/beacon/src/server.rs @@ -7,7 +7,7 @@ use ream_network_manager::p2p_sender::P2PSender; use ream_operation_pool::OperationPool; use ream_p2p::network::beacon::network_state::NetworkState; use ream_rpc_common::{config::RpcServerConfig, server::RpcServerBuilder}; -use ream_storage::{cache::CachedDB, db::beacon::BeaconDB}; +use ream_storage::{cache::BeaconCacheDB, db::beacon::BeaconDB}; use ream_validator_beacon::builder::builder_client::BuilderClient; use tokio::sync::broadcast; @@ -25,7 +25,7 @@ pub async fn start( event_sender: broadcast::Sender, beacon_chain: Arc, p2p_sender: Arc, - cached_db: Arc, + cached_db: Arc, ) -> Result<()> { RpcServerBuilder::new(server_config.http_socket_address) .allow_origin(server_config.http_allow_origin) diff --git a/crates/storage/src/cache.rs b/crates/storage/src/cache.rs index e6d60779c..1f793f9f9 100644 --- a/crates/storage/src/cache.rs +++ b/crates/storage/src/cache.rs @@ -1,13 +1,18 @@ -use std::num::NonZeroUsize; +use std::{num::NonZeroUsize, sync::Mutex}; -use alloy_primitives::FixedBytes; +use alloy_primitives::{B256, FixedBytes}; use lru::LruCache; use ream_bls::{BLSSignature, PublicKey}; -use ream_consensus_beacon::bls_to_execution_change::BLSToExecutionChange; +use ream_consensus_beacon::{ + bls_to_execution_change::BLSToExecutionChange, + electra::{beacon_block::SignedBeaconBlock, beacon_state::BeaconState}, +}; +use ream_consensus_lean::{block::SignedBlockWithAttestation, state::LeanState}; use ream_consensus_misc::constants::beacon::SYNC_COMMITTEE_SIZE; use ream_light_client::finality_update::LightClientFinalityUpdate; use tokio::sync::RwLock; const LRU_CACHE_SIZE: usize = 64; +const BLOCK_STATE_CACHE_SIZE: usize = 128; #[derive(Debug, Hash, PartialEq, Eq, Default, Clone)] pub struct AddressSlotIdentifier { @@ -48,9 +53,10 @@ pub struct AggregateAndProofKey { pub target_epoch: u64, } -/// In-memory LRU cache. +/// In-memory LRU cache for beacon node (gossip validation + beacon storage). #[derive(Debug)] -pub struct CachedDB { +pub struct BeaconCacheDB { + // Gossip validation caches pub seen_proposer_signature: RwLock>, pub seen_bls_to_execution_signature: RwLock>, @@ -67,9 +73,12 @@ pub struct CachedDB { pub forwarded_optimistic_update_slot: RwLock>, pub forwarded_light_client_finality_update: RwLock>, pub seen_aggregate_and_proof: RwLock>, + // Beacon storage caches + pub blocks: Mutex>, + pub states: Mutex>, } -impl CachedDB { +impl BeaconCacheDB { pub fn new() -> Self { Self { seen_proposer_signature: LruCache::new( @@ -123,11 +132,44 @@ impl CachedDB { NonZeroUsize::new(LRU_CACHE_SIZE).expect("Invalid cache size"), ) .into(), + blocks: Mutex::new(LruCache::new( + NonZeroUsize::new(BLOCK_STATE_CACHE_SIZE).expect("Invalid cache size"), + )), + states: Mutex::new(LruCache::new( + NonZeroUsize::new(BLOCK_STATE_CACHE_SIZE).expect("Invalid cache size"), + )), } } } -impl Default for CachedDB { +impl Default for BeaconCacheDB { + fn default() -> Self { + Self::new() + } +} + +/// In-memory LRU cache for lean node (lean storage only). +#[derive(Debug)] +pub struct LeanCacheDB { + // Lean storage caches + pub blocks: Mutex>, + pub states: Mutex>, +} + +impl LeanCacheDB { + pub fn new() -> Self { + Self { + blocks: Mutex::new(LruCache::new( + NonZeroUsize::new(BLOCK_STATE_CACHE_SIZE).expect("Invalid cache size"), + )), + states: Mutex::new(LruCache::new( + NonZeroUsize::new(BLOCK_STATE_CACHE_SIZE).expect("Invalid cache size"), + )), + } + } +} + +impl Default for LeanCacheDB { fn default() -> Self { Self::new() } diff --git a/crates/storage/src/db/beacon.rs b/crates/storage/src/db/beacon.rs index ce5c38eae..c7b9761b6 100644 --- a/crates/storage/src/db/beacon.rs +++ b/crates/storage/src/db/beacon.rs @@ -5,39 +5,52 @@ use ream_consensus_beacon::electra::beacon_state::BeaconState; use ream_consensus_misc::constants::beacon::SLOTS_PER_EPOCH; use redb::{Database, ReadableDatabase}; -use crate::tables::{ - beacon::{ - beacon_block::BeaconBlockTable, beacon_state::BeaconStateTable, - blobs_and_proofs::BlobsAndProofsTable, block_timeliness::BlockTimelinessTable, - checkpoint_states::CheckpointStatesTable, column_sidecars::ColumnSidecarsTable, - equivocating_indices::EquivocatingIndicesField, - finalized_checkpoint::FinalizedCheckpointField, genesis_time::GenesisTimeField, - justified_checkpoint::JustifiedCheckpointField, latest_messages::LatestMessagesTable, - parent_root_index::ParentRootIndexMultimapTable, - proposer_boost_root::ProposerBoostRootField, slot_index::BeaconSlotIndexTable, - state_root_index::BeaconStateRootIndexTable, time::TimeField, - unrealized_finalized_checkpoint::UnrealizedFinalizedCheckpointField, - unrealized_justifications::UnrealizedJustificationsTable, - unrealized_justified_checkpoint::UnrealizedJustifiedCheckpointField, +use crate::{ + cache::BeaconCacheDB, + tables::{ + beacon::{ + beacon_block::BeaconBlockTable, beacon_state::BeaconStateTable, + blobs_and_proofs::BlobsAndProofsTable, block_timeliness::BlockTimelinessTable, + checkpoint_states::CheckpointStatesTable, column_sidecars::ColumnSidecarsTable, + equivocating_indices::EquivocatingIndicesField, + finalized_checkpoint::FinalizedCheckpointField, genesis_time::GenesisTimeField, + justified_checkpoint::JustifiedCheckpointField, latest_messages::LatestMessagesTable, + parent_root_index::ParentRootIndexMultimapTable, + proposer_boost_root::ProposerBoostRootField, slot_index::BeaconSlotIndexTable, + state_root_index::BeaconStateRootIndexTable, time::TimeField, + unrealized_finalized_checkpoint::UnrealizedFinalizedCheckpointField, + unrealized_justifications::UnrealizedJustificationsTable, + unrealized_justified_checkpoint::UnrealizedJustifiedCheckpointField, + }, + table::REDBTable, }, - table::REDBTable, }; #[derive(Clone, Debug)] pub struct BeaconDB { pub db: Arc, pub data_dir: PathBuf, + pub(crate) cache: Option>, } impl BeaconDB { + /// Attach a cache to this BeaconDB instance. + /// This enables in-memory caching of blocks and states for improved performance. + pub fn with_cache(mut self, cache: Arc) -> Self { + self.cache = Some(cache); + self + } + pub fn block_provider(&self) -> BeaconBlockTable { BeaconBlockTable { db: self.db.clone(), + cache: self.cache.clone(), } } pub fn state_provider(&self) -> BeaconStateTable { BeaconStateTable { db: self.db.clone(), + cache: self.cache.clone(), } } diff --git a/crates/storage/src/db/lean.rs b/crates/storage/src/db/lean.rs index be22d4ff0..0f56ff769 100644 --- a/crates/storage/src/db/lean.rs +++ b/crates/storage/src/db/lean.rs @@ -2,28 +2,42 @@ use std::sync::Arc; use redb::Database; -use crate::tables::lean::{ - latest_finalized::LatestFinalizedField, latest_justified::LatestJustifiedField, - latest_known_attestation::LatestKnownAttestationTable, lean_block::LeanBlockTable, - lean_head::LeanHeadField, lean_latest_new_attestations::LeanLatestNewAttestationsTable, - lean_safe_target::LeanSafeTargetField, lean_state::LeanStateTable, lean_time::LeanTimeField, - slot_index::LeanSlotIndexTable, state_root_index::LeanStateRootIndexTable, +use crate::{ + cache::LeanCacheDB, + tables::lean::{ + latest_finalized::LatestFinalizedField, latest_justified::LatestJustifiedField, + latest_known_attestation::LatestKnownAttestationTable, lean_block::LeanBlockTable, + lean_head::LeanHeadField, lean_latest_new_attestations::LeanLatestNewAttestationsTable, + lean_safe_target::LeanSafeTargetField, lean_state::LeanStateTable, + lean_time::LeanTimeField, slot_index::LeanSlotIndexTable, + state_root_index::LeanStateRootIndexTable, + }, }; #[derive(Clone, Debug)] pub struct LeanDB { pub db: Arc, + pub(crate) cache: Option>, } impl LeanDB { + /// Attach a cache to this LeanDB instance. + /// This enables in-memory caching of blocks and states for improved performance. + pub fn with_cache(mut self, cache: Arc) -> Self { + self.cache = Some(cache); + self + } + pub fn block_provider(&self) -> LeanBlockTable { LeanBlockTable { db: self.db.clone(), + cache: self.cache.clone(), } } pub fn state_provider(&self) -> LeanStateTable { LeanStateTable { db: self.db.clone(), + cache: self.cache.clone(), } } diff --git a/crates/storage/src/db/mod.rs b/crates/storage/src/db/mod.rs index 545deaee6..52763007e 100644 --- a/crates/storage/src/db/mod.rs +++ b/crates/storage/src/db/mod.rs @@ -91,6 +91,7 @@ impl ReamDB { Ok(BeaconDB { db: self.db.clone(), data_dir: self.data_dir.clone(), + cache: None, }) } @@ -112,6 +113,7 @@ impl ReamDB { Ok(LeanDB { db: self.db.clone(), + cache: None, }) } } diff --git a/crates/storage/src/tables/beacon/beacon_block.rs b/crates/storage/src/tables/beacon/beacon_block.rs index ade58ab51..30ae2be68 100644 --- a/crates/storage/src/tables/beacon/beacon_block.rs +++ b/crates/storage/src/tables/beacon/beacon_block.rs @@ -2,11 +2,12 @@ use std::sync::Arc; use alloy_primitives::B256; use ream_consensus_beacon::electra::beacon_block::SignedBeaconBlock; -use redb::{Database, Durability, TableDefinition}; +use redb::{Database, Durability, ReadableDatabase, TableDefinition}; use tree_hash::TreeHash; use super::parent_root_index::ParentRootIndexMultimapTable; use crate::{ + cache::BeaconCacheDB, errors::StoreError, tables::{ beacon::{slot_index::BeaconSlotIndexTable, state_root_index::BeaconStateRootIndexTable}, @@ -18,6 +19,7 @@ use crate::{ pub struct BeaconBlockTable { pub db: Arc, + pub cache: Option>, } /// Table definition for the Beacon Block table @@ -40,6 +42,32 @@ impl REDBTable for BeaconBlockTable { self.db.clone() } + fn get<'a>( + &self, + key: ::SelfType<'a>, + ) -> Result, StoreError> { + // LruCache::get requires mutable access to update LRU order + if let Some(cache) = &self.cache + && let Ok(mut cache_lock) = cache.blocks.lock() + && let Some(block) = cache_lock.get(&key) + { + return Ok(Some(block.clone())); + } + + let read_txn = self.database().begin_read()?; + let table = read_txn.open_table(Self::TABLE_DEFINITION)?; + let result = table.get(key)?; + let block = result.map(|res| res.value()); + + if let (Some(cache), Some(block)) = (&self.cache, &block) + && let Ok(mut cache_lock) = cache.blocks.lock() + { + cache_lock.put(key, block.clone()); + } + + Ok(block) + } + fn insert(&self, key: Self::Key, value: Self::Value) -> Result<(), StoreError> { // insert entry to slot_index table let block_root = value.message.tree_hash_root(); @@ -58,6 +86,13 @@ impl REDBTable for BeaconBlockTable { db: self.db.clone(), }; parent_root_index_table.insert(value.message.parent_root, block_root)?; + + if let Some(cache) = &self.cache + && let Ok(mut cache_lock) = cache.blocks.lock() + { + cache_lock.put(key, value.clone()); + } + let mut write_txn = self.db.begin_write()?; write_txn.set_durability(Durability::Immediate)?; let mut table = write_txn.open_table(Self::TABLE_DEFINITION)?; @@ -68,6 +103,12 @@ impl REDBTable for BeaconBlockTable { } fn remove(&self, key: Self::Key) -> Result, StoreError> { + if let Some(cache) = &self.cache + && let Ok(mut cache_lock) = cache.blocks.lock() + { + cache_lock.pop(&key); + } + let write_txn = self.db.begin_write()?; let mut table = write_txn.open_table(Self::TABLE_DEFINITION)?; let value = table.remove(key)?.map(|v| v.value()); diff --git a/crates/storage/src/tables/beacon/beacon_state.rs b/crates/storage/src/tables/beacon/beacon_state.rs index 929719c85..bc0a933f0 100644 --- a/crates/storage/src/tables/beacon/beacon_state.rs +++ b/crates/storage/src/tables/beacon/beacon_state.rs @@ -2,12 +2,17 @@ use std::sync::Arc; use alloy_primitives::B256; use ream_consensus_beacon::electra::beacon_state::BeaconState; -use redb::{Database, TableDefinition}; +use redb::{Database, ReadableDatabase, TableDefinition}; -use crate::tables::{ssz_encoder::SSZEncoding, table::REDBTable}; +use crate::{ + cache::BeaconCacheDB, + errors::StoreError, + tables::{ssz_encoder::SSZEncoding, table::REDBTable}, +}; pub struct BeaconStateTable { pub db: Arc, + pub cache: Option>, } /// Table definition for the Beacon State table @@ -29,4 +34,70 @@ impl REDBTable for BeaconStateTable { fn database(&self) -> Arc { self.db.clone() } + + fn get<'a>( + &self, + key: ::SelfType<'a>, + ) -> Result, StoreError> { + // LruCache::get requires mutable access to update LRU order + if let Some(cache) = &self.cache + && let Ok(mut cache_lock) = cache.states.lock() + && let Some(state) = cache_lock.get(&key) + { + return Ok(Some(state.clone())); + } + + let read_txn = self.database().begin_read()?; + let table = read_txn.open_table(Self::TABLE_DEFINITION)?; + let result = table.get(key)?; + let state = result.map(|res| res.value()); + + if let (Some(cache), Some(state)) = (&self.cache, &state) + && let Ok(mut cache_lock) = cache.states.lock() + { + cache_lock.put(key, state.clone()); + } + + Ok(state) + } + + fn insert<'a>( + &self, + key: ::SelfType<'a>, + value: ::SelfType<'a>, + ) -> Result<(), StoreError> { + if let Some(cache) = &self.cache + && let Ok(mut cache_lock) = cache.states.lock() + { + cache_lock.put(key, value.clone()); + } + + let mut write_txn = self.database().begin_write()?; + write_txn.set_durability(redb::Durability::Immediate)?; + { + let mut table = write_txn.open_table(Self::TABLE_DEFINITION)?; + table.insert(key, value)?; + } + write_txn.commit()?; + Ok(()) + } + + fn remove<'a>( + &self, + key: ::SelfType<'a>, + ) -> Result, StoreError> { + if let Some(cache) = &self.cache + && let Ok(mut cache_lock) = cache.states.lock() + { + cache_lock.pop(&key); + } + + let write_txn = self.database().begin_write()?; + let value = { + let mut table = write_txn.open_table(Self::TABLE_DEFINITION)?; + table.remove(key)?.map(|value| value.value()) + }; + write_txn.commit()?; + Ok(value) + } } diff --git a/crates/storage/src/tables/lean/lean_block.rs b/crates/storage/src/tables/lean/lean_block.rs index 2b9b45d0e..54cf9c101 100644 --- a/crates/storage/src/tables/lean/lean_block.rs +++ b/crates/storage/src/tables/lean/lean_block.rs @@ -7,12 +7,14 @@ use tree_hash::TreeHash; use super::{slot_index::LeanSlotIndexTable, state_root_index::LeanStateRootIndexTable}; use crate::{ + cache::LeanCacheDB, errors::StoreError, tables::{ssz_encoder::SSZEncoding, table::REDBTable}, }; pub struct LeanBlockTable { pub db: Arc, + pub cache: Option>, } /// Table definition for the Lean Block table @@ -38,6 +40,32 @@ impl REDBTable for LeanBlockTable { self.db.clone() } + fn get<'a>( + &self, + key: ::SelfType<'a>, + ) -> Result, StoreError> { + // LruCache::get requires mutable access to update LRU order + if let Some(cache) = &self.cache + && let Ok(mut cache_lock) = cache.blocks.lock() + && let Some(block) = cache_lock.get(&key) + { + return Ok(Some(block.clone())); + } + + let read_txn = self.database().begin_read()?; + let table = read_txn.open_table(Self::TABLE_DEFINITION)?; + let result = table.get(key)?; + let block = result.map(|res| res.value()); + + if let (Some(cache), Some(block)) = (&self.cache, &block) + && let Ok(mut cache_lock) = cache.blocks.lock() + { + cache_lock.put(key, block.clone()); + } + + Ok(block) + } + fn insert(&self, key: Self::Key, value: Self::Value) -> Result<(), StoreError> { // insert entry to slot_index table let block_root = value.message.block.tree_hash_root(); @@ -52,6 +80,12 @@ impl REDBTable for LeanBlockTable { }; state_root_index_table.insert(value.message.block.state_root, block_root)?; + if let Some(cache) = &self.cache + && let Ok(mut cache_lock) = cache.blocks.lock() + { + cache_lock.put(key, value.clone()); + } + let mut write_txn = self.db.begin_write()?; write_txn.set_durability(Durability::Immediate)?; let mut table = write_txn.open_table(Self::TABLE_DEFINITION)?; @@ -62,6 +96,12 @@ impl REDBTable for LeanBlockTable { } fn remove(&self, key: Self::Key) -> Result, StoreError> { + if let Some(cache) = &self.cache + && let Ok(mut cache_lock) = cache.blocks.lock() + { + cache_lock.pop(&key); + } + let write_txn = self.db.begin_write()?; let mut table = write_txn.open_table(Self::TABLE_DEFINITION)?; let value = table.remove(key)?.map(|v| v.value()); diff --git a/crates/storage/src/tables/lean/lean_state.rs b/crates/storage/src/tables/lean/lean_state.rs index 0f7ba6cc3..a51f85885 100644 --- a/crates/storage/src/tables/lean/lean_state.rs +++ b/crates/storage/src/tables/lean/lean_state.rs @@ -5,12 +5,14 @@ use ream_consensus_lean::state::LeanState; use redb::{Database, ReadableDatabase, TableDefinition}; use crate::{ + cache::LeanCacheDB, errors::StoreError, tables::{ssz_encoder::SSZEncoding, table::REDBTable}, }; pub struct LeanStateTable { pub db: Arc, + pub cache: Option>, } /// Table definition for the Lean State table @@ -32,6 +34,72 @@ impl REDBTable for LeanStateTable { fn database(&self) -> Arc { self.db.clone() } + + fn get<'a>( + &self, + key: ::SelfType<'a>, + ) -> Result, StoreError> { + // LruCache::get requires mutable access to update LRU order + if let Some(cache) = &self.cache + && let Ok(mut cache_lock) = cache.states.lock() + && let Some(state) = cache_lock.get(&key) + { + return Ok(Some(state.clone())); + } + + let read_txn = self.database().begin_read()?; + let table = read_txn.open_table(Self::TABLE_DEFINITION)?; + let result = table.get(key)?; + let state = result.map(|res| res.value()); + + if let (Some(cache), Some(state)) = (&self.cache, &state) + && let Ok(mut cache_lock) = cache.states.lock() + { + cache_lock.put(key, state.clone()); + } + + Ok(state) + } + + fn insert<'a>( + &self, + key: ::SelfType<'a>, + value: ::SelfType<'a>, + ) -> Result<(), StoreError> { + if let Some(cache) = &self.cache + && let Ok(mut cache_lock) = cache.states.lock() + { + cache_lock.put(key, value.clone()); + } + + let mut write_txn = self.database().begin_write()?; + write_txn.set_durability(redb::Durability::Immediate)?; + { + let mut table = write_txn.open_table(Self::TABLE_DEFINITION)?; + table.insert(key, value)?; + } + write_txn.commit()?; + Ok(()) + } + + fn remove<'a>( + &self, + key: ::SelfType<'a>, + ) -> Result, StoreError> { + if let Some(cache) = &self.cache + && let Ok(mut cache_lock) = cache.states.lock() + { + cache_lock.pop(&key); + } + + let write_txn = self.database().begin_write()?; + let value = { + let mut table = write_txn.open_table(Self::TABLE_DEFINITION)?; + table.remove(key)?.map(|value| value.value()) + }; + write_txn.commit()?; + Ok(value) + } } impl LeanStateTable { diff --git a/testing/gossip-validation/tests/validate_block.rs b/testing/gossip-validation/tests/validate_block.rs index 16e7b4f04..b47724d71 100644 --- a/testing/gossip-validation/tests/validate_block.rs +++ b/testing/gossip-validation/tests/validate_block.rs @@ -1,7 +1,7 @@ #[allow(clippy::unwrap_used)] mod tests { const PATH_TO_TEST_DATA_FOLDER: &str = "./tests"; - use std::{path::PathBuf, str::FromStr}; + use std::{path::PathBuf, str::FromStr, sync::Arc}; use alloy_primitives::B256; use anyhow::anyhow; @@ -17,7 +17,7 @@ mod tests { use ream_network_spec::networks::initialize_test_network_spec; use ream_operation_pool::OperationPool; use ream_storage::{ - cache::{AddressSlotIdentifier, CachedDB}, + cache::{AddressSlotIdentifier, BeaconCacheDB}, db::{ReamDB, beacon::BeaconDB}, tables::{field::REDBField, table::REDBTable}, }; @@ -28,11 +28,15 @@ mod tests { const SEPOLIA_GENESIS_TIME: u64 = 1655733600; const CURRENT_TIME: u64 = 1752744600; - pub async fn db_setup() -> (BeaconChain, CachedDB, B256) { + pub async fn db_setup() -> (BeaconChain, Arc, B256) { let temp_dir = TempDir::new("ream_gossip_test").unwrap(); let temp_path = temp_dir.path().to_path_buf(); let ream_db = ReamDB::new(temp_path).expect("unable to init Ream Database"); - let mut db = ream_db.init_beacon_db().unwrap(); + let cached_db = Arc::new(BeaconCacheDB::default()); + let mut db = ream_db + .init_beacon_db() + .unwrap() + .with_cache(cached_db.clone()); let ancestor_beacon_block = read_ssz_snappy_file::( "./assets/sepolia/blocks/slot_8084160.ssz_snappy", @@ -72,7 +76,6 @@ mod tests { .await; let operation_pool = OperationPool::default(); - let cached_db = CachedDB::default(); let beacon_chain = BeaconChain::new(db, operation_pool.into(), None, None); (beacon_chain, cached_db, block_root)