From 91411c891c2e03df47dbf87fa348c693e9d32dc6 Mon Sep 17 00:00:00 2001 From: ShiroObiJohn <28779404+ShiroObiJohn@users.noreply.github.com> Date: Fri, 3 Oct 2025 15:51:33 +0800 Subject: [PATCH 1/2] feat: add sync committee pool to manage sync committee messages and contributions. get /eth/v1/validator/sync_committee_contribution endpoint --- Cargo.lock | 22 + Cargo.toml | 2 + bin/ream/Cargo.toml | 1 + bin/ream/src/main.rs | 5 + crates/common/api_types/beacon/src/query.rs | 7 + crates/common/chain/beacon/Cargo.toml | 1 + .../common/chain/beacon/src/beacon_chain.rs | 4 +- crates/common/fork_choice/beacon/Cargo.toml | 1 + crates/common/fork_choice/beacon/src/store.rs | 24 +- crates/common/sync_committee_pool/Cargo.toml | 24 + crates/common/sync_committee_pool/src/lib.rs | 776 ++++++++++++++++++ .../beacon/src/contribution_and_proof.rs | 4 +- crates/networking/manager/Cargo.toml | 1 + crates/networking/manager/src/service.rs | 4 + crates/rpc/beacon/Cargo.toml | 1 + crates/rpc/beacon/src/handlers/debug.rs | 18 +- .../src/handlers/prepare_beacon_proposer.rs | 2 +- crates/rpc/beacon/src/handlers/syncing.rs | 5 +- crates/rpc/beacon/src/handlers/validator.rs | 81 +- crates/rpc/beacon/src/routes/validator.rs | 6 +- crates/rpc/beacon/src/server.rs | 3 + testing/gossip-validation/Cargo.toml | 1 + .../gossip-validation/tests/validate_block.rs | 10 +- 23 files changed, 975 insertions(+), 28 deletions(-) create mode 100644 crates/common/sync_committee_pool/Cargo.toml create mode 100644 crates/common/sync_committee_pool/src/lib.rs diff --git a/Cargo.lock b/Cargo.lock index d44f1f23d..f8dc1cc9b 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2754,6 +2754,7 @@ dependencies = [ "ream-network-spec", "ream-operation-pool", "ream-storage", + "ream-sync-committee-pool", "serde", "serde_json", "snap", @@ -5770,6 +5771,7 @@ dependencies = [ "ream-rpc-lean", "ream-storage", "ream-sync", + "ream-sync-committee-pool", "ream-validator-beacon", "ream-validator-lean", "serde", @@ -5872,6 +5874,7 @@ dependencies = [ "ream-operation-pool", "ream-p2p", "ream-storage", + "ream-sync-committee-pool", "tokio", "tracing", ] @@ -6094,6 +6097,7 @@ dependencies = [ "ream-operation-pool", "ream-polynomial-commitments", "ream-storage", + "ream-sync-committee-pool", "rust-kzg-blst", "serde", "serde_json", @@ -6221,6 +6225,7 @@ dependencies = [ "ream-p2p", "ream-polynomial-commitments", "ream-storage", + "ream-sync-committee-pool", "ream-syncer", "ream-validator-beacon", "tokio", @@ -6410,6 +6415,7 @@ dependencies = [ "ream-peer", "ream-rpc-common", "ream-storage", + "ream-sync-committee-pool", "ream-validator-beacon", "serde", "serde_json", @@ -6487,6 +6493,22 @@ dependencies = [ "tokio", ] +[[package]] +name = "ream-sync-committee-pool" +version = "0.1.0" +dependencies = [ + "alloy-primitives", + "parking_lot", + "ream-bls", + "ream-validator-beacon", + "serde", + "ssz_types", + "thiserror 2.0.17", + "tracing", + "tree_hash", + "tree_hash_derive", +] + [[package]] name = "ream-syncer" version = "0.1.0" diff --git a/Cargo.toml b/Cargo.toml index 59bdbb81e..7369b3b50 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -24,6 +24,7 @@ members = [ "crates/common/operation_pool", "crates/common/polynomial_commitments", "crates/common/sync", + "crates/common/sync_committee_pool", "crates/common/validator/beacon", "crates/common/validator/lean", "crates/crypto/bls", @@ -173,6 +174,7 @@ ream-rpc-common = { path = "crates/rpc/common" } ream-rpc-lean = { path = "crates/rpc/lean" } ream-storage = { path = "crates/storage" } ream-sync = { path = "crates/common/sync" } +ream-sync-committee-pool = { path = "crates/common/sync_committee_pool" } ream-syncer = { path = "crates/networking/syncer" } ream-validator-beacon = { path = "crates/common/validator/beacon" } ream-validator-lean = { path = "crates/common/validator/lean" } diff --git a/bin/ream/Cargo.toml b/bin/ream/Cargo.toml index 78751a76d..2effbd3c9 100644 --- a/bin/ream/Cargo.toml +++ b/bin/ream/Cargo.toml @@ -64,6 +64,7 @@ ream-rpc-common.workspace = true ream-rpc-lean.workspace = true ream-storage.workspace = true ream-sync.workspace = true +ream-sync-committee-pool.workspace = true ream-validator-beacon.workspace = true ream-validator-lean.workspace = true diff --git a/bin/ream/src/main.rs b/bin/ream/src/main.rs index 22e72f79c..e41a5e1ed 100644 --- a/bin/ream/src/main.rs +++ b/bin/ream/src/main.rs @@ -71,6 +71,7 @@ use ream_storage::{ tables::table::REDBTable, }; use ream_sync::rwlock::Writer; +use ream_sync_committee_pool::SyncCommitteePool; use ream_validator_beacon::{ beacon_api_client::{BeaconApiClient, http_client::ContentType}, builder::builder_client::{BuilderClient, BuilderConfig}, @@ -351,6 +352,7 @@ pub async fn run_beacon_node(config: BeaconNodeConfig, executor: ReamExecutor, r ); let operation_pool = Arc::new(OperationPool::default()); + let sync_committee_pool = Arc::new(SyncCommitteePool::default()); let (event_sender, _) = broadcast::channel::(1024); @@ -393,6 +395,7 @@ pub async fn run_beacon_node(config: BeaconNodeConfig, executor: ReamExecutor, r let beacon_chain = Arc::new(BeaconChain::new( beacon_db.clone(), operation_pool.clone(), + sync_committee_pool.clone(), execution_engine.clone(), Some(event_sender.clone()), )); @@ -405,6 +408,7 @@ pub async fn run_beacon_node(config: BeaconNodeConfig, executor: ReamExecutor, r beacon_db.clone(), beacon_db.data_dir.clone(), beacon_chain.clone(), + sync_committee_pool.clone(), cached_db.clone(), ) .await @@ -423,6 +427,7 @@ pub async fn run_beacon_node(config: BeaconNodeConfig, executor: ReamExecutor, r beacon_db, network_state, operation_pool, + sync_committee_pool, execution_engine, builder_client, event_sender, diff --git a/crates/common/api_types/beacon/src/query.rs b/crates/common/api_types/beacon/src/query.rs index 6851dda6e..b3740c3df 100644 --- a/crates/common/api_types/beacon/src/query.rs +++ b/crates/common/api_types/beacon/src/query.rs @@ -52,6 +52,13 @@ pub struct AttestationQuery { pub attestation_data_root: Option, } +#[derive(Debug, Deserialize)] +pub struct SyncCommitteeContributionQuery { + pub slot: u64, + pub subcommittee_index: u64, + pub beacon_block_root: B256, +} + #[derive(Debug, Deserialize)] pub struct ConnectionStateQuery { pub state: Option>, diff --git a/crates/common/chain/beacon/Cargo.toml b/crates/common/chain/beacon/Cargo.toml index 3979b399b..b82480420 100644 --- a/crates/common/chain/beacon/Cargo.toml +++ b/crates/common/chain/beacon/Cargo.toml @@ -27,6 +27,7 @@ ream-network-spec.workspace = true ream-operation-pool.workspace = true ream-p2p.workspace = true ream-storage.workspace = true +ream-sync-committee-pool.workspace = true [lints] workspace = true diff --git a/crates/common/chain/beacon/src/beacon_chain.rs b/crates/common/chain/beacon/src/beacon_chain.rs index f4061211b..b274cba82 100644 --- a/crates/common/chain/beacon/src/beacon_chain.rs +++ b/crates/common/chain/beacon/src/beacon_chain.rs @@ -19,6 +19,7 @@ use ream_storage::{ db::beacon::BeaconDB, tables::{field::REDBField, table::REDBTable}, }; +use ream_sync_committee_pool::SyncCommitteePool; use tokio::sync::{Mutex, broadcast}; use tracing::warn; @@ -34,11 +35,12 @@ impl BeaconChain { pub fn new( db: BeaconDB, operation_pool: Arc, + sync_committee_pool: Arc, execution_engine: Option, event_sender: Option>, ) -> Self { Self { - store: Mutex::new(Store::new(db, operation_pool)), + store: Mutex::new(Store::new(db, operation_pool, Some(sync_committee_pool))), execution_engine, event_sender, } diff --git a/crates/common/fork_choice/beacon/Cargo.toml b/crates/common/fork_choice/beacon/Cargo.toml index b12b24c70..b7e3e7164 100644 --- a/crates/common/fork_choice/beacon/Cargo.toml +++ b/crates/common/fork_choice/beacon/Cargo.toml @@ -42,6 +42,7 @@ ream-network-spec.workspace = true ream-operation-pool.workspace = true ream-polynomial-commitments.workspace = true ream-storage.workspace = true +ream-sync-committee-pool.workspace = true [lints] workspace = true diff --git a/crates/common/fork_choice/beacon/src/store.rs b/crates/common/fork_choice/beacon/src/store.rs index 602b3572e..794d4a15a 100644 --- a/crates/common/fork_choice/beacon/src/store.rs +++ b/crates/common/fork_choice/beacon/src/store.rs @@ -32,6 +32,7 @@ use ream_storage::{ table::{CustomTable, REDBTable}, }, }; +use ream_sync_committee_pool::SyncCommitteePool; use tree_hash::TreeHash; use crate::constants::{ @@ -50,11 +51,22 @@ pub struct BlockWithEpochInfo { pub struct Store { pub db: BeaconDB, pub operation_pool: Arc, + pub sync_committee_pool: Arc, } impl Store { - pub fn new(db: BeaconDB, operation_pool: Arc) -> Self { - Self { db, operation_pool } + pub fn new( + db: BeaconDB, + operation_pool: Arc, + sync_committee_pool: Option>, + ) -> Self { + let sync_committee_pool = + sync_committee_pool.unwrap_or_else(|| Arc::new(SyncCommitteePool::default())); + Self { + db, + operation_pool, + sync_committee_pool, + } } pub fn is_previous_epoch_justified(&self) -> anyhow::Result { @@ -589,6 +601,12 @@ impl Store { // If this is a new slot, reset store.proposer_boost_root if current_slot > previous_slot { self.db.proposer_boost_root_provider().insert(B256::ZERO)?; + + // Clean old sync committee messages and contributions per slot + self.sync_committee_pool + .clean_sync_committee_messages(current_slot); + self.sync_committee_pool + .clean_sync_committee_contributions(current_slot); } // If a new epoch, pull-up justification and finalization from previous epoch @@ -875,7 +893,7 @@ pub fn get_forkchoice_store( let operation_pool = Arc::new(OperationPool::default()); - Ok(Store { db, operation_pool }) + Ok(Store::new(db, operation_pool, None)) } pub fn compute_slots_since_epoch_start(slot: u64) -> u64 { diff --git a/crates/common/sync_committee_pool/Cargo.toml b/crates/common/sync_committee_pool/Cargo.toml new file mode 100644 index 000000000..e41eb6fb0 --- /dev/null +++ b/crates/common/sync_committee_pool/Cargo.toml @@ -0,0 +1,24 @@ +[package] +name = "ream-sync-committee-pool" +authors.workspace = true +edition.workspace = true +keywords.workspace = true +license.workspace = true +readme.workspace = true +repository.workspace = true +rust-version.workspace = true +version.workspace = true + +[dependencies] +alloy-primitives.workspace = true +parking_lot.workspace = true +serde.workspace = true +ssz_types.workspace = true +thiserror.workspace = true +tracing.workspace = true +tree_hash.workspace = true +tree_hash_derive.workspace = true + +# ream-dependencies +ream-bls.workspace = true +ream-validator-beacon.workspace = true diff --git a/crates/common/sync_committee_pool/src/lib.rs b/crates/common/sync_committee_pool/src/lib.rs new file mode 100644 index 000000000..e7fa4fb20 --- /dev/null +++ b/crates/common/sync_committee_pool/src/lib.rs @@ -0,0 +1,776 @@ +use std::collections::HashMap; + +use alloy_primitives::B256; +use parking_lot::RwLock; +use ream_bls::{BLSSignature, traits::Aggregatable}; +use ream_validator_beacon::{ + contribution_and_proof::SyncCommitteeContribution, sync_committee::SyncCommitteeMessage, +}; +use ssz_types::{BitVector, typenum::U128}; +use tracing::warn; +use tree_hash_derive::TreeHash; + +#[derive(Debug, Clone, PartialEq, Eq, Hash, TreeHash)] +pub struct SyncCommitteeDataKey { + pub slot: u64, + pub beacon_block_root: B256, + pub subcommittee_index: u64, +} + +// Default maximum number of sync committee contributions to store per key +// This prevents memory issues while allowing proper aggregation +// Mainnet: 128, Testnet: 8 +const DEFAULT_MAX_SYNC_CONTRIBUTIONS_PER_KEY: usize = 128; + +#[derive(Debug)] +pub struct SyncCommitteePool { + messages: RwLock>>, + contributions: RwLock>>, + max_contributions_per_key: usize, +} + +impl Default for SyncCommitteePool { + fn default() -> Self { + Self::new(DEFAULT_MAX_SYNC_CONTRIBUTIONS_PER_KEY) + } +} + +impl SyncCommitteePool { + /// Creates a new SyncCommitteePool with the specified maximum contributions per key. + /// + /// # Arguments + /// * `max_contributions_per_key` - Maximum number of contributions to store per key. + /// Recommended: 128 for mainnet, 8 for testnets. + pub fn new(max_contributions_per_key: usize) -> Self { + Self { + messages: RwLock::new(HashMap::new()), + contributions: RwLock::new(HashMap::new()), + max_contributions_per_key, + } + } + pub fn insert_sync_committee_message( + &self, + message: SyncCommitteeMessage, + subcommittee_index: u64, + ) { + // Store raw messages keyed by (slot, root, subcommittee_index) + let key = SyncCommitteeDataKey { + slot: message.slot, + beacon_block_root: message.beacon_block_root, + subcommittee_index, + }; + + let mut map = self.messages.write(); + let entry = map.entry(key).or_default(); + if entry.len() < self.max_contributions_per_key { + entry.push(message); + } else { + warn!( + "Sync committee message pool capacity reached ({}) for slot {} (subcommittee_index: {}, block_root: {:?}), dropping message from validator {}", + self.max_contributions_per_key, + message.slot, + subcommittee_index, + message.beacon_block_root, + message.validator_index + ); + } + } + + pub fn get_sync_committee_contributions( + &self, + slot: u64, + beacon_block_root: B256, + subcommittee_index: u64, + ) -> Vec { + let key = SyncCommitteeDataKey { + slot, + beacon_block_root, + subcommittee_index, + }; + + self.contributions + .read() + .get(&key) + .cloned() + .unwrap_or_default() + } + + pub fn get_best_sync_committee_contribution( + &self, + slot: u64, + beacon_block_root: B256, + subcommittee_index: u64, + ) -> Option { + let contributions = + self.get_sync_committee_contributions(slot, beacon_block_root, subcommittee_index); + + // Select the contribution with the most aggregation bits set (highest participation) + contributions + .into_iter() + .max_by_key(|c| c.aggregation_bits.iter().filter(|b| *b).count()) + } + + pub fn clean_sync_committee_contributions(&self, current_slot: u64) { + // Keep contributions for current slot and one slot back + self.contributions + .write() + .retain(|key, _| key.slot >= current_slot.saturating_sub(1)); + } + + pub fn clean_sync_committee_messages(&self, current_slot: u64) { + // Keep messages for current slot and one slot back + self.messages + .write() + .retain(|key, _| key.slot >= current_slot.saturating_sub(1)); + } + + /// Aggregates sync committee messages into contributions. + /// + /// This function takes an iterator of (SyncCommitteeMessage, index_in_subcommittee) pairs + /// and aggregates them into the pool's contributions. It creates or updates aggregates by + /// combining signatures and setting the appropriate aggregation bits. + /// + /// For each message: + /// - First attempts to aggregate the signature with existing contributions + /// - Only if aggregation succeeds, sets the corresponding bit in the aggregation bitfield + /// - Avoids duplicate aggregation if the bit is already set + /// + /// This ensures aggregation bits are only set when signatures are successfully aggregated, + /// preventing inconsistent state where a bit is set without the corresponding signature. + pub fn aggregate_messages( + &self, + slot: u64, + beacon_block_root: B256, + subcommittee_index: u64, + messages: impl IntoIterator, + ) { + let key = SyncCommitteeDataKey { + slot, + beacon_block_root, + subcommittee_index, + }; + + let mut contributions_map = self.contributions.write(); + let contributions = contributions_map.entry(key).or_default(); + + // Ensure we have at least one aggregate to work with + if contributions.is_empty() { + contributions.push(SyncCommitteeContribution { + slot, + beacon_block_root, + subcommittee_index, + aggregation_bits: BitVector::::default(), + signature: BLSSignature::infinity(), + }); + } + + for (message, index_in_subcommittee) in messages { + // Skip if any contribution already has this bit set (duplicate check) + if contributions.iter().any(|c| { + c.aggregation_bits + .get(index_in_subcommittee) + .unwrap_or(false) + }) { + continue; + } + + // Try to add this message to an existing contribution + let mut added = false; + + for contribution in contributions.iter_mut() { + // First try to aggregate the signatures. Only set the bit if aggregation succeeds. + match BLSSignature::aggregate(&[&contribution.signature, &message.signature]) { + Ok(aggregated_sig) => { + // Now set the bit for this validator's position + if contribution + .aggregation_bits + .set(index_in_subcommittee, true) + .is_err() + { + warn!( + "Invalid index_in_subcommittee: {} for validator {} at slot {} (subcommittee_index: {}, block_root: {:?})", + index_in_subcommittee, + message.validator_index, + slot, + subcommittee_index, + beacon_block_root + ); + continue; + } + + contribution.signature = aggregated_sig; + added = true; + break; + } + Err(e) => { + warn!( + "Failed to aggregate signature for validator {} at slot {} (subcommittee_index: {}, block_root: {:?}): {:?}", + message.validator_index, slot, subcommittee_index, beacon_block_root, e + ); + continue; + } + } + } + + // If we couldn't add to any existing contribution and we're under the limit, attempt to + // create a new one using the message signature alone. Only set the bit if the signature + // is valid (i.e., can be aggregated/parsed successfully). + if !added && contributions.len() < self.max_contributions_per_key { + match BLSSignature::aggregate(&[&message.signature]) { + Ok(valid_sig) => { + let mut aggregation_bits = BitVector::::default(); + if aggregation_bits.set(index_in_subcommittee, true).is_ok() { + contributions.push(SyncCommitteeContribution { + slot, + beacon_block_root, + subcommittee_index, + aggregation_bits, + signature: valid_sig, + }); + } else { + warn!( + "Invalid index_in_subcommittee: {} for validator {} at slot {} (subcommittee_index: {}, block_root: {:?})", + index_in_subcommittee, + message.validator_index, + slot, + subcommittee_index, + beacon_block_root + ); + } + } + Err(e) => { + warn!( + "Skipping new contribution due to invalid signature for validator {} at slot {} (subcommittee_index: {}, block_root: {:?}): {:?}", + message.validator_index, slot, subcommittee_index, beacon_block_root, e + ); + } + } + } + } + } +} + +#[cfg(test)] +mod tests { + use ssz_types::{FixedVector, typenum::U96}; + + use super::*; + + #[test] + fn test_aggregate_messages() { + let pool = SyncCommitteePool::default(); + + let slot = 100u64; + let root = B256::from([1u8; 32]); + let subcommittee_index = 1u64; + + // Create sync committee messages with different validator indices + let messages: Vec<(SyncCommitteeMessage, usize)> = vec![ + ( + SyncCommitteeMessage { + slot, + beacon_block_root: root, + validator_index: 10, + signature: BLSSignature::infinity(), + }, + 0, // index_in_subcommittee + ), + ( + SyncCommitteeMessage { + slot, + beacon_block_root: root, + validator_index: 20, + signature: BLSSignature::infinity(), + }, + 1, + ), + ( + SyncCommitteeMessage { + slot, + beacon_block_root: root, + validator_index: 30, + signature: BLSSignature::infinity(), + }, + 2, + ), + ]; + + // Aggregate the messages + pool.aggregate_messages(slot, root, subcommittee_index, messages); + + // Get the aggregated contributions + let contributions = pool.get_sync_committee_contributions(slot, root, subcommittee_index); + + assert!( + !contributions.is_empty(), + "should have at least one contribution" + ); + + // Find the contribution with the most bits set + let best = contributions + .iter() + .max_by_key(|c| c.aggregation_bits.iter().filter(|b| *b).count()) + .unwrap(); + + // Verify bits 0, 1, 2 are set + assert!(best.aggregation_bits.get(0).unwrap()); + assert!(best.aggregation_bits.get(1).unwrap()); + assert!(best.aggregation_bits.get(2).unwrap()); + } + + #[test] + fn test_aggregate_messages_avoids_duplicates() { + let pool = SyncCommitteePool::default(); + + let slot = 100u64; + let root = B256::from([1u8; 32]); + let subcommittee_index = 1u64; + + let message = SyncCommitteeMessage { + slot, + beacon_block_root: root, + validator_index: 10, + signature: BLSSignature::infinity(), + }; + + // Aggregate the same message twice with the same index + pool.aggregate_messages(slot, root, subcommittee_index, vec![(message.clone(), 0)]); + pool.aggregate_messages(slot, root, subcommittee_index, vec![(message, 0)]); + + let contributions = pool.get_sync_committee_contributions(slot, root, subcommittee_index); + + // Should still have one contribution with bit 0 set only once + assert_eq!(contributions.len(), 1); + assert!(contributions[0].aggregation_bits.get(0).unwrap()); + + let count = contributions[0] + .aggregation_bits + .iter() + .filter(|b| *b) + .count(); + assert_eq!(count, 1, "should have exactly 1 bit set, not duplicated"); + } + + #[test] + fn test_insert_sync_committee_message() { + let pool = SyncCommitteePool::default(); + + let slot = 100u64; + let root = B256::from([1u8; 32]); + let subcommittee_index = 1u64; + + let message = SyncCommitteeMessage { + slot, + beacon_block_root: root, + validator_index: 42, + signature: BLSSignature::infinity(), + }; + + // Insert message + pool.insert_sync_committee_message(message.clone(), subcommittee_index); + + // Verify message was stored + let messages = pool.messages.read(); + let key = SyncCommitteeDataKey { + slot, + beacon_block_root: root, + subcommittee_index, + }; + let stored = messages.get(&key).expect("should have messages for key"); + assert_eq!(stored.len(), 1); + assert_eq!(stored[0].validator_index, 42); + } + + #[test] + fn test_insert_sync_committee_message_capacity_limit() { + let pool = SyncCommitteePool::default(); + let max_capacity = pool.max_contributions_per_key; + + let slot = 100u64; + let root = B256::from([1u8; 32]); + let subcommittee_index = 1u64; + + // Insert max_capacity messages + for i in 0..max_capacity { + let message = SyncCommitteeMessage { + slot, + beacon_block_root: root, + validator_index: i as u64, + signature: BLSSignature::infinity(), + }; + pool.insert_sync_committee_message(message, subcommittee_index); + } + + // Verify we hit the limit + let messages = pool.messages.read(); + let key = SyncCommitteeDataKey { + slot, + beacon_block_root: root, + subcommittee_index, + }; + let stored = messages.get(&key).expect("should have messages for key"); + assert_eq!(stored.len(), max_capacity); + + // Try to insert one more - should be ignored + let extra_message = SyncCommitteeMessage { + slot, + beacon_block_root: root, + validator_index: 999, + signature: BLSSignature::infinity(), + }; + drop(messages); + pool.insert_sync_committee_message(extra_message, subcommittee_index); + + let messages = pool.messages.read(); + let stored = messages.get(&key).expect("should have messages for key"); + assert_eq!(stored.len(), max_capacity, "should not exceed capacity"); + } + + #[test] + fn test_get_sync_committee_contributions_empty() { + let pool = SyncCommitteePool::default(); + + let contributions = pool.get_sync_committee_contributions(100, B256::from([1u8; 32]), 1); + + assert!( + contributions.is_empty(), + "should return empty vec when no contributions exist" + ); + } + + #[test] + fn test_get_best_sync_committee_contribution_returns_highest_participation() { + let pool = SyncCommitteePool::default(); + + let slot = 100u64; + let root = B256::from([1u8; 32]); + let subcommittee_index = 1u64; + + // Create contributions with different participation levels + let messages = vec![ + ( + SyncCommitteeMessage { + slot, + beacon_block_root: root, + validator_index: 1, + signature: BLSSignature::infinity(), + }, + 0, + ), + ( + SyncCommitteeMessage { + slot, + beacon_block_root: root, + validator_index: 2, + signature: BLSSignature::infinity(), + }, + 1, + ), + ( + SyncCommitteeMessage { + slot, + beacon_block_root: root, + validator_index: 3, + signature: BLSSignature::infinity(), + }, + 2, + ), + ]; + + pool.aggregate_messages(slot, root, subcommittee_index, messages); + + let best = pool + .get_best_sync_committee_contribution(slot, root, subcommittee_index) + .expect("should return best contribution"); + + let count = best.aggregation_bits.iter().filter(|b| *b).count(); + assert_eq!(count, 3, "best contribution should have 3 bits set"); + } + + #[test] + fn test_get_best_sync_committee_contribution_returns_none_when_empty() { + let pool = SyncCommitteePool::default(); + + let best = pool.get_best_sync_committee_contribution(100, B256::from([1u8; 32]), 1); + + assert!( + best.is_none(), + "should return None when no contributions exist" + ); + } + + #[test] + fn test_clean_sync_committee_contributions() { + let pool = SyncCommitteePool::default(); + + let root = B256::from([1u8; 32]); + let subcommittee_index = 1u64; + + // Insert contributions for multiple slots + for slot in [100, 101, 102, 103] { + pool.aggregate_messages( + slot, + root, + subcommittee_index, + vec![( + SyncCommitteeMessage { + slot, + beacon_block_root: root, + validator_index: 42, + signature: BLSSignature::infinity(), + }, + 0, + )], + ); + } + + // Clean at slot 102 - should keep slots 101 and 102 (current_slot - 1 and current_slot) + pool.clean_sync_committee_contributions(102); + + // Verify old slots are removed + assert!( + pool.get_sync_committee_contributions(100, root, subcommittee_index) + .is_empty(), + "slot 100 should be cleaned" + ); + + // Verify recent slots are kept + assert!( + !pool + .get_sync_committee_contributions(101, root, subcommittee_index) + .is_empty(), + "slot 101 should be kept" + ); + assert!( + !pool + .get_sync_committee_contributions(102, root, subcommittee_index) + .is_empty(), + "slot 102 should be kept" + ); + assert!( + !pool + .get_sync_committee_contributions(103, root, subcommittee_index) + .is_empty(), + "slot 103 should be kept (future slot)" + ); + } + + #[test] + fn test_clean_sync_committee_messages() { + let pool = SyncCommitteePool::default(); + + let root = B256::from([1u8; 32]); + let subcommittee_index = 1u64; + + // Insert messages for multiple slots + for slot in [100, 101, 102, 103] { + let message = SyncCommitteeMessage { + slot, + beacon_block_root: root, + validator_index: 42, + signature: BLSSignature::infinity(), + }; + pool.insert_sync_committee_message(message, subcommittee_index); + } + + // Clean at slot 102 - should keep slots 101 and 102 (current_slot - 1 and current_slot) + pool.clean_sync_committee_messages(102); + + let messages = pool.messages.read(); + + // Verify old slots are removed + let key_100 = SyncCommitteeDataKey { + slot: 100, + beacon_block_root: root, + subcommittee_index, + }; + assert!( + !messages.contains_key(&key_100), + "slot 100 should be cleaned" + ); + + // Verify recent slots are kept + let key_101 = SyncCommitteeDataKey { + slot: 101, + beacon_block_root: root, + subcommittee_index, + }; + assert!(messages.contains_key(&key_101), "slot 101 should be kept"); + + let key_102 = SyncCommitteeDataKey { + slot: 102, + beacon_block_root: root, + subcommittee_index, + }; + assert!(messages.contains_key(&key_102), "slot 102 should be kept"); + + let key_103 = SyncCommitteeDataKey { + slot: 103, + beacon_block_root: root, + subcommittee_index, + }; + assert!( + messages.contains_key(&key_103), + "slot 103 should be kept (future slot)" + ); + } + + #[test] + fn test_aggregate_messages_with_multiple_subcommittees() { + let pool = SyncCommitteePool::default(); + + let slot = 100u64; + let root = B256::from([1u8; 32]); + + // Add messages to different subcommittees + let messages_subnet_0 = vec![( + SyncCommitteeMessage { + slot, + beacon_block_root: root, + validator_index: 1, + signature: BLSSignature::infinity(), + }, + 0, + )]; + + let messages_subnet_1 = vec![( + SyncCommitteeMessage { + slot, + beacon_block_root: root, + validator_index: 2, + signature: BLSSignature::infinity(), + }, + 0, + )]; + + pool.aggregate_messages(slot, root, 0, messages_subnet_0); + pool.aggregate_messages(slot, root, 1, messages_subnet_1); + + // Verify both subcommittees have their own contributions + let contributions_0 = pool.get_sync_committee_contributions(slot, root, 0); + let contributions_1 = pool.get_sync_committee_contributions(slot, root, 1); + + assert!( + !contributions_0.is_empty(), + "subcommittee 0 should have contributions" + ); + assert!( + !contributions_1.is_empty(), + "subcommittee 1 should have contributions" + ); + } + + #[test] + fn test_custom_max_contributions_per_key() { + // Test with testnet-sized capacity + let pool = SyncCommitteePool::new(8); + assert_eq!(pool.max_contributions_per_key, 8); + + let slot = 100u64; + let root = B256::from([1u8; 32]); + let subcommittee_index = 1u64; + + // Insert 8 messages (should all be accepted) + for i in 0..8 { + let message = SyncCommitteeMessage { + slot, + beacon_block_root: root, + validator_index: i, + signature: BLSSignature::infinity(), + }; + pool.insert_sync_committee_message(message, subcommittee_index); + } + + let messages = pool.messages.read(); + let key = SyncCommitteeDataKey { + slot, + beacon_block_root: root, + subcommittee_index, + }; + let stored = messages.get(&key).expect("should have messages"); + assert_eq!(stored.len(), 8); + + // Try to insert 9th message - should be ignored + drop(messages); + let extra_message = SyncCommitteeMessage { + slot, + beacon_block_root: root, + validator_index: 999, + signature: BLSSignature::infinity(), + }; + pool.insert_sync_committee_message(extra_message, subcommittee_index); + + let messages = pool.messages.read(); + let stored = messages.get(&key).expect("should have messages"); + assert_eq!(stored.len(), 8, "should not exceed custom capacity of 8"); + } + + #[test] + fn test_aggregate_messages_creates_initial_contribution() { + let pool = SyncCommitteePool::default(); + + let slot = 100u64; + let root = B256::from([1u8; 32]); + let subcommittee_index = 1u64; + + // Verify pool is initially empty + assert!( + pool.get_sync_committee_contributions(slot, root, subcommittee_index) + .is_empty() + ); + + // Aggregate with empty iterator - should create initial contribution + pool.aggregate_messages(slot, root, subcommittee_index, vec![]); + + // Should now have one contribution (the initial empty one) + let contributions = pool.get_sync_committee_contributions(slot, root, subcommittee_index); + assert_eq!(contributions.len(), 1); + assert_eq!( + contributions[0] + .aggregation_bits + .iter() + .filter(|b| *b) + .count(), + 0, + "initial contribution should have no bits set" + ); + } + + #[test] + fn test_bit_not_set_on_aggregation_failure() { + let pool = SyncCommitteePool::default(); + + let slot = 200u64; + let root = B256::from([2u8; 32]); + let subcommittee_index = 0u64; + + // Construct an invalid/corrupted signature that should fail conversion/validation. + let bad_sig = BLSSignature { + inner: FixedVector::::try_from(vec![0u8; 96]).unwrap(), + }; + + let message = SyncCommitteeMessage { + slot, + beacon_block_root: root, + validator_index: 7, + signature: bad_sig, + }; + + // Aggregate a single bad message at index 0 + pool.aggregate_messages(slot, root, subcommittee_index, vec![(message, 0)]); + + let contributions = pool.get_sync_committee_contributions(slot, root, subcommittee_index); + assert!( + !contributions.is_empty(), + "there should be at least the initial contribution" + ); + + // The initial contribution is created before processing messages. + // Ensure its bit 0 is NOT set since aggregation failed. + let initial = &contributions[0]; + assert!( + !initial.aggregation_bits.get(0).unwrap_or(false), + "bit must not be set in the pre-existing contribution when aggregation fails" + ); + } +} diff --git a/crates/common/validator/beacon/src/contribution_and_proof.rs b/crates/common/validator/beacon/src/contribution_and_proof.rs index c594fa4b8..708b49fda 100644 --- a/crates/common/validator/beacon/src/contribution_and_proof.rs +++ b/crates/common/validator/beacon/src/contribution_and_proof.rs @@ -1,6 +1,8 @@ use ream_bls::{BLSSignature, PrivateKey, traits::Signable}; use ream_consensus_misc::misc::{compute_domain, compute_signing_root}; -use ream_events_beacon::contribution_and_proof::{ContributionAndProof, SyncCommitteeContribution}; +pub use ream_events_beacon::contribution_and_proof::{ + ContributionAndProof, SyncCommitteeContribution, +}; use ream_network_spec::networks::beacon_network_spec; use crate::{ diff --git a/crates/networking/manager/Cargo.toml b/crates/networking/manager/Cargo.toml index d16bb7003..62f046112 100644 --- a/crates/networking/manager/Cargo.toml +++ b/crates/networking/manager/Cargo.toml @@ -41,6 +41,7 @@ ream-operation-pool.workspace = true ream-p2p.workspace = true ream-polynomial-commitments.workspace = true ream-storage.workspace = true +ream-sync-committee-pool.workspace = true ream-syncer.workspace = true ream-validator-beacon.workspace = true diff --git a/crates/networking/manager/src/service.rs b/crates/networking/manager/src/service.rs index 0a69e3b9e..81c1d89c4 100644 --- a/crates/networking/manager/src/service.rs +++ b/crates/networking/manager/src/service.rs @@ -16,6 +16,7 @@ use ream_p2p::{ network::beacon::{Network, ReamNetworkEvent, network_state::NetworkState}, }; use ream_storage::{cache::CachedDB, db::beacon::BeaconDB}; +use ream_sync_committee_pool::SyncCommitteePool; use ream_syncer::block_range::BlockRangeSyncer; use tokio::{sync::mpsc, time::interval}; use tracing::{error, info}; @@ -35,6 +36,7 @@ pub struct NetworkManagerService { pub block_range_syncer: BlockRangeSyncer, pub ream_db: BeaconDB, pub cached_db: Arc, + pub sync_committee_pool: Arc, } /// The `NetworkManagerService` acts as the manager for all networking activities in Ream. @@ -57,6 +59,7 @@ impl NetworkManagerService { ream_db: BeaconDB, ream_dir: PathBuf, beacon_chain: Arc, + sync_committee_pool: Arc, cached_db: Arc, ) -> anyhow::Result { let discv5_config = discv5::ConfigBuilder::new(discv5::ListenConfig::from_ip( @@ -116,6 +119,7 @@ impl NetworkManagerService { block_range_syncer, ream_db, cached_db, + sync_committee_pool, }) } diff --git a/crates/rpc/beacon/Cargo.toml b/crates/rpc/beacon/Cargo.toml index 3abd78577..3bcc76a85 100644 --- a/crates/rpc/beacon/Cargo.toml +++ b/crates/rpc/beacon/Cargo.toml @@ -48,6 +48,7 @@ ream-p2p.workspace = true ream-peer.workspace = true ream-rpc-common.workspace = true ream-storage.workspace = true +ream-sync-committee-pool.workspace = true ream-validator-beacon.workspace = true [lints] diff --git a/crates/rpc/beacon/src/handlers/debug.rs b/crates/rpc/beacon/src/handlers/debug.rs index 4b4690802..bb20aef93 100644 --- a/crates/rpc/beacon/src/handlers/debug.rs +++ b/crates/rpc/beacon/src/handlers/debug.rs @@ -36,10 +36,11 @@ pub async fn get_debug_beacon_heads(db: Data) -> Result) -> Result, execution_engine: &Option, ) -> Result { - let store = Store { - db: db.clone(), - operation_pool: operation_pool.clone(), - }; + let store = Store::new(db.clone(), operation_pool.clone(), None); // get head_slot let head = store.get_head().map_err(|err| { diff --git a/crates/rpc/beacon/src/handlers/validator.rs b/crates/rpc/beacon/src/handlers/validator.rs index 592a76a61..46bc7a32d 100644 --- a/crates/rpc/beacon/src/handlers/validator.rs +++ b/crates/rpc/beacon/src/handlers/validator.rs @@ -10,7 +10,7 @@ use ream_api_types_beacon::{ block::{FullBlockData, ProduceBlockData, ProduceBlockResponse}, committee::BeaconCommitteeSubscription, id::ValidatorID, - query::{AttestationQuery, IdQuery, StatusQuery}, + query::{AttestationQuery, IdQuery, StatusQuery, SyncCommitteeContributionQuery}, request::ValidatorsPostRequest, responses::{BeaconResponse, DataResponse, DataVersionedResponse}, validator::{ValidatorBalance, ValidatorData, ValidatorStatus}, @@ -59,7 +59,11 @@ use ream_execution_engine::{ use ream_fork_choice_beacon::store::Store; use ream_network_manager::gossipsub::validate::sync_committee_contribution_and_proof::get_sync_subcommittee_pubkeys; use ream_operation_pool::OperationPool; -use ream_storage::{db::beacon::BeaconDB, tables::field::REDBField}; +use ream_storage::{ + db::beacon::BeaconDB, + tables::{field::REDBField, table::REDBTable}, +}; +use ream_sync_committee_pool::SyncCommitteePool; use ream_validator_beacon::{ aggregate_and_proof::SignedAggregateAndProof, attestation::{compute_on_chain_aggregate, compute_subnet_for_attestation}, @@ -500,10 +504,11 @@ pub async fn get_attestation_data( opertation_pool: Data>, query: Query, ) -> Result { - let store = Store { - db: db.get_ref().clone(), - operation_pool: opertation_pool.get_ref().clone(), - }; + let store = Store::new( + db.get_ref().clone(), + opertation_pool.get_ref().clone(), + None, + ); if store.is_syncing().map_err(|err| { ApiError::InternalError(format!("Failed to check syncing status, err: {err:?}")) @@ -567,6 +572,68 @@ pub async fn post_beacon_committee_selections( Ok(HttpResponse::NotImplemented()) } +#[get("/validator/sync_committee_contribution")] +pub async fn get_sync_committee_contribution( + db: Data, + operation_pool: Data>, + sync_committee_pool: Data>, + query: Query, +) -> Result { + let store = Store::new(db.get_ref().clone(), operation_pool.get_ref().clone(), None); + + if store.is_syncing().map_err(|err| { + ApiError::InternalError(format!("Failed to check syncing status, err: {err:?}")) + })? { + return Err(ApiError::UnderSyncing); + } + + let slot = query.slot; + let subcommittee_index = query.subcommittee_index; + let beacon_block_root = query.beacon_block_root; + + // Validate subcommittee index + if subcommittee_index >= SYNC_COMMITTEE_SUBNET_COUNT { + return Err(ApiError::InvalidParameter(format!( + "Invalid subcommittee_index: {subcommittee_index}, must be less than {SYNC_COMMITTEE_SUBNET_COUNT}" + ))); + } + + let current_slot = store.get_current_slot().map_err(|err| { + ApiError::InternalError(format!("Failed to get current slot, err: {err:?}")) + })?; + + // Validate slot is not too far in the future + if slot > current_slot + 1 { + return Err(ApiError::InvalidParameter(format!( + "Slot {slot:?} is too far ahead of the current slot {current_slot:?}" + ))); + } + + // Validate beacon block exists + db.block_provider() + .get(beacon_block_root) + .map_err(|err| ApiError::InternalError(format!("Failed to get beacon block: {err:?}")))? + .ok_or_else(|| { + ApiError::NotFound(format!("Beacon block root {beacon_block_root:?} not found")) + })?; + + // Check if block is fully verified (not optimistic) + // TODO: Add optimistic sync check when execution layer integration is complete + // For now, we assume all blocks in fork choice are fully verified + + // Try to get the best (highest-participation) sync committee contribution from the pool + // Per spec: return 404 if no contribution is available + let sync_committee_contribution = sync_committee_pool + .get_best_sync_committee_contribution(slot, beacon_block_root, subcommittee_index) + .ok_or_else(|| { + ApiError::NotFound(format!( + "No sync committee contribution available for beacon block root {beacon_block_root:?}" + )) + })?; + + Ok(HttpResponse::Ok().json(DataResponse::new(sync_committee_contribution))) +} + #[post("/validator/aggregate_and_proofs")] pub async fn post_aggregate_and_proofs_v2( db: Data, @@ -915,7 +982,7 @@ pub async fn post_contribution_and_proofs( event_sender: Data>, contributions: Json>, ) -> Result { - let store = Store::new(db.get_ref().clone(), operation_pool.get_ref().clone()); + let store = Store::new(db.get_ref().clone(), operation_pool.get_ref().clone(), None); if store.is_syncing().map_err(|err| { ApiError::InternalError(format!("Failed to check syncing status: {err:?}")) diff --git a/crates/rpc/beacon/src/routes/validator.rs b/crates/rpc/beacon/src/routes/validator.rs index 2aef11658..1cc94a75c 100644 --- a/crates/rpc/beacon/src/routes/validator.rs +++ b/crates/rpc/beacon/src/routes/validator.rs @@ -5,8 +5,9 @@ use crate::handlers::{ prepare_beacon_proposer::prepare_beacon_proposer, validator::{ get_aggregate_attestation, get_attestation_data, get_blocks_v3, - post_aggregate_and_proofs_v2, post_beacon_committee_selections, - post_beacon_committee_subscriptions, post_contribution_and_proofs, post_register_validator, + get_sync_committee_contribution, post_aggregate_and_proofs_v2, + post_beacon_committee_selections, post_beacon_committee_subscriptions, + post_contribution_and_proofs, post_register_validator, }, }; @@ -17,6 +18,7 @@ pub fn register_validator_routes_v1(config: &mut ServiceConfig) { config.service(prepare_beacon_proposer); config.service(get_attestation_data); config.service(post_beacon_committee_selections); + config.service(get_sync_committee_contribution); config.service(post_beacon_committee_subscriptions); config.service(post_contribution_and_proofs); config.service(post_register_validator); diff --git a/crates/rpc/beacon/src/server.rs b/crates/rpc/beacon/src/server.rs index dbe48ee86..e26fd1f40 100644 --- a/crates/rpc/beacon/src/server.rs +++ b/crates/rpc/beacon/src/server.rs @@ -8,6 +8,7 @@ use ream_operation_pool::OperationPool; use ream_p2p::network::beacon::network_state::NetworkState; use ream_rpc_common::{config::RpcServerConfig, server::RpcServerBuilder}; use ream_storage::{cache::CachedDB, db::beacon::BeaconDB}; +use ream_sync_committee_pool::SyncCommitteePool; use ream_validator_beacon::builder::builder_client::BuilderClient; use tokio::sync::broadcast; @@ -20,6 +21,7 @@ pub async fn start( db: BeaconDB, network_state: Arc, operation_pool: Arc, + sync_committee_pool: Arc, execution_engine: Option, builder_client: Option>, event_sender: broadcast::Sender, @@ -32,6 +34,7 @@ pub async fn start( .with_data(db) .with_data(network_state) .with_data(operation_pool) + .with_data(sync_committee_pool) .with_data(execution_engine) .with_data(builder_client) .with_data(event_sender) diff --git a/testing/gossip-validation/Cargo.toml b/testing/gossip-validation/Cargo.toml index 53ca80cea..74d22fbfa 100644 --- a/testing/gossip-validation/Cargo.toml +++ b/testing/gossip-validation/Cargo.toml @@ -28,6 +28,7 @@ ream-network-manager = { workspace = true, features = ["disable_ancestor_validat ream-network-spec.workspace = true ream-operation-pool.workspace = true ream-storage.workspace = true +ream-sync-committee-pool.workspace = true [lints] workspace = true diff --git a/testing/gossip-validation/tests/validate_block.rs b/testing/gossip-validation/tests/validate_block.rs index 16e7b4f04..7bec44653 100644 --- a/testing/gossip-validation/tests/validate_block.rs +++ b/testing/gossip-validation/tests/validate_block.rs @@ -21,6 +21,7 @@ mod tests { db::{ReamDB, beacon::BeaconDB}, tables::{field::REDBField, table::REDBTable}, }; + use ream_sync_committee_pool::SyncCommitteePool; use snap::raw::Decoder; use ssz::Decode; use tempdir::TempDir; @@ -72,8 +73,15 @@ mod tests { .await; let operation_pool = OperationPool::default(); + let sync_committee_pool = SyncCommitteePool::default(); let cached_db = CachedDB::default(); - let beacon_chain = BeaconChain::new(db, operation_pool.into(), None, None); + let beacon_chain = BeaconChain::new( + db, + operation_pool.into(), + sync_committee_pool.into(), + None, + None, + ); (beacon_chain, cached_db, block_root) } From 7b1dff74389922c99aa71651ee45e5cbde0221c9 Mon Sep 17 00:00:00 2001 From: ShiroObiJohn <28779404+ShiroObiJohn@users.noreply.github.com> Date: Fri, 3 Oct 2025 17:01:06 +0800 Subject: [PATCH 2/2] fix: add [lints] to sync_committee_pool Cargo.toml --- crates/common/sync_committee_pool/Cargo.toml | 3 +++ 1 file changed, 3 insertions(+) diff --git a/crates/common/sync_committee_pool/Cargo.toml b/crates/common/sync_committee_pool/Cargo.toml index e41eb6fb0..8a3d6dc7a 100644 --- a/crates/common/sync_committee_pool/Cargo.toml +++ b/crates/common/sync_committee_pool/Cargo.toml @@ -22,3 +22,6 @@ tree_hash_derive.workspace = true # ream-dependencies ream-bls.workspace = true ream-validator-beacon.workspace = true + +[lints] +workspace = true