From 24e982bf85692a5be91099a419d819d039ca138e Mon Sep 17 00:00:00 2001 From: ShiroObiJohn <28779404+ShiroObiJohn@users.noreply.github.com> Date: Fri, 3 Oct 2025 15:51:33 +0800 Subject: [PATCH 1/2] feat: add sync committee pool to manage sync committee messages and contributions. get /eth/v1/validator/sync_committee_contribution endpoint --- Cargo.lock | 23 + Cargo.toml | 2 + bin/ream/Cargo.toml | 1 + bin/ream/src/main.rs | 4 + crates/common/api_types/beacon/src/query.rs | 7 + crates/common/chain/beacon/Cargo.toml | 1 + .../common/chain/beacon/src/beacon_chain.rs | 4 +- crates/common/fork_choice/Cargo.toml | 1 + crates/common/fork_choice/src/store.rs | 24 +- crates/common/sync_committee_pool/Cargo.toml | 24 + crates/common/sync_committee_pool/src/lib.rs | 776 ++++++++++++++++++ crates/networking/manager/Cargo.toml | 1 + crates/networking/manager/src/service.rs | 5 + crates/rpc/beacon/Cargo.toml | 2 + crates/rpc/beacon/src/handlers/debug.rs | 18 +- .../src/handlers/prepare_beacon_proposer.rs | 2 +- crates/rpc/beacon/src/handlers/syncing.rs | 5 +- crates/rpc/beacon/src/handlers/validator.rs | 80 +- crates/rpc/beacon/src/lib.rs | 3 + crates/rpc/beacon/src/routes/validator.rs | 5 +- testing/gossip-validation/Cargo.toml | 1 + .../gossip-validation/tests/validate_block.rs | 5 +- 22 files changed, 969 insertions(+), 25 deletions(-) create mode 100644 crates/common/sync_committee_pool/Cargo.toml create mode 100644 crates/common/sync_committee_pool/src/lib.rs diff --git a/Cargo.lock b/Cargo.lock index 2263cd077..5d05f5110 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2580,6 +2580,7 @@ dependencies = [ "ream-network-spec", "ream-operation-pool", "ream-storage", + "ream-sync-committee-pool", "serde", "serde_json", "snap", @@ -5246,6 +5247,7 @@ dependencies = [ "ream-rpc-lean", "ream-storage", "ream-sync", + "ream-sync-committee-pool", "ream-validator-beacon", "ream-validator-lean", "serde_json", @@ -5342,6 +5344,7 @@ dependencies = [ "ream-operation-pool", "ream-p2p", "ream-storage", + "ream-sync-committee-pool", "tokio", "tracing", ] @@ -5536,6 +5539,7 @@ dependencies = [ "ream-operation-pool", "ream-polynomial-commitments", "ream-storage", + "ream-sync-committee-pool", "rust-kzg-blst", "serde", "serde_json", @@ -5622,6 +5626,7 @@ dependencies = [ "ream-p2p", "ream-polynomial-commitments", "ream-storage", + "ream-sync-committee-pool", "ream-syncer", "ream-validator-beacon", "tokio", @@ -5780,6 +5785,8 @@ dependencies = [ "ream-p2p", "ream-rpc-common", "ream-storage", + "ream-sync-committee-pool", + "ream-validator-beacon", "serde", "serde_json", "ssz_types", @@ -5853,6 +5860,22 @@ dependencies = [ "tokio", ] +[[package]] +name = "ream-sync-committee-pool" +version = "0.1.0" +dependencies = [ + "alloy-primitives", + "parking_lot", + "ream-bls", + "ream-validator-beacon", + "serde", + "ssz_types", + "thiserror 2.0.12", + "tracing", + "tree_hash", + "tree_hash_derive", +] + [[package]] name = "ream-syncer" version = "0.1.0" diff --git a/Cargo.toml b/Cargo.toml index 25cb778c3..157205151 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -22,6 +22,7 @@ members = [ "crates/common/operation_pool", "crates/common/polynomial_commitments", "crates/common/sync", + "crates/common/sync_committee_pool", "crates/common/validator/beacon", "crates/common/validator/lean", "crates/crypto/bls", @@ -161,6 +162,7 @@ ream-rpc-common = { path = "crates/rpc/common" } ream-rpc-lean = { path = "crates/rpc/lean" } ream-storage = { path = "crates/storage" } ream-sync = { path = "crates/common/sync" } +ream-sync-committee-pool = { path = "crates/common/sync_committee_pool" } ream-syncer = { path = "crates/networking/syncer" } ream-validator-beacon = { path = "crates/common/validator/beacon" } ream-validator-lean = { path = "crates/common/validator/lean" } diff --git a/bin/ream/Cargo.toml b/bin/ream/Cargo.toml index 8c5a2a632..199e70641 100644 --- a/bin/ream/Cargo.toml +++ b/bin/ream/Cargo.toml @@ -54,6 +54,7 @@ ream-rpc-beacon.workspace = true ream-rpc-lean.workspace = true ream-storage.workspace = true ream-sync.workspace = true +ream-sync-committee-pool.workspace = true ream-validator-beacon.workspace = true ream-validator-lean.workspace = true diff --git a/bin/ream/src/main.rs b/bin/ream/src/main.rs index 5b749f1d0..676a4b26a 100644 --- a/bin/ream/src/main.rs +++ b/bin/ream/src/main.rs @@ -58,6 +58,7 @@ use ream_storage::{ tables::table::Table, }; use ream_sync::rwlock::Writer; +use ream_sync_committee_pool::SyncCommitteePool; use ream_validator_beacon::{ beacon_api_client::BeaconApiClient, validator::ValidatorService, voluntary_exit::process_voluntary_exit, @@ -305,6 +306,7 @@ pub async fn run_beacon_node(config: BeaconNodeConfig, executor: ReamExecutor, r ); let operation_pool = Arc::new(OperationPool::default()); + let sync_committee_pool = Arc::new(SyncCommitteePool::default()); let server_config = RpcServerConfig::new( config.http_address, @@ -318,6 +320,7 @@ pub async fn run_beacon_node(config: BeaconNodeConfig, executor: ReamExecutor, r beacon_db.clone(), beacon_db.data_dir.clone(), operation_pool.clone(), + sync_committee_pool.clone(), ) .await .expect("Failed to create manager service"); @@ -336,6 +339,7 @@ pub async fn run_beacon_node(config: BeaconNodeConfig, executor: ReamExecutor, r beacon_db, network_state, operation_pool, + sync_committee_pool, execution_engine, ) .await diff --git a/crates/common/api_types/beacon/src/query.rs b/crates/common/api_types/beacon/src/query.rs index c1e2d2cbc..f3e24be37 100644 --- a/crates/common/api_types/beacon/src/query.rs +++ b/crates/common/api_types/beacon/src/query.rs @@ -50,6 +50,13 @@ pub struct AttestationQuery { pub committee_index: u64, } +#[derive(Debug, Deserialize)] +pub struct SyncCommitteeContributionQuery { + pub slot: u64, + pub subcommittee_index: u64, + pub beacon_block_root: B256, +} + impl StatusQuery { pub fn has_status(&self) -> bool { match &self.status { diff --git a/crates/common/chain/beacon/Cargo.toml b/crates/common/chain/beacon/Cargo.toml index cbf9d4bad..58e890c60 100644 --- a/crates/common/chain/beacon/Cargo.toml +++ b/crates/common/chain/beacon/Cargo.toml @@ -26,6 +26,7 @@ ream-network-spec.workspace = true ream-operation-pool.workspace = true ream-p2p.workspace = true ream-storage.workspace = true +ream-sync-committee-pool.workspace = true [lints] workspace = true diff --git a/crates/common/chain/beacon/src/beacon_chain.rs b/crates/common/chain/beacon/src/beacon_chain.rs index 504994e35..37857641c 100644 --- a/crates/common/chain/beacon/src/beacon_chain.rs +++ b/crates/common/chain/beacon/src/beacon_chain.rs @@ -18,6 +18,7 @@ use ream_storage::{ db::beacon::BeaconDB, tables::{field::Field, table::Table}, }; +use ream_sync_committee_pool::SyncCommitteePool; use tokio::sync::Mutex; use tracing::warn; @@ -32,10 +33,11 @@ impl BeaconChain { pub fn new( db: BeaconDB, operation_pool: Arc, + sync_committee_pool: Arc, execution_engine: Option, ) -> Self { Self { - store: Mutex::new(Store::new(db, operation_pool)), + store: Mutex::new(Store::new(db, operation_pool, Some(sync_committee_pool))), execution_engine, } } diff --git a/crates/common/fork_choice/Cargo.toml b/crates/common/fork_choice/Cargo.toml index f9d099d63..6cd4b0736 100644 --- a/crates/common/fork_choice/Cargo.toml +++ b/crates/common/fork_choice/Cargo.toml @@ -42,6 +42,7 @@ ream-network-spec.workspace = true ream-operation-pool.workspace = true ream-polynomial-commitments.workspace = true ream-storage.workspace = true +ream-sync-committee-pool.workspace = true [lints] workspace = true diff --git a/crates/common/fork_choice/src/store.rs b/crates/common/fork_choice/src/store.rs index 4a000d6eb..a55cd5cad 100644 --- a/crates/common/fork_choice/src/store.rs +++ b/crates/common/fork_choice/src/store.rs @@ -28,6 +28,7 @@ use ream_storage::{ db::beacon::BeaconDB, tables::{field::Field, multimap_table::MultimapTable, table::Table}, }; +use ream_sync_committee_pool::SyncCommitteePool; use tree_hash::TreeHash; use crate::constants::{ @@ -46,11 +47,22 @@ pub struct BlockWithEpochInfo { pub struct Store { pub db: BeaconDB, pub operation_pool: Arc, + pub sync_committee_pool: Arc, } impl Store { - pub fn new(db: BeaconDB, operation_pool: Arc) -> Self { - Self { db, operation_pool } + pub fn new( + db: BeaconDB, + operation_pool: Arc, + sync_committee_pool: Option>, + ) -> Self { + let sync_committee_pool = + sync_committee_pool.unwrap_or_else(|| Arc::new(SyncCommitteePool::default())); + Self { + db, + operation_pool, + sync_committee_pool, + } } pub fn is_previous_epoch_justified(&self) -> anyhow::Result { @@ -594,6 +606,12 @@ impl Store { // If this is a new slot, reset store.proposer_boost_root if current_slot > previous_slot { self.db.proposer_boost_root_provider().insert(B256::ZERO)?; + + // Clean old sync committee messages and contributions per slot + self.sync_committee_pool + .clean_sync_committee_messages(current_slot); + self.sync_committee_pool + .clean_sync_committee_contributions(current_slot); } // If a new epoch, pull-up justification and finalization from previous epoch @@ -880,7 +898,7 @@ pub fn get_forkchoice_store( let operation_pool = Arc::new(OperationPool::default()); - Ok(Store { db, operation_pool }) + Ok(Store::new(db, operation_pool, None)) } pub fn compute_slots_since_epoch_start(slot: u64) -> u64 { diff --git a/crates/common/sync_committee_pool/Cargo.toml b/crates/common/sync_committee_pool/Cargo.toml new file mode 100644 index 000000000..e41eb6fb0 --- /dev/null +++ b/crates/common/sync_committee_pool/Cargo.toml @@ -0,0 +1,24 @@ +[package] +name = "ream-sync-committee-pool" +authors.workspace = true +edition.workspace = true +keywords.workspace = true +license.workspace = true +readme.workspace = true +repository.workspace = true +rust-version.workspace = true +version.workspace = true + +[dependencies] +alloy-primitives.workspace = true +parking_lot.workspace = true +serde.workspace = true +ssz_types.workspace = true +thiserror.workspace = true +tracing.workspace = true +tree_hash.workspace = true +tree_hash_derive.workspace = true + +# ream-dependencies +ream-bls.workspace = true +ream-validator-beacon.workspace = true diff --git a/crates/common/sync_committee_pool/src/lib.rs b/crates/common/sync_committee_pool/src/lib.rs new file mode 100644 index 000000000..a688180c4 --- /dev/null +++ b/crates/common/sync_committee_pool/src/lib.rs @@ -0,0 +1,776 @@ +use std::collections::HashMap; + +use alloy_primitives::B256; +use parking_lot::RwLock; +use ream_bls::{BLSSignature, traits::Aggregatable}; +use ream_validator_beacon::{ + contribution_and_proof::SyncCommitteeContribution, sync_committee::SyncCommitteeMessage, +}; +use ssz_types::{BitVector, typenum::U128}; +use tracing::warn; +use tree_hash_derive::TreeHash; + +#[derive(Debug, Clone, PartialEq, Eq, Hash, TreeHash)] +pub struct SyncCommitteeDataKey { + pub slot: u64, + pub beacon_block_root: B256, + pub subcommittee_index: u64, +} + +// Default maximum number of sync committee contributions to store per key +// This prevents memory issues while allowing proper aggregation +// Mainnet: 128, Testnet: 8 +const DEFAULT_MAX_SYNC_CONTRIBUTIONS_PER_KEY: usize = 128; + +#[derive(Debug)] +pub struct SyncCommitteePool { + messages: RwLock>>, + contributions: RwLock>>, + max_contributions_per_key: usize, +} + +impl Default for SyncCommitteePool { + fn default() -> Self { + Self::new(DEFAULT_MAX_SYNC_CONTRIBUTIONS_PER_KEY) + } +} + +impl SyncCommitteePool { + /// Creates a new SyncCommitteePool with the specified maximum contributions per key. + /// + /// # Arguments + /// * `max_contributions_per_key` - Maximum number of contributions to store per key. + /// Recommended: 128 for mainnet, 8 for testnets. + pub fn new(max_contributions_per_key: usize) -> Self { + Self { + messages: RwLock::new(HashMap::new()), + contributions: RwLock::new(HashMap::new()), + max_contributions_per_key, + } + } + pub fn insert_sync_committee_message( + &self, + message: SyncCommitteeMessage, + subcommittee_index: u64, + ) { + // Store raw messages keyed by (slot, root, subcommittee_index) + let key = SyncCommitteeDataKey { + slot: message.slot, + beacon_block_root: message.beacon_block_root, + subcommittee_index, + }; + + let mut map = self.messages.write(); + let entry = map.entry(key).or_default(); + if entry.len() < self.max_contributions_per_key { + entry.push(message); + } else { + warn!( + "Sync committee message pool capacity reached ({}) for slot {} (subcommittee_index: {}, block_root: {:?}), dropping message from validator {}", + self.max_contributions_per_key, + message.slot, + subcommittee_index, + message.beacon_block_root, + message.validator_index + ); + } + } + + pub fn get_sync_committee_contributions( + &self, + slot: u64, + beacon_block_root: B256, + subcommittee_index: u64, + ) -> Vec { + let key = SyncCommitteeDataKey { + slot, + beacon_block_root, + subcommittee_index, + }; + + self.contributions + .read() + .get(&key) + .cloned() + .unwrap_or_default() + } + + pub fn get_best_sync_committee_contribution( + &self, + slot: u64, + beacon_block_root: B256, + subcommittee_index: u64, + ) -> Option { + let contributions = + self.get_sync_committee_contributions(slot, beacon_block_root, subcommittee_index); + + // Select the contribution with the most aggregation bits set (highest participation) + contributions + .into_iter() + .max_by_key(|c| c.aggregation_bits.iter().filter(|b| *b).count()) + } + + pub fn clean_sync_committee_contributions(&self, current_slot: u64) { + // Keep contributions for current slot and one slot back + self.contributions + .write() + .retain(|key, _| key.slot >= current_slot.saturating_sub(1)); + } + + pub fn clean_sync_committee_messages(&self, current_slot: u64) { + // Keep messages for current slot and one slot back + self.messages + .write() + .retain(|key, _| key.slot >= current_slot.saturating_sub(1)); + } + + /// Aggregates sync committee messages into contributions. + /// + /// This function takes an iterator of (SyncCommitteeMessage, index_in_subcommittee) pairs + /// and aggregates them into the pool's contributions. It creates or updates aggregates by + /// combining signatures and setting the appropriate aggregation bits. + /// + /// For each message: + /// - First attempts to aggregate the signature with existing contributions + /// - Only if aggregation succeeds, sets the corresponding bit in the aggregation bitfield + /// - Avoids duplicate aggregation if the bit is already set + /// + /// This ensures aggregation bits are only set when signatures are successfully aggregated, + /// preventing inconsistent state where a bit is set without the corresponding signature. + pub fn aggregate_messages( + &self, + slot: u64, + beacon_block_root: B256, + subcommittee_index: u64, + messages: impl IntoIterator, + ) { + let key = SyncCommitteeDataKey { + slot, + beacon_block_root, + subcommittee_index, + }; + + let mut contributions_map = self.contributions.write(); + let contributions = contributions_map.entry(key).or_default(); + + // Ensure we have at least one aggregate to work with + if contributions.is_empty() { + contributions.push(SyncCommitteeContribution { + slot, + beacon_block_root, + subcommittee_index, + aggregation_bits: BitVector::::default(), + signature: BLSSignature::infinity(), + }); + } + + for (message, index_in_subcommittee) in messages { + // Skip if any contribution already has this bit set (duplicate check) + if contributions.iter().any(|c| { + c.aggregation_bits + .get(index_in_subcommittee) + .unwrap_or(false) + }) { + continue; + } + + // Try to add this message to an existing contribution + let mut added = false; + + for contribution in contributions.iter_mut() { + // First try to aggregate the signatures. Only set the bit if aggregation succeeds. + match BLSSignature::aggregate(&[&contribution.signature, &message.signature]) { + Ok(aggregated_sig) => { + // Now set the bit for this validator's position + if contribution + .aggregation_bits + .set(index_in_subcommittee, true) + .is_err() + { + warn!( + "Invalid index_in_subcommittee: {} for validator {} at slot {} (subcommittee_index: {}, block_root: {:?})", + index_in_subcommittee, + message.validator_index, + slot, + subcommittee_index, + beacon_block_root + ); + continue; + } + + contribution.signature = aggregated_sig; + added = true; + break; + } + Err(e) => { + warn!( + "Failed to aggregate signature for validator {} at slot {} (subcommittee_index: {}, block_root: {:?}): {:?}", + message.validator_index, slot, subcommittee_index, beacon_block_root, e + ); + continue; + } + } + } + + // If we couldn't add to any existing contribution and we're under the limit, attempt to + // create a new one using the message signature alone. Only set the bit if the signature + // is valid (i.e., can be aggregated/parsed successfully). + if !added && contributions.len() < self.max_contributions_per_key { + match BLSSignature::aggregate(&[&message.signature]) { + Ok(valid_sig) => { + let mut aggregation_bits = BitVector::::default(); + if aggregation_bits.set(index_in_subcommittee, true).is_ok() { + contributions.push(SyncCommitteeContribution { + slot, + beacon_block_root, + subcommittee_index, + aggregation_bits, + signature: valid_sig, + }); + } else { + warn!( + "Invalid index_in_subcommittee: {} for validator {} at slot {} (subcommittee_index: {}, block_root: {:?})", + index_in_subcommittee, + message.validator_index, + slot, + subcommittee_index, + beacon_block_root + ); + } + } + Err(e) => { + warn!( + "Skipping new contribution due to invalid signature for validator {} at slot {} (subcommittee_index: {}, block_root: {:?}): {:?}", + message.validator_index, slot, subcommittee_index, beacon_block_root, e + ); + } + } + } + } + } +} + +#[cfg(test)] +mod tests { + use ssz_types::{FixedVector, typenum::U96}; + + use super::*; + + #[test] + fn test_aggregate_messages() { + let pool = SyncCommitteePool::default(); + + let slot = 100u64; + let root = B256::from([1u8; 32]); + let subcommittee_index = 1u64; + + // Create sync committee messages with different validator indices + let messages: Vec<(SyncCommitteeMessage, usize)> = vec![ + ( + SyncCommitteeMessage { + slot, + beacon_block_root: root, + validator_index: 10, + signature: BLSSignature::infinity(), + }, + 0, // index_in_subcommittee + ), + ( + SyncCommitteeMessage { + slot, + beacon_block_root: root, + validator_index: 20, + signature: BLSSignature::infinity(), + }, + 1, + ), + ( + SyncCommitteeMessage { + slot, + beacon_block_root: root, + validator_index: 30, + signature: BLSSignature::infinity(), + }, + 2, + ), + ]; + + // Aggregate the messages + pool.aggregate_messages(slot, root, subcommittee_index, messages); + + // Get the aggregated contributions + let contributions = pool.get_sync_committee_contributions(slot, root, subcommittee_index); + + assert!( + !contributions.is_empty(), + "should have at least one contribution" + ); + + // Find the contribution with the most bits set + let best = contributions + .iter() + .max_by_key(|c| c.aggregation_bits.iter().filter(|b| *b).count()) + .unwrap(); + + // Verify bits 0, 1, 2 are set + assert!(best.aggregation_bits.get(0).unwrap()); + assert!(best.aggregation_bits.get(1).unwrap()); + assert!(best.aggregation_bits.get(2).unwrap()); + } + + #[test] + fn test_aggregate_messages_avoids_duplicates() { + let pool = SyncCommitteePool::default(); + + let slot = 100u64; + let root = B256::from([1u8; 32]); + let subcommittee_index = 1u64; + + let message = SyncCommitteeMessage { + slot, + beacon_block_root: root, + validator_index: 10, + signature: BLSSignature::infinity(), + }; + + // Aggregate the same message twice with the same index + pool.aggregate_messages(slot, root, subcommittee_index, vec![(message.clone(), 0)]); + pool.aggregate_messages(slot, root, subcommittee_index, vec![(message, 0)]); + + let contributions = pool.get_sync_committee_contributions(slot, root, subcommittee_index); + + // Should still have one contribution with bit 0 set only once + assert_eq!(contributions.len(), 1); + assert!(contributions[0].aggregation_bits.get(0).unwrap()); + + let count = contributions[0] + .aggregation_bits + .iter() + .filter(|b| *b) + .count(); + assert_eq!(count, 1, "should have exactly 1 bit set, not duplicated"); + } + + #[test] + fn test_insert_sync_committee_message() { + let pool = SyncCommitteePool::default(); + + let slot = 100u64; + let root = B256::from([1u8; 32]); + let subcommittee_index = 1u64; + + let message = SyncCommitteeMessage { + slot, + beacon_block_root: root, + validator_index: 42, + signature: BLSSignature::infinity(), + }; + + // Insert message + pool.insert_sync_committee_message(message.clone(), subcommittee_index); + + // Verify message was stored + let messages = pool.messages.read(); + let key = SyncCommitteeDataKey { + slot, + beacon_block_root: root, + subcommittee_index, + }; + let stored = messages.get(&key).expect("should have messages for key"); + assert_eq!(stored.len(), 1); + assert_eq!(stored[0].validator_index, 42); + } + + #[test] + fn test_insert_sync_committee_message_capacity_limit() { + let pool = SyncCommitteePool::default(); + let max_capacity = pool.max_contributions_per_key; + + let slot = 100u64; + let root = B256::from([1u8; 32]); + let subcommittee_index = 1u64; + + // Insert max_capacity messages + for i in 0..max_capacity { + let message = SyncCommitteeMessage { + slot, + beacon_block_root: root, + validator_index: i as u64, + signature: BLSSignature::infinity(), + }; + pool.insert_sync_committee_message(message, subcommittee_index); + } + + // Verify we hit the limit + let messages = pool.messages.read(); + let key = SyncCommitteeDataKey { + slot, + beacon_block_root: root, + subcommittee_index, + }; + let stored = messages.get(&key).expect("should have messages for key"); + assert_eq!(stored.len(), max_capacity); + + // Try to insert one more - should be ignored + let extra_message = SyncCommitteeMessage { + slot, + beacon_block_root: root, + validator_index: 999, + signature: BLSSignature::infinity(), + }; + drop(messages); + pool.insert_sync_committee_message(extra_message, subcommittee_index); + + let messages = pool.messages.read(); + let stored = messages.get(&key).expect("should have messages for key"); + assert_eq!(stored.len(), max_capacity, "should not exceed capacity"); + } + + #[test] + fn test_get_sync_committee_contributions_empty() { + let pool = SyncCommitteePool::default(); + + let contributions = pool.get_sync_committee_contributions(100, B256::from([1u8; 32]), 1); + + assert!( + contributions.is_empty(), + "should return empty vec when no contributions exist" + ); + } + + #[test] + fn test_get_best_sync_committee_contribution_returns_highest_participation() { + let pool = SyncCommitteePool::default(); + + let slot = 100u64; + let root = B256::from([1u8; 32]); + let subcommittee_index = 1u64; + + // Create contributions with different participation levels + let messages = vec![ + ( + SyncCommitteeMessage { + slot, + beacon_block_root: root, + validator_index: 1, + signature: BLSSignature::infinity(), + }, + 0, + ), + ( + SyncCommitteeMessage { + slot, + beacon_block_root: root, + validator_index: 2, + signature: BLSSignature::infinity(), + }, + 1, + ), + ( + SyncCommitteeMessage { + slot, + beacon_block_root: root, + validator_index: 3, + signature: BLSSignature::infinity(), + }, + 2, + ), + ]; + + pool.aggregate_messages(slot, root, subcommittee_index, messages); + + let best = pool + .get_best_sync_committee_contribution(slot, root, subcommittee_index) + .expect("should return best contribution"); + + let count = best.aggregation_bits.iter().filter(|b| *b).count(); + assert_eq!(count, 3, "best contribution should have 3 bits set"); + } + + #[test] + fn test_get_best_sync_committee_contribution_returns_none_when_empty() { + let pool = SyncCommitteePool::default(); + + let best = pool.get_best_sync_committee_contribution(100, B256::from([1u8; 32]), 1); + + assert!( + best.is_none(), + "should return None when no contributions exist" + ); + } + + #[test] + fn test_clean_sync_committee_contributions() { + let pool = SyncCommitteePool::default(); + + let root = B256::from([1u8; 32]); + let subcommittee_index = 1u64; + + // Insert contributions for multiple slots + for slot in [100, 101, 102, 103] { + pool.aggregate_messages( + slot, + root, + subcommittee_index, + vec![( + SyncCommitteeMessage { + slot, + beacon_block_root: root, + validator_index: 42, + signature: BLSSignature::infinity(), + }, + 0, + )], + ); + } + + // Clean at slot 102 - should keep slots 101 and 102 (current_slot - 1 and current_slot) + pool.clean_sync_committee_contributions(102); + + // Verify old slots are removed + assert!( + pool.get_sync_committee_contributions(100, root, subcommittee_index) + .is_empty(), + "slot 100 should be cleaned" + ); + + // Verify recent slots are kept + assert!( + !pool + .get_sync_committee_contributions(101, root, subcommittee_index) + .is_empty(), + "slot 101 should be kept" + ); + assert!( + !pool + .get_sync_committee_contributions(102, root, subcommittee_index) + .is_empty(), + "slot 102 should be kept" + ); + assert!( + !pool + .get_sync_committee_contributions(103, root, subcommittee_index) + .is_empty(), + "slot 103 should be kept (future slot)" + ); + } + + #[test] + fn test_clean_sync_committee_messages() { + let pool = SyncCommitteePool::default(); + + let root = B256::from([1u8; 32]); + let subcommittee_index = 1u64; + + // Insert messages for multiple slots + for slot in [100, 101, 102, 103] { + let message = SyncCommitteeMessage { + slot, + beacon_block_root: root, + validator_index: 42, + signature: BLSSignature::infinity(), + }; + pool.insert_sync_committee_message(message, subcommittee_index); + } + + // Clean at slot 102 - should keep slots 101 and 102 (current_slot - 1 and current_slot) + pool.clean_sync_committee_messages(102); + + let messages = pool.messages.read(); + + // Verify old slots are removed + let key_100 = SyncCommitteeDataKey { + slot: 100, + beacon_block_root: root, + subcommittee_index, + }; + assert!( + !messages.contains_key(&key_100), + "slot 100 should be cleaned" + ); + + // Verify recent slots are kept + let key_101 = SyncCommitteeDataKey { + slot: 101, + beacon_block_root: root, + subcommittee_index, + }; + assert!(messages.contains_key(&key_101), "slot 101 should be kept"); + + let key_102 = SyncCommitteeDataKey { + slot: 102, + beacon_block_root: root, + subcommittee_index, + }; + assert!(messages.contains_key(&key_102), "slot 102 should be kept"); + + let key_103 = SyncCommitteeDataKey { + slot: 103, + beacon_block_root: root, + subcommittee_index, + }; + assert!( + messages.contains_key(&key_103), + "slot 103 should be kept (future slot)" + ); + } + + #[test] + fn test_aggregate_messages_with_multiple_subcommittees() { + let pool = SyncCommitteePool::default(); + + let slot = 100u64; + let root = B256::from([1u8; 32]); + + // Add messages to different subcommittees + let messages_subnet_0 = vec![( + SyncCommitteeMessage { + slot, + beacon_block_root: root, + validator_index: 1, + signature: BLSSignature::infinity(), + }, + 0, + )]; + + let messages_subnet_1 = vec![( + SyncCommitteeMessage { + slot, + beacon_block_root: root, + validator_index: 2, + signature: BLSSignature::infinity(), + }, + 0, + )]; + + pool.aggregate_messages(slot, root, 0, messages_subnet_0); + pool.aggregate_messages(slot, root, 1, messages_subnet_1); + + // Verify both subcommittees have their own contributions + let contributions_0 = pool.get_sync_committee_contributions(slot, root, 0); + let contributions_1 = pool.get_sync_committee_contributions(slot, root, 1); + + assert!( + !contributions_0.is_empty(), + "subcommittee 0 should have contributions" + ); + assert!( + !contributions_1.is_empty(), + "subcommittee 1 should have contributions" + ); + } + + #[test] + fn test_custom_max_contributions_per_key() { + // Test with testnet-sized capacity + let pool = SyncCommitteePool::new(8); + assert_eq!(pool.max_contributions_per_key, 8); + + let slot = 100u64; + let root = B256::from([1u8; 32]); + let subcommittee_index = 1u64; + + // Insert 8 messages (should all be accepted) + for i in 0..8 { + let message = SyncCommitteeMessage { + slot, + beacon_block_root: root, + validator_index: i, + signature: BLSSignature::infinity(), + }; + pool.insert_sync_committee_message(message, subcommittee_index); + } + + let messages = pool.messages.read(); + let key = SyncCommitteeDataKey { + slot, + beacon_block_root: root, + subcommittee_index, + }; + let stored = messages.get(&key).expect("should have messages"); + assert_eq!(stored.len(), 8); + + // Try to insert 9th message - should be ignored + drop(messages); + let extra_message = SyncCommitteeMessage { + slot, + beacon_block_root: root, + validator_index: 999, + signature: BLSSignature::infinity(), + }; + pool.insert_sync_committee_message(extra_message, subcommittee_index); + + let messages = pool.messages.read(); + let stored = messages.get(&key).expect("should have messages"); + assert_eq!(stored.len(), 8, "should not exceed custom capacity of 8"); + } + + #[test] + fn test_aggregate_messages_creates_initial_contribution() { + let pool = SyncCommitteePool::default(); + + let slot = 100u64; + let root = B256::from([1u8; 32]); + let subcommittee_index = 1u64; + + // Verify pool is initially empty + assert!( + pool.get_sync_committee_contributions(slot, root, subcommittee_index) + .is_empty() + ); + + // Aggregate with empty iterator - should create initial contribution + pool.aggregate_messages(slot, root, subcommittee_index, vec![]); + + // Should now have one contribution (the initial empty one) + let contributions = pool.get_sync_committee_contributions(slot, root, subcommittee_index); + assert_eq!(contributions.len(), 1); + assert_eq!( + contributions[0] + .aggregation_bits + .iter() + .filter(|b| *b) + .count(), + 0, + "initial contribution should have no bits set" + ); + } + + #[test] + fn test_bit_not_set_on_aggregation_failure() { + let pool = SyncCommitteePool::default(); + + let slot = 200u64; + let root = B256::from([2u8; 32]); + let subcommittee_index = 0u64; + + // Construct an invalid/corrupted signature that should fail conversion/validation. + let bad_sig = BLSSignature { + inner: FixedVector::::from(vec![0u8; 96]), + }; + + let message = SyncCommitteeMessage { + slot, + beacon_block_root: root, + validator_index: 7, + signature: bad_sig, + }; + + // Aggregate a single bad message at index 0 + pool.aggregate_messages(slot, root, subcommittee_index, vec![(message, 0)]); + + let contributions = pool.get_sync_committee_contributions(slot, root, subcommittee_index); + assert!( + !contributions.is_empty(), + "there should be at least the initial contribution" + ); + + // The initial contribution is created before processing messages. + // Ensure its bit 0 is NOT set since aggregation failed. + let initial = &contributions[0]; + assert!( + !initial.aggregation_bits.get(0).unwrap_or(false), + "bit must not be set in the pre-existing contribution when aggregation fails" + ); + } +} diff --git a/crates/networking/manager/Cargo.toml b/crates/networking/manager/Cargo.toml index 6664b27b4..8f434d24f 100644 --- a/crates/networking/manager/Cargo.toml +++ b/crates/networking/manager/Cargo.toml @@ -39,6 +39,7 @@ ream-operation-pool.workspace = true ream-p2p.workspace = true ream-polynomial-commitments.workspace = true ream-storage.workspace = true +ream-sync-committee-pool.workspace = true ream-syncer.workspace = true ream-validator-beacon.workspace = true diff --git a/crates/networking/manager/src/service.rs b/crates/networking/manager/src/service.rs index c0b6e0e3e..7b1a4cf73 100644 --- a/crates/networking/manager/src/service.rs +++ b/crates/networking/manager/src/service.rs @@ -18,6 +18,7 @@ use ream_p2p::{ network::beacon::{Network, ReamNetworkEvent, network_state::NetworkState}, }; use ream_storage::{cache::CachedDB, db::beacon::BeaconDB}; +use ream_sync_committee_pool::SyncCommitteePool; use ream_syncer::block_range::BlockRangeSyncer; use tokio::{sync::mpsc, time::interval}; use tracing::{error, info}; @@ -37,6 +38,7 @@ pub struct NetworkManagerService { pub block_range_syncer: BlockRangeSyncer, pub ream_db: BeaconDB, pub cached_db: CachedDB, + pub sync_committee_pool: Arc, } /// The `NetworkManagerService` acts as the manager for all networking activities in Ream. @@ -59,6 +61,7 @@ impl NetworkManagerService { ream_db: BeaconDB, ream_dir: PathBuf, operation_pool: Arc, + sync_committee_pool: Arc, ) -> anyhow::Result { let discv5_config = discv5::ConfigBuilder::new(discv5::ListenConfig::from_ip( config.socket_address, @@ -101,6 +104,7 @@ impl NetworkManagerService { let beacon_chain = Arc::new(BeaconChain::new( ream_db.clone(), operation_pool, + sync_committee_pool.clone(), execution_engine, )); let status = beacon_chain.build_status_request().await?; @@ -130,6 +134,7 @@ impl NetworkManagerService { block_range_syncer, ream_db, cached_db, + sync_committee_pool, }) } diff --git a/crates/rpc/beacon/Cargo.toml b/crates/rpc/beacon/Cargo.toml index 6ffbf5f18..fc26209df 100644 --- a/crates/rpc/beacon/Cargo.toml +++ b/crates/rpc/beacon/Cargo.toml @@ -43,6 +43,8 @@ ream-operation-pool.workspace = true ream-p2p.workspace = true ream-rpc-common.workspace = true ream-storage.workspace = true +ream-sync-committee-pool.workspace = true +ream-validator-beacon.workspace = true [lints] workspace = true diff --git a/crates/rpc/beacon/src/handlers/debug.rs b/crates/rpc/beacon/src/handlers/debug.rs index dfd6e9aab..2ee818d9c 100644 --- a/crates/rpc/beacon/src/handlers/debug.rs +++ b/crates/rpc/beacon/src/handlers/debug.rs @@ -36,10 +36,11 @@ pub async fn get_debug_beacon_heads(db: Data) -> Result) -> Result>, execution_engine: Data>, ) -> Result { - let store = Store { - db: db.get_ref().clone(), - operation_pool: operation_pool.get_ref().clone(), - }; + let store = Store::new(db.get_ref().clone(), operation_pool.get_ref().clone(), None); // get head_slot let head = store.get_head().map_err(|err| { diff --git a/crates/rpc/beacon/src/handlers/validator.rs b/crates/rpc/beacon/src/handlers/validator.rs index 06defcd3c..b14e95959 100644 --- a/crates/rpc/beacon/src/handlers/validator.rs +++ b/crates/rpc/beacon/src/handlers/validator.rs @@ -6,7 +6,7 @@ use actix_web::{ }; use ream_api_types_beacon::{ id::ValidatorID, - query::{AttestationQuery, IdQuery, StatusQuery}, + query::{AttestationQuery, IdQuery, StatusQuery, SyncCommitteeContributionQuery}, request::ValidatorsPostRequest, responses::{BeaconResponse, DataResponse}, validator::{ValidatorBalance, ValidatorData, ValidatorStatus}, @@ -22,7 +22,11 @@ use ream_consensus_misc::{ }; use ream_fork_choice::store::Store; use ream_operation_pool::OperationPool; -use ream_storage::{db::beacon::BeaconDB, tables::field::Field}; +use ream_storage::{ + db::beacon::BeaconDB, + tables::{field::Field, table::Table}, +}; +use ream_sync_committee_pool::SyncCommitteePool; use serde::Serialize; use super::state::get_state_from_id; @@ -30,6 +34,7 @@ use super::state::get_state_from_id; /// For slots in Electra and later, this AttestationData must have a committee_index of 0. const ELECTRA_COMMITTEE_INDEX: u64 = 0; const MAX_VALIDATOR_COUNT: usize = 100; +const SYNC_COMMITTEE_SUBNET_COUNT: u64 = 4; fn build_validator_balances( validators: &[(Validator, u64)], @@ -438,10 +443,11 @@ pub async fn get_attestation_data( opertation_pool: Data>, query: Query, ) -> Result { - let store = Store { - db: db.get_ref().clone(), - operation_pool: opertation_pool.get_ref().clone(), - }; + let store = Store::new( + db.get_ref().clone(), + opertation_pool.get_ref().clone(), + None, + ); if store.is_syncing().map_err(|err| { ApiError::InternalError(format!("Failed to check syncing status, err: {err:?}")) @@ -504,3 +510,65 @@ pub async fn post_beacon_committee_selections( ) -> Result { Ok(HttpResponse::NotImplemented()) } + +#[get("/validator/sync_committee_contribution")] +pub async fn get_sync_committee_contribution( + db: Data, + operation_pool: Data>, + sync_committee_pool: Data>, + query: Query, +) -> Result { + let store = Store::new(db.get_ref().clone(), operation_pool.get_ref().clone(), None); + + if store.is_syncing().map_err(|err| { + ApiError::InternalError(format!("Failed to check syncing status, err: {err:?}")) + })? { + return Err(ApiError::UnderSyncing); + } + + let slot = query.slot; + let subcommittee_index = query.subcommittee_index; + let beacon_block_root = query.beacon_block_root; + + // Validate subcommittee index + if subcommittee_index >= SYNC_COMMITTEE_SUBNET_COUNT { + return Err(ApiError::InvalidParameter(format!( + "Invalid subcommittee_index: {subcommittee_index}, must be less than {SYNC_COMMITTEE_SUBNET_COUNT}" + ))); + } + + let current_slot = store.get_current_slot().map_err(|err| { + ApiError::InternalError(format!("Failed to get current slot, err: {err:?}")) + })?; + + // Validate slot is not too far in the future + if slot > current_slot + 1 { + return Err(ApiError::InvalidParameter(format!( + "Slot {slot:?} is too far ahead of the current slot {current_slot:?}" + ))); + } + + // Validate beacon block exists + db.beacon_block_provider() + .get(beacon_block_root) + .map_err(|err| ApiError::InternalError(format!("Failed to get beacon block: {err:?}")))? + .ok_or_else(|| { + ApiError::NotFound(format!("Beacon block root {beacon_block_root:?} not found")) + })?; + + // Check if block is fully verified (not optimistic) + // TODO: Add optimistic sync check when execution layer integration is complete + // For now, we assume all blocks in fork choice are fully verified + + // Try to get the best (highest-participation) sync committee contribution from the pool + // Per spec: return 404 if no contribution is available + let sync_committee_contribution = sync_committee_pool + .get_best_sync_committee_contribution(slot, beacon_block_root, subcommittee_index) + .ok_or_else(|| { + ApiError::NotFound(format!( + "No sync committee contribution available for beacon block root {beacon_block_root:?}" + )) + })?; + + Ok(HttpResponse::Ok().json(DataResponse::new(sync_committee_contribution))) +} diff --git a/crates/rpc/beacon/src/lib.rs b/crates/rpc/beacon/src/lib.rs index 04a81fb25..0e5041098 100644 --- a/crates/rpc/beacon/src/lib.rs +++ b/crates/rpc/beacon/src/lib.rs @@ -11,6 +11,7 @@ use ream_operation_pool::OperationPool; use ream_p2p::network::beacon::network_state::NetworkState; use ream_rpc_common::server::start_rpc_server; use ream_storage::db::beacon::BeaconDB; +use ream_sync_committee_pool::SyncCommitteePool; use crate::routes::register_routers; @@ -20,12 +21,14 @@ pub async fn start_server( db: BeaconDB, network_state: Arc, operation_pool: Arc, + sync_committee_pool: Arc, execution_engine: Option, ) -> std::io::Result<()> { let server = start_rpc_server(server_config.http_socket_address, move |cfg| { cfg.app_data(Data::new(db.clone())) .app_data(Data::new(network_state.clone())) .app_data(Data::new(operation_pool.clone())) + .app_data(Data::new(sync_committee_pool.clone())) .app_data(Data::new(execution_engine.clone())) .configure(register_routers); })?; diff --git a/crates/rpc/beacon/src/routes/validator.rs b/crates/rpc/beacon/src/routes/validator.rs index 35e5d27db..852e5edfa 100644 --- a/crates/rpc/beacon/src/routes/validator.rs +++ b/crates/rpc/beacon/src/routes/validator.rs @@ -3,7 +3,9 @@ use actix_web::web::ServiceConfig; use crate::handlers::{ duties::{get_attester_duties, get_proposer_duties}, prepare_beacon_proposer::prepare_beacon_proposer, - validator::{get_attestation_data, post_beacon_committee_selections}, + validator::{ + get_attestation_data, get_sync_committee_contribution, post_beacon_committee_selections, + }, }; pub fn register_validator_routes(config: &mut ServiceConfig) { @@ -12,4 +14,5 @@ pub fn register_validator_routes(config: &mut ServiceConfig) { config.service(prepare_beacon_proposer); config.service(get_attestation_data); config.service(post_beacon_committee_selections); + config.service(get_sync_committee_contribution); } diff --git a/testing/gossip-validation/Cargo.toml b/testing/gossip-validation/Cargo.toml index 53ca80cea..74d22fbfa 100644 --- a/testing/gossip-validation/Cargo.toml +++ b/testing/gossip-validation/Cargo.toml @@ -28,6 +28,7 @@ ream-network-manager = { workspace = true, features = ["disable_ancestor_validat ream-network-spec.workspace = true ream-operation-pool.workspace = true ream-storage.workspace = true +ream-sync-committee-pool.workspace = true [lints] workspace = true diff --git a/testing/gossip-validation/tests/validate_block.rs b/testing/gossip-validation/tests/validate_block.rs index 237ccfbea..fc55aaf5a 100644 --- a/testing/gossip-validation/tests/validate_block.rs +++ b/testing/gossip-validation/tests/validate_block.rs @@ -21,6 +21,7 @@ mod tests { db::{ReamDB, beacon::BeaconDB}, tables::{field::Field, table::Table}, }; + use ream_sync_committee_pool::SyncCommitteePool; use snap::raw::Decoder; use ssz::Decode; use tempdir::TempDir; @@ -72,8 +73,10 @@ mod tests { .await; let operation_pool = OperationPool::default(); + let sync_committee_pool = SyncCommitteePool::default(); let cached_db = CachedDB::default(); - let beacon_chain = BeaconChain::new(db, operation_pool.into(), None); + let beacon_chain = + BeaconChain::new(db, operation_pool.into(), sync_committee_pool.into(), None); (beacon_chain, cached_db, block_root) } From 41c1f07dde9964303b43dfe4ddb92f5e3879f032 Mon Sep 17 00:00:00 2001 From: ShiroObiJohn <28779404+ShiroObiJohn@users.noreply.github.com> Date: Fri, 3 Oct 2025 17:01:06 +0800 Subject: [PATCH 2/2] fix: add [lints] to sync_committee_pool Cargo.toml --- crates/common/sync_committee_pool/Cargo.toml | 3 +++ 1 file changed, 3 insertions(+) diff --git a/crates/common/sync_committee_pool/Cargo.toml b/crates/common/sync_committee_pool/Cargo.toml index e41eb6fb0..8a3d6dc7a 100644 --- a/crates/common/sync_committee_pool/Cargo.toml +++ b/crates/common/sync_committee_pool/Cargo.toml @@ -22,3 +22,6 @@ tree_hash_derive.workspace = true # ream-dependencies ream-bls.workspace = true ream-validator-beacon.workspace = true + +[lints] +workspace = true