diff --git a/crates/common/api_types/beacon/src/committee.rs b/crates/common/api_types/beacon/src/committee.rs index 076fbfc9d..464ddf7ce 100644 --- a/crates/common/api_types/beacon/src/committee.rs +++ b/crates/common/api_types/beacon/src/committee.rs @@ -12,3 +12,13 @@ pub struct BeaconCommitteeSubscription { pub slot: u64, pub is_aggregator: bool, } + +#[derive(Debug, Deserialize, Serialize, Clone)] +pub struct SyncCommitteeSubscription { + #[serde(with = "serde_utils::quoted_u64")] + pub validator_index: u64, + #[serde(with = "serde_utils::quoted_u64_vec")] + pub sync_committee_indices: Vec, + #[serde(with = "serde_utils::quoted_u64")] + pub until_epoch: u64, +} diff --git a/crates/rpc/beacon/src/handlers/validator.rs b/crates/rpc/beacon/src/handlers/validator.rs index f67e90657..8e6c7e616 100644 --- a/crates/rpc/beacon/src/handlers/validator.rs +++ b/crates/rpc/beacon/src/handlers/validator.rs @@ -7,7 +7,7 @@ use actix_web::{ use alloy_primitives::{Address, B256, U256, aliases::B32}; use ream_api_types_beacon::{ block::{FullBlockData, ProduceBlockData, ProduceBlockResponse}, - committee::BeaconCommitteeSubscription, + committee::{BeaconCommitteeSubscription, SyncCommitteeSubscription}, id::ValidatorID, query::{AttestationQuery, IdQuery, StatusQuery}, request::ValidatorsPostRequest, @@ -73,7 +73,10 @@ use ream_validator_beacon::{ DOMAIN_SYNC_COMMITTEE_SELECTION_PROOF, SYNC_COMMITTEE_SUBNET_COUNT, }, execution_requests::get_execution_requests, - sync_committee::{SyncAggregatorSelectionData, is_sync_committee_aggregator}, + sync_committee::{ + SyncAggregatorSelectionData, compute_subnets_for_sync_committee, + is_sync_committee_aggregator, + }, }; use serde::Serialize; use ssz_types::{ @@ -764,6 +767,103 @@ pub async fn post_beacon_committee_subscriptions( }))) } +#[post("/validator/sync_committee_subscriptions")] +pub async fn post_sync_committee_subscriptions( + db: Data, + subscriptions: Json>, + network: Data>, +) -> Result { + let subscriptions = subscriptions.into_inner(); + + if subscriptions.is_empty() { + return Err(ApiError::BadRequest("Empty request body".to_string())); + } + + let highest_slot = db + .slot_index_provider() + .get_highest_slot() + .map_err(|err| { + ApiError::InternalError(format!("Failed to get_highest_slot, error: {err:?}")) + })? + .ok_or(ApiError::NotFound( + "Failed to find highest slot".to_string(), + ))?; + let state = get_state_from_id(ID::Slot(highest_slot), &db).await?; + let current_epoch = state.get_current_epoch(); + + let mut subnets_to_subscribe: HashSet<(u64, B32)> = HashSet::new(); + + for subscription in subscriptions { + let validator = state + .validators + .get(subscription.validator_index as usize) + .ok_or_else(|| { + ApiError::BadRequest(format!( + "Validator index {} not found", + subscription.validator_index + )) + })?; + + if !validator.is_active_validator(current_epoch) { + return Err(ApiError::BadRequest(format!( + "Validator {} is not active", + subscription.validator_index + ))); + } + + // Validate until_epoch is in the future + if subscription.until_epoch <= current_epoch { + return Err(ApiError::BadRequest(format!( + "until_epoch {} must be greater than current epoch {current_epoch}", + subscription.until_epoch + ))); + } + + // Compute which subnets this validator needs to subscribe to + let validator_subnets = compute_subnets_for_sync_committee( + &state, + subscription.validator_index, + ) + .map_err(|err| { + ApiError::InternalError(format!("Failed to compute sync committee subnets: {err}")) + })?; + + // Validate that the provided sync_committee_indices match the computed subnets + let provided_indices: HashSet = subscription + .sync_committee_indices + .iter() + .copied() + .collect(); + if provided_indices != validator_subnets { + return Err(ApiError::BadRequest(format!( + "Provided sync_committee_indices {provided_indices:?} do not match computed subnets {validator_subnets:?}", + ))); + } + + let fork = state.fork.current_version; + for subnet_id in validator_subnets { + subnets_to_subscribe.insert((subnet_id, fork)); + } + } + + // Subscribe to all required subnets + let mut network = network.lock().await; + for (subnet_id, fork) in subnets_to_subscribe { + let topic = GossipTopic { + fork, + kind: GossipTopicKind::SyncCommittee(subnet_id), + }; + + if !network.subscribe_to_topic(topic) { + return Err(ApiError::InternalError(format!( + "Failed to subscribe to sync committee subnet {subnet_id}", + ))); + } + } + + Ok(HttpResponse::Ok().body("")) +} + /// Verify validator registration signature fn verify_validator_registration_signature( signed_registration: &SignedValidatorRegistrationV1, diff --git a/crates/rpc/beacon/src/routes/validator.rs b/crates/rpc/beacon/src/routes/validator.rs index 2aef11658..3a52b7a06 100644 --- a/crates/rpc/beacon/src/routes/validator.rs +++ b/crates/rpc/beacon/src/routes/validator.rs @@ -7,6 +7,7 @@ use crate::handlers::{ get_aggregate_attestation, get_attestation_data, get_blocks_v3, post_aggregate_and_proofs_v2, post_beacon_committee_selections, post_beacon_committee_subscriptions, post_contribution_and_proofs, post_register_validator, + post_sync_committee_subscriptions, }, }; @@ -18,6 +19,7 @@ pub fn register_validator_routes_v1(config: &mut ServiceConfig) { config.service(get_attestation_data); config.service(post_beacon_committee_selections); config.service(post_beacon_committee_subscriptions); + config.service(post_sync_committee_subscriptions); config.service(post_contribution_and_proofs); config.service(post_register_validator); }