From 358acba0f7ef25b7d80fd82667b60804a8839be8 Mon Sep 17 00:00:00 2001 From: Ashwin Sekar Date: Thu, 8 Aug 2024 20:18:42 +0000 Subject: [PATCH] gossip: remove vote buffer, send all verified votes to banking_stage --- core/src/cluster_info_vote_listener.rs | 284 +------- core/src/lib.rs | 1 - core/src/tpu.rs | 1 - core/src/verified_vote_packets.rs | 930 ------------------------- 4 files changed, 12 insertions(+), 1204 deletions(-) delete mode 100644 core/src/verified_vote_packets.rs diff --git a/core/src/cluster_info_vote_listener.rs b/core/src/cluster_info_vote_listener.rs index ae90a97b88377f..a4306dcbea2ea4 100644 --- a/core/src/cluster_info_vote_listener.rs +++ b/core/src/cluster_info_vote_listener.rs @@ -6,9 +6,6 @@ use { replay_stage::DUPLICATE_THRESHOLD, result::{Error, Result}, sigverify, - verified_vote_packets::{ - ValidatorGossipVotesIterator, VerifiedVoteMetadata, VerifiedVotePackets, - }, }, crossbeam_channel::{unbounded, Receiver, RecvTimeoutError, Select, Sender}, log::*, @@ -19,8 +16,7 @@ use { solana_ledger::blockstore::Blockstore, solana_measure::measure::Measure, solana_metrics::inc_new_counter_debug, - solana_perf::packet, - solana_poh::poh_recorder::PohRecorder, + solana_perf::packet::{self, PacketBatch}, solana_rpc::{ optimistically_confirmed_bank_tracker::{BankNotification, BankNotificationSender}, rpc_subscriptions::RpcSubscriptions, @@ -30,11 +26,10 @@ use { epoch_stakes::EpochStakes, vote_sender_types::ReplayVoteReceiver, }, solana_sdk::{ - clock::{Slot, DEFAULT_MS_PER_SLOT, DEFAULT_TICKS_PER_SLOT}, + clock::{Slot, DEFAULT_MS_PER_SLOT}, hash::Hash, pubkey::Pubkey, signature::Signature, - slot_hashes, timing::AtomicInterval, transaction::Transaction, }, @@ -44,7 +39,7 @@ use { }, std::{ cmp::max, - collections::{HashMap, HashSet}, + collections::HashMap, iter::repeat, sync::{ atomic::{AtomicBool, Ordering}, @@ -57,8 +52,6 @@ use { // Map from a vote account to the authorized voter for an epoch pub type ThresholdConfirmedSlots = Vec<(Slot, Hash)>; -pub type VerifiedLabelVotePacketsSender = Sender>; -pub type VerifiedLabelVotePacketsReceiver = Receiver>; pub type VerifiedVoteTransactionsSender = Sender>; pub type VerifiedVoteTransactionsReceiver = Receiver>; pub type VerifiedVoteSender = Sender<(Pubkey, Vec)>; @@ -69,7 +62,6 @@ pub type DuplicateConfirmedSlotsSender = Sender; pub type DuplicateConfirmedSlotsReceiver = Receiver; const THRESHOLDS_TO_CHECK: [f64; 2] = [DUPLICATE_THRESHOLD, VOTE_THRESHOLD_SIZE]; -const BANK_SEND_VOTES_LOOP_SLEEP_MS: u128 = 10; #[derive(Default)] pub struct SlotVoteTracker { @@ -144,45 +136,6 @@ impl VoteTracker { } } -struct BankVoteSenderState { - bank: Arc, - previously_sent_to_bank_votes: HashSet, - bank_send_votes_stats: BankSendVotesStats, -} - -impl BankVoteSenderState { - fn new(bank: Arc) -> Self { - Self { - bank, - previously_sent_to_bank_votes: HashSet::new(), - bank_send_votes_stats: BankSendVotesStats::default(), - } - } - - fn report_metrics(&self) { - self.bank_send_votes_stats.report_metrics(self.bank.slot()); - } -} - -#[derive(Default)] -struct BankSendVotesStats { - num_votes_sent: usize, - num_batches_sent: usize, - total_elapsed: u64, -} - -impl BankSendVotesStats { - fn report_metrics(&self, slot: Slot) { - datapoint_info!( - "cluster_info_vote_listener-bank-send-vote-stats", - ("slot", slot, i64), - ("num_votes_sent", self.num_votes_sent, i64), - ("total_elapsed", self.total_elapsed, i64), - ("num_batches_sent", self.num_batches_sent, i64), - ); - } -} - #[derive(Default)] struct VoteProcessingTiming { gossip_txn_processing_time_us: u64, @@ -234,7 +187,6 @@ impl ClusterInfoVoteListener { exit: Arc, cluster_info: Arc, verified_packets_sender: BankingPacketSender, - poh_recorder: Arc>, vote_tracker: Arc, bank_forks: Arc>, subscriptions: Arc, @@ -245,8 +197,6 @@ impl ClusterInfoVoteListener { bank_notification_sender: Option, duplicate_confirmed_slot_sender: DuplicateConfirmedSlotsSender, ) -> Self { - let (verified_vote_label_packets_sender, verified_vote_label_packets_receiver) = - unbounded(); let (verified_vote_transactions_sender, verified_vote_transactions_receiver) = unbounded(); let listen_thread = { let exit = exit.clone(); @@ -258,28 +208,14 @@ impl ClusterInfoVoteListener { exit, &cluster_info, &bank_forks, - verified_vote_label_packets_sender, + verified_packets_sender, verified_vote_transactions_sender, ); }) .unwrap() }; - let bank_send_thread = { - let exit = exit.clone(); - Builder::new() - .name("solCiBankSend".to_string()) - .spawn(move || { - let _ = Self::bank_send_loop( - exit, - verified_vote_label_packets_receiver, - poh_recorder, - &verified_packets_sender, - ); - }) - .unwrap() - }; - let send_thread = Builder::new() + let process_thread = Builder::new() .name("solCiProcVotes".to_string()) .spawn(move || { let _ = Self::process_votes_loop( @@ -299,7 +235,7 @@ impl ClusterInfoVoteListener { .unwrap(); Self { - thread_hdls: vec![listen_thread, send_thread, bank_send_thread], + thread_hdls: vec![listen_thread, process_thread], } } @@ -311,7 +247,7 @@ impl ClusterInfoVoteListener { exit: Arc, cluster_info: &ClusterInfo, bank_forks: &RwLock, - verified_vote_label_packets_sender: VerifiedLabelVotePacketsSender, + verified_packets_sender: BankingPacketSender, verified_vote_transactions_sender: VerifiedVoteTransactionsSender, ) -> Result<()> { let mut cursor = Cursor::default(); @@ -321,7 +257,7 @@ impl ClusterInfoVoteListener { if !votes.is_empty() { let (vote_txs, packets) = Self::verify_votes(votes, bank_forks); verified_vote_transactions_sender.send(vote_txs)?; - verified_vote_label_packets_sender.send(packets)?; + verified_packets_sender.send(BankingPacketBatch::new((packets, None)))?; } sleep(Duration::from_millis(GOSSIP_SLEEP_MILLIS)); } @@ -332,7 +268,7 @@ impl ClusterInfoVoteListener { fn verify_votes( votes: Vec, bank_forks: &RwLock, - ) -> (Vec, Vec) { + ) -> (Vec, Vec) { let mut packet_batches = packet::to_packet_batches(&votes, 1); // Votes should already be filtered by this point. @@ -363,126 +299,11 @@ impl ClusterInfoVoteListener { if !keys.any(|(i, key)| tx.message.is_signer(i) && key == authorized_voter) { return None; } - let verified_vote_metadata = VerifiedVoteMetadata { - vote_account_key, - vote, - packet_batch, - signature: *tx.signatures.first()?, - }; - Some((tx, verified_vote_metadata)) + Some((tx, packet_batch)) }) .unzip() } - fn bank_send_loop( - exit: Arc, - verified_vote_label_packets_receiver: VerifiedLabelVotePacketsReceiver, - poh_recorder: Arc>, - verified_packets_sender: &BankingPacketSender, - ) -> Result<()> { - let mut verified_vote_packets = VerifiedVotePackets::default(); - let mut time_since_lock = Instant::now(); - let mut bank_vote_sender_state_option: Option = None; - - loop { - if exit.load(Ordering::Relaxed) { - return Ok(()); - } - - let would_be_leader = poh_recorder - .read() - .unwrap() - .would_be_leader(3 * slot_hashes::MAX_ENTRIES as u64 * DEFAULT_TICKS_PER_SLOT); - - if let Err(e) = verified_vote_packets.receive_and_process_vote_packets( - &verified_vote_label_packets_receiver, - would_be_leader, - ) { - match e { - Error::RecvTimeout(RecvTimeoutError::Disconnected) - | Error::RecvTimeout(RecvTimeoutError::Timeout) => (), - _ => { - error!("thread {:?} error {:?}", thread::current().name(), e); - } - } - } - - if time_since_lock.elapsed().as_millis() > BANK_SEND_VOTES_LOOP_SLEEP_MS { - // Always set this to avoid taking the poh lock too often - time_since_lock = Instant::now(); - // We will take this lock at most once every `BANK_SEND_VOTES_LOOP_SLEEP_MS` - Self::check_for_leader_bank_and_send_votes( - &mut bank_vote_sender_state_option, - poh_recorder.read().unwrap().bank(), - verified_packets_sender, - &verified_vote_packets, - )?; - } - } - } - - fn check_for_leader_bank_and_send_votes( - bank_vote_sender_state_option: &mut Option, - current_working_bank: Option>, - verified_packets_sender: &BankingPacketSender, - verified_vote_packets: &VerifiedVotePackets, - ) -> Result<()> { - let Some(current_working_bank) = current_working_bank else { - // We are not the leader! - if let Some(bank_vote_sender_state) = bank_vote_sender_state_option { - // This ensures we report the last slot's metrics - bank_vote_sender_state.report_metrics(); - *bank_vote_sender_state_option = None; - } - return Ok(()); - }; - // We will take this lock at most once every `BANK_SEND_VOTES_LOOP_SLEEP_MS` - if let Some(bank_vote_sender_state) = bank_vote_sender_state_option { - if bank_vote_sender_state.bank.slot() != current_working_bank.slot() { - bank_vote_sender_state.report_metrics(); - *bank_vote_sender_state_option = - Some(BankVoteSenderState::new(current_working_bank)); - } - } else { - *bank_vote_sender_state_option = Some(BankVoteSenderState::new(current_working_bank)); - } - - let bank_vote_sender_state = bank_vote_sender_state_option.as_mut().unwrap(); - let BankVoteSenderState { - ref bank, - ref mut bank_send_votes_stats, - ref mut previously_sent_to_bank_votes, - } = bank_vote_sender_state; - - // This logic may run multiple times for the same leader bank, - // we just have to ensure that the same votes are not sent - // to the bank multiple times, which is guaranteed by - // `previously_sent_to_bank_votes` - let gossip_votes_iterator = ValidatorGossipVotesIterator::new( - bank.clone(), - verified_vote_packets, - previously_sent_to_bank_votes, - ); - - let mut filter_gossip_votes_timing = Measure::start("filter_gossip_votes"); - - // Send entire batch at a time so that there is no partial processing of - // a single validator's votes by two different banks. This might happen - // if we sent each vote individually, for instance if we created two different - // leader banks from the same common parent, one leader bank may process - // only the later votes and ignore the earlier votes. - for single_validator_votes in gossip_votes_iterator { - bank_send_votes_stats.num_votes_sent += single_validator_votes.len(); - bank_send_votes_stats.num_batches_sent += 1; - verified_packets_sender - .send(BankingPacketBatch::new((single_validator_votes, None)))?; - } - filter_gossip_votes_timing.stop(); - bank_send_votes_stats.total_elapsed += filter_gossip_votes_timing.as_us(); - - Ok(()) - } - #[allow(clippy::too_many_arguments)] fn process_votes_loop( exit: Arc, @@ -896,7 +717,6 @@ impl ClusterInfoVoteListener { mod tests { use { super::*, - crate::banking_trace::BankingTracer, itertools::Itertools, solana_perf::packet, solana_rpc::optimistically_confirmed_bank_tracker::OptimisticallyConfirmedBank, @@ -1629,11 +1449,8 @@ mod tests { assert!(packets.is_empty()); } - fn verify_packets_len(packets: &[VerifiedVoteMetadata], ref_value: usize) { - let num_packets: usize = packets - .iter() - .map(|vote_metadata| vote_metadata.packet_batch.len()) - .sum(); + fn verify_packets_len(packets: &[PacketBatch], ref_value: usize) { + let num_packets: usize = packets.iter().map(|pb| pb.len()).sum(); assert_eq!(num_packets, ref_value); } @@ -1723,83 +1540,6 @@ mod tests { run_test_bad_vote(Some(Hash::default())); } - #[test] - fn test_check_for_leader_bank_and_send_votes() { - let GenesisConfigInfo { genesis_config, .. } = create_genesis_config(1000); - let current_leader_bank = Arc::new(Bank::new_for_tests(&genesis_config)); - let mut bank_vote_sender_state_option: Option = None; - let verified_vote_packets = VerifiedVotePackets::default(); - let (verified_packets_sender, _verified_packets_receiver) = - BankingTracer::channel_for_test(); - - // 1) If we hand over a `current_leader_bank`, vote sender state should be updated - ClusterInfoVoteListener::check_for_leader_bank_and_send_votes( - &mut bank_vote_sender_state_option, - Some(current_leader_bank.clone()), - &verified_packets_sender, - &verified_vote_packets, - ) - .unwrap(); - - assert_eq!( - bank_vote_sender_state_option.as_ref().unwrap().bank.slot(), - current_leader_bank.slot() - ); - bank_vote_sender_state_option - .as_mut() - .unwrap() - .previously_sent_to_bank_votes - .insert(Signature::new_unique()); - - // 2) Handing over the same leader bank again should not update the state - ClusterInfoVoteListener::check_for_leader_bank_and_send_votes( - &mut bank_vote_sender_state_option, - Some(current_leader_bank.clone()), - &verified_packets_sender, - &verified_vote_packets, - ) - .unwrap(); - // If we hand over a `current_leader_bank`, vote sender state should be updated - assert_eq!( - bank_vote_sender_state_option.as_ref().unwrap().bank.slot(), - current_leader_bank.slot() - ); - assert_eq!( - bank_vote_sender_state_option - .as_ref() - .unwrap() - .previously_sent_to_bank_votes - .len(), - 1 - ); - - let slot = current_leader_bank.slot() + 1; - let current_leader_bank = Arc::new(Bank::new_from_parent( - current_leader_bank, - &Pubkey::default(), - slot, - )); - ClusterInfoVoteListener::check_for_leader_bank_and_send_votes( - &mut bank_vote_sender_state_option, - Some(current_leader_bank.clone()), - &verified_packets_sender, - &verified_vote_packets, - ) - .unwrap(); - - // 3) If we hand over a new `current_leader_bank`, vote sender state should be updated - // to the new bank - assert_eq!( - bank_vote_sender_state_option.as_ref().unwrap().bank.slot(), - current_leader_bank.slot() - ); - assert!(bank_vote_sender_state_option - .as_ref() - .unwrap() - .previously_sent_to_bank_votes - .is_empty()); - } - #[test] fn test_track_new_votes_filter() { let validator_keypairs: Vec<_> = diff --git a/core/src/lib.rs b/core/src/lib.rs index c6ab7b7e9c8b3d..da9d69ed508875 100644 --- a/core/src/lib.rs +++ b/core/src/lib.rs @@ -44,7 +44,6 @@ pub mod tracer_packet_stats; pub mod tvu; pub mod unfrozen_gossip_verified_vote_hashes; pub mod validator; -pub mod verified_vote_packets; pub mod vote_simulator; pub mod voting_service; pub mod warm_quic_cache_service; diff --git a/core/src/tpu.rs b/core/src/tpu.rs index c76d2dd4d0094e..05982c9c3edf29 100644 --- a/core/src/tpu.rs +++ b/core/src/tpu.rs @@ -222,7 +222,6 @@ impl Tpu { exit.clone(), cluster_info.clone(), gossip_vote_sender, - poh_recorder.clone(), vote_tracker, bank_forks.clone(), subscriptions.clone(), diff --git a/core/src/verified_vote_packets.rs b/core/src/verified_vote_packets.rs deleted file mode 100644 index 0840c57b809d22..00000000000000 --- a/core/src/verified_vote_packets.rs +++ /dev/null @@ -1,930 +0,0 @@ -use { - crate::{cluster_info_vote_listener::VerifiedLabelVotePacketsReceiver, result::Result}, - itertools::Itertools, - solana_perf::packet::PacketBatch, - solana_runtime::bank::Bank, - solana_sdk::{ - account::from_account, - clock::{Slot, UnixTimestamp}, - hash::Hash, - pubkey::Pubkey, - signature::Signature, - slot_hashes::SlotHashes, - sysvar, - }, - solana_vote::vote_transaction::VoteTransaction, - std::{ - collections::{BTreeMap, HashMap, HashSet}, - sync::Arc, - time::Duration, - }, -}; - -const MAX_VOTES_PER_VALIDATOR: usize = 1000; - -pub struct VerifiedVoteMetadata { - pub vote_account_key: Pubkey, - pub vote: VoteTransaction, - pub packet_batch: PacketBatch, - pub signature: Signature, -} - -pub struct ValidatorGossipVotesIterator<'a> { - my_leader_bank: Arc, - slot_hashes: SlotHashes, - verified_vote_packets: &'a VerifiedVotePackets, - vote_account_keys: Vec, - previously_sent_to_bank_votes: &'a mut HashSet, -} - -impl<'a> ValidatorGossipVotesIterator<'a> { - pub fn new( - my_leader_bank: Arc, - verified_vote_packets: &'a VerifiedVotePackets, - previously_sent_to_bank_votes: &'a mut HashSet, - ) -> Self { - let slot_hashes_account = my_leader_bank.get_account(&sysvar::slot_hashes::id()); - - if slot_hashes_account.is_none() { - warn!( - "Slot hashes sysvar doesn't exist on bank {}", - my_leader_bank.slot() - ); - } - - let slot_hashes_account = slot_hashes_account.unwrap_or_default(); - let slot_hashes = from_account::(&slot_hashes_account).unwrap_or_default(); - - // TODO: my_leader_bank.vote_accounts() may not contain zero-staked validators - // in this epoch, but those validators may have stake warming up in the next epoch - // Sort by stake weight so heavier validators' votes are sent first - let vote_account_keys: Vec = my_leader_bank - .vote_accounts() - .iter() - .map(|(pubkey, &(stake, _))| (pubkey, stake)) - .sorted_unstable_by_key(|&(_, stake)| std::cmp::Reverse(stake)) - .map(|(&pubkey, _)| pubkey) - .collect(); - Self { - my_leader_bank, - slot_hashes, - verified_vote_packets, - vote_account_keys, - previously_sent_to_bank_votes, - } - } - - fn filter_vote( - &mut self, - slot: &Slot, - hash: &Hash, - packet: &PacketBatch, - tx_signature: &Signature, - ) -> Option { - // Don't send the same vote to the same bank multiple times - if self.previously_sent_to_bank_votes.contains(tx_signature) { - return None; - } - self.previously_sent_to_bank_votes.insert(*tx_signature); - // Filter out votes on the wrong fork (or too old to be) - // on this fork - if self - .slot_hashes - .get(slot) - .map(|found_hash| found_hash == hash) - .unwrap_or(false) - { - Some(packet.clone()) - } else { - None - } - } -} - -/// Each iteration returns all of the missing votes for a single validator, the votes -/// ordered from smallest to largest. -/// -/// Iterator is done after iterating through all vote accounts -impl<'a> Iterator for ValidatorGossipVotesIterator<'a> { - type Item = Vec; - - fn next(&mut self) -> Option { - use SingleValidatorVotes::*; - while let Some(vote_account_key) = self.vote_account_keys.pop() { - // Get all the gossip votes we've queued up for this validator - // that are: - // 1) missing from the current leader bank - // 2) on the same fork - let validator_votes = self - .verified_vote_packets - .0 - .get(&vote_account_key) - .and_then(|validator_gossip_votes| { - // Fetch the validator's vote state from the bank - self.my_leader_bank - .vote_accounts() - .get(&vote_account_key) - .and_then(|(_stake, vote_account)| { - vote_account.vote_state().as_ref().ok().map(|vote_state| { - let start_vote_slot = - vote_state.last_voted_slot().map(|x| x + 1).unwrap_or(0); - match validator_gossip_votes { - FullTowerVote(GossipVote { - slot, - hash, - packet_batch, - signature, - .. - }) => self - .filter_vote(slot, hash, packet_batch, signature) - .map(|packet| vec![packet]) - .unwrap_or_default(), - IncrementalVotes(validator_gossip_votes) => { - validator_gossip_votes - .range((start_vote_slot, Hash::default())..) - .filter_map(|((slot, hash), (packet, tx_signature))| { - self.filter_vote(slot, hash, packet, tx_signature) - }) - .collect::>() - } - } - }) - }) - }); - if let Some(validator_votes) = validator_votes { - if !validator_votes.is_empty() { - return Some(validator_votes); - } - } - } - None - } -} - -#[derive(Debug, Default, Clone)] -pub struct GossipVote { - slot: Slot, - hash: Hash, - packet_batch: PacketBatch, - signature: Signature, - timestamp: Option, -} - -pub enum SingleValidatorVotes { - FullTowerVote(GossipVote), - IncrementalVotes(BTreeMap<(Slot, Hash), (PacketBatch, Signature)>), -} - -impl SingleValidatorVotes { - fn get_latest_gossip_slot(&self) -> Slot { - match self { - Self::FullTowerVote(vote) => vote.slot, - _ => 0, - } - } - - fn get_latest_timestamp(&self) -> Option { - match self { - Self::FullTowerVote(vote) => vote.timestamp, - _ => None, - } - } - - #[cfg(test)] - fn len(&self) -> usize { - match self { - Self::IncrementalVotes(votes) => votes.len(), - _ => 1, - } - } -} - -#[derive(Default)] -pub struct VerifiedVotePackets(HashMap); - -impl VerifiedVotePackets { - pub fn receive_and_process_vote_packets( - &mut self, - vote_packets_receiver: &VerifiedLabelVotePacketsReceiver, - would_be_leader: bool, - ) -> Result<()> { - use SingleValidatorVotes::*; - const RECV_TIMEOUT: Duration = Duration::from_millis(200); - let vote_packets = vote_packets_receiver.recv_timeout(RECV_TIMEOUT)?; - let vote_packets = std::iter::once(vote_packets).chain(vote_packets_receiver.try_iter()); - - // No need to process any votes if we will not be the leader soon. But, - // return early only after draining the channel to avoid accumulating - // votes that will be stale by the time we do become leader - if !would_be_leader { - return Ok(()); - } - - for gossip_votes in vote_packets { - for verfied_vote_metadata in gossip_votes { - let VerifiedVoteMetadata { - vote_account_key, - vote, - packet_batch, - signature, - } = verfied_vote_metadata; - if vote.is_empty() { - error!("Empty votes should have been filtered out earlier in the pipeline"); - continue; - } - let slot = vote.last_voted_slot().unwrap(); - let hash = vote.hash(); - let timestamp = vote.timestamp(); - - match vote { - VoteTransaction::VoteStateUpdate(_) | VoteTransaction::TowerSync(_) => { - let (latest_gossip_slot, latest_timestamp) = - self.0.get(&vote_account_key).map_or((0, None), |vote| { - (vote.get_latest_gossip_slot(), vote.get_latest_timestamp()) - }); - // Since votes are not incremental, we keep only the latest vote - // If the vote is for the same slot we will only allow it if - // it has a later timestamp (refreshed vote) - // - // Timestamp can be None if something was wrong with the senders clock. - // We directly compare as Options to ensure that votes with proper - // timestamps have precedence (Some is > None). - if slot > latest_gossip_slot - || ((slot == latest_gossip_slot) && (timestamp > latest_timestamp)) - { - self.0.insert( - vote_account_key, - FullTowerVote(GossipVote { - slot, - hash, - packet_batch, - signature, - timestamp, - }), - ); - } - } - _ => { - if let Some(FullTowerVote(gossip_vote)) = self.0.get_mut(&vote_account_key) - { - if slot > gossip_vote.slot { - warn!( - "Originally {} submitted full tower votes, but now has reverted to incremental votes. Converting back to old format.", - vote_account_key - ); - let mut votes = BTreeMap::new(); - let GossipVote { - slot, - hash, - packet_batch, - signature, - .. - } = std::mem::take(gossip_vote); - votes.insert((slot, hash), (packet_batch, signature)); - self.0.insert(vote_account_key, IncrementalVotes(votes)); - } else { - continue; - } - }; - let validator_votes: &mut BTreeMap<(Slot, Hash), (PacketBatch, Signature)> = - match self - .0 - .entry(vote_account_key) - .or_insert(IncrementalVotes(BTreeMap::new())) - { - IncrementalVotes(votes) => votes, - FullTowerVote(_) => continue, // Should never happen - }; - validator_votes.insert((slot, hash), (packet_batch, signature)); - if validator_votes.len() > MAX_VOTES_PER_VALIDATOR { - let smallest_key = validator_votes.keys().next().cloned().unwrap(); - validator_votes.remove(&smallest_key).unwrap(); - } - } - } - } - } - Ok(()) - } -} - -#[cfg(test)] -mod tests { - use { - super::{SingleValidatorVotes::*, *}, - crate::{result::Error, vote_simulator::VoteSimulator}, - crossbeam_channel::{unbounded, Receiver, Sender}, - solana_perf::packet::Packet, - solana_sdk::slot_hashes::MAX_ENTRIES, - solana_vote_program::vote_state::{Lockout, TowerSync, Vote}, - std::collections::VecDeque, - }; - - #[test] - fn test_verified_vote_packets_receive_and_process_vote_packets() { - let (s, r) = unbounded(); - let vote_account_key = solana_sdk::pubkey::new_rand(); - - // Construct the buffer - let mut verified_vote_packets = VerifiedVotePackets(HashMap::new()); - - // Send a vote from `vote_account_key`, check that it was inserted - let vote_slot = 0; - let vote_hash = Hash::new_unique(); - let vote = Vote::new(vec![vote_slot], vote_hash); - s.send(vec![VerifiedVoteMetadata { - vote_account_key, - vote: VoteTransaction::from(vote.clone()), - packet_batch: PacketBatch::default(), - signature: Signature::from([1u8; 64]), - }]) - .unwrap(); - verified_vote_packets - .receive_and_process_vote_packets(&r, true) - .unwrap(); - assert_eq!( - verified_vote_packets - .0 - .get(&vote_account_key) - .unwrap() - .len(), - 1 - ); - - // Same slot, same hash, should not be inserted - s.send(vec![VerifiedVoteMetadata { - vote_account_key, - vote: VoteTransaction::from(vote), - packet_batch: PacketBatch::default(), - signature: Signature::from([1u8; 64]), - }]) - .unwrap(); - verified_vote_packets - .receive_and_process_vote_packets(&r, true) - .unwrap(); - assert_eq!( - verified_vote_packets - .0 - .get(&vote_account_key) - .unwrap() - .len(), - 1 - ); - - // Same slot, different hash, should still be inserted - let new_vote_hash = Hash::new_unique(); - let vote = Vote::new(vec![vote_slot], new_vote_hash); - s.send(vec![VerifiedVoteMetadata { - vote_account_key, - vote: VoteTransaction::from(vote), - packet_batch: PacketBatch::default(), - signature: Signature::from([1u8; 64]), - }]) - .unwrap(); - verified_vote_packets - .receive_and_process_vote_packets(&r, true) - .unwrap(); - assert_eq!( - verified_vote_packets - .0 - .get(&vote_account_key) - .unwrap() - .len(), - 2 - ); - - // Different vote slot, should be inserted - let vote_slot = 1; - let vote_hash = Hash::new_unique(); - let vote = Vote::new(vec![vote_slot], vote_hash); - s.send(vec![VerifiedVoteMetadata { - vote_account_key, - vote: VoteTransaction::from(vote), - packet_batch: PacketBatch::default(), - signature: Signature::from([2u8; 64]), - }]) - .unwrap(); - verified_vote_packets - .receive_and_process_vote_packets(&r, true) - .unwrap(); - assert_eq!( - verified_vote_packets - .0 - .get(&vote_account_key) - .unwrap() - .len(), - 3 - ); - - // No new messages, should time out - assert_matches!( - verified_vote_packets.receive_and_process_vote_packets(&r, true), - Err(Error::RecvTimeout(_)) - ); - } - - #[test] - fn test_verified_vote_packets_receive_and_process_vote_packets_max_len() { - let (s, r) = unbounded(); - let vote_account_key = solana_sdk::pubkey::new_rand(); - - // Construct the buffer - let mut verified_vote_packets = VerifiedVotePackets(HashMap::new()); - - // Send many more votes than the upper limit per validator - for _ in 0..2 * MAX_VOTES_PER_VALIDATOR { - let vote_slot = 0; - let vote_hash = Hash::new_unique(); - let vote = Vote::new(vec![vote_slot], vote_hash); - s.send(vec![VerifiedVoteMetadata { - vote_account_key, - vote: VoteTransaction::from(vote), - packet_batch: PacketBatch::default(), - signature: Signature::from([1u8; 64]), - }]) - .unwrap(); - } - - // At most `MAX_VOTES_PER_VALIDATOR` should be stored per validator - verified_vote_packets - .receive_and_process_vote_packets(&r, true) - .unwrap(); - assert_eq!( - verified_vote_packets - .0 - .get(&vote_account_key) - .unwrap() - .len(), - MAX_VOTES_PER_VALIDATOR - ); - } - - #[test] - fn test_verified_vote_packets_validator_gossip_votes_iterator_wrong_fork() { - let (s, r) = unbounded(); - let vote_simulator = VoteSimulator::new(1); - let my_leader_bank = vote_simulator.bank_forks.read().unwrap().root_bank(); - let vote_account_key = vote_simulator.vote_pubkeys[0]; - - // Create a bunch of votes with random vote hashes, which should all be ignored - // since they are not on the same fork as `my_leader_bank`, i.e. their hashes do - // not exist in the SlotHashes sysvar for `my_leader_bank` - for _ in 0..MAX_VOTES_PER_VALIDATOR { - let vote_slot = 0; - let vote_hash = Hash::new_unique(); - let vote = Vote::new(vec![vote_slot], vote_hash); - s.send(vec![VerifiedVoteMetadata { - vote_account_key, - vote: VoteTransaction::from(vote), - packet_batch: PacketBatch::default(), - signature: Signature::new_unique(), - }]) - .unwrap(); - } - - // Ingest the votes into the buffer - let mut verified_vote_packets = VerifiedVotePackets(HashMap::new()); - verified_vote_packets - .receive_and_process_vote_packets(&r, true) - .unwrap(); - - // Create tracker for previously sent bank votes - let mut previously_sent_to_bank_votes = HashSet::new(); - let mut gossip_votes_iterator = ValidatorGossipVotesIterator::new( - my_leader_bank, - &verified_vote_packets, - &mut previously_sent_to_bank_votes, - ); - - // Wrong fork, we should get no hashes - assert!(gossip_votes_iterator.next().is_none()); - } - - #[test] - fn test_verified_vote_packets_validator_gossip_votes_iterator_correct_fork() { - let (s, r) = unbounded(); - let num_validators = 2; - let vote_simulator = VoteSimulator::new(num_validators); - let mut my_leader_bank = vote_simulator.bank_forks.read().unwrap().root_bank(); - - // Create a set of valid ancestor hashes for this fork - for _ in 0..MAX_ENTRIES { - let slot = my_leader_bank.slot() + 1; - my_leader_bank = Arc::new(Bank::new_from_parent( - my_leader_bank, - &Pubkey::default(), - slot, - )); - } - let slot_hashes_account = my_leader_bank - .get_account(&sysvar::slot_hashes::id()) - .expect("Slot hashes sysvar must exist"); - let slot_hashes = from_account::(&slot_hashes_account).unwrap(); - - // Create valid votes - for i in 0..num_validators { - let vote_account_key = vote_simulator.vote_pubkeys[i]; - // Used to uniquely identify the packets for each validator - let num_packets = i + 1; - for (vote_slot, vote_hash) in slot_hashes.slot_hashes().iter() { - let vote = Vote::new(vec![*vote_slot], *vote_hash); - s.send(vec![VerifiedVoteMetadata { - vote_account_key, - vote: VoteTransaction::from(vote), - packet_batch: PacketBatch::new(vec![Packet::default(); num_packets]), - signature: Signature::new_unique(), - }]) - .unwrap(); - } - } - - // Ingest the votes into the buffer - let mut verified_vote_packets = VerifiedVotePackets(HashMap::new()); - verified_vote_packets - .receive_and_process_vote_packets(&r, true) - .unwrap(); - - // One batch of vote packets per validator - assert_eq!(verified_vote_packets.0.len(), num_validators); - // Each validator should have one vote per slot - assert!(verified_vote_packets - .0 - .values() - .all(|validator_votes| validator_votes.len() == slot_hashes.slot_hashes().len())); - - let mut previously_sent_to_bank_votes = HashSet::new(); - let mut gossip_votes_iterator = ValidatorGossipVotesIterator::new( - my_leader_bank.clone(), - &verified_vote_packets, - &mut previously_sent_to_bank_votes, - ); - - // Get and verify batches - for _ in 0..num_validators { - let validator_batch: Vec = gossip_votes_iterator.next().unwrap(); - assert_eq!(validator_batch.len(), slot_hashes.slot_hashes().len()); - let expected_len = validator_batch[0].len(); - assert!(validator_batch - .iter() - .all(|batch| batch.len() == expected_len)); - } - - // Should be empty now - assert!(gossip_votes_iterator.next().is_none()); - - // If we construct another iterator, should return nothing because `previously_sent_to_bank_votes` - // should filter out everything - let mut gossip_votes_iterator = ValidatorGossipVotesIterator::new( - my_leader_bank.clone(), - &verified_vote_packets, - &mut previously_sent_to_bank_votes, - ); - assert!(gossip_votes_iterator.next().is_none()); - - // If we add a new vote, we should return it - my_leader_bank.freeze(); - let vote_slot = my_leader_bank.slot(); - let vote_hash = my_leader_bank.hash(); - let new_leader_slot = my_leader_bank.slot() + 1; - let my_leader_bank = Arc::new(Bank::new_from_parent( - my_leader_bank, - &Pubkey::default(), - new_leader_slot, - )); - let vote_account_key = vote_simulator.vote_pubkeys[1]; - let vote = VoteTransaction::from(Vote::new(vec![vote_slot], vote_hash)); - s.send(vec![VerifiedVoteMetadata { - vote_account_key, - vote, - packet_batch: PacketBatch::default(), - signature: Signature::new_unique(), - }]) - .unwrap(); - // Ingest the votes into the buffer - verified_vote_packets - .receive_and_process_vote_packets(&r, true) - .unwrap(); - let mut gossip_votes_iterator = ValidatorGossipVotesIterator::new( - my_leader_bank, - &verified_vote_packets, - &mut previously_sent_to_bank_votes, - ); - assert!(gossip_votes_iterator.next().is_some()); - assert!(gossip_votes_iterator.next().is_none()); - } - - #[test] - fn test_only_latest_vote_is_sent_with_feature() { - let (s, r) = unbounded(); - let vote_account_key = solana_sdk::pubkey::new_rand(); - - // Send three tower syncs that are out of order - let first_vote = TowerSync::from(vec![(2, 4), (4, 3), (6, 2), (7, 1)]); - let second_vote = TowerSync::from(vec![(2, 4), (4, 3), (11, 1)]); - let third_vote = TowerSync::from(vec![(2, 5), (4, 4), (11, 3), (12, 2), (13, 1)]); - - for vote in [second_vote, first_vote] { - s.send(vec![VerifiedVoteMetadata { - vote_account_key, - vote: VoteTransaction::from(vote), - packet_batch: PacketBatch::default(), - signature: Signature::from([1u8; 64]), - }]) - .unwrap(); - } - - let mut verified_vote_packets = VerifiedVotePackets(HashMap::new()); - verified_vote_packets - .receive_and_process_vote_packets(&r, true) - .unwrap(); - - // second_vote should be kept and first_vote ignored - let slot = verified_vote_packets - .0 - .get(&vote_account_key) - .unwrap() - .get_latest_gossip_slot(); - assert_eq!(11, slot); - - // Now send the third_vote, it should overwrite second_vote - s.send(vec![VerifiedVoteMetadata { - vote_account_key, - vote: VoteTransaction::from(third_vote), - packet_batch: PacketBatch::default(), - signature: Signature::from([1u8; 64]), - }]) - .unwrap(); - - verified_vote_packets - .receive_and_process_vote_packets(&r, true) - .unwrap(); - let slot = verified_vote_packets - .0 - .get(&vote_account_key) - .unwrap() - .get_latest_gossip_slot(); - assert_eq!(13, slot); - } - - fn send_tower_sync_and_process( - s: &Sender>, - r: &Receiver>, - vote: TowerSync, - vote_account_key: Pubkey, - verified_vote_packets: &mut VerifiedVotePackets, - ) -> GossipVote { - s.send(vec![VerifiedVoteMetadata { - vote_account_key, - vote: VoteTransaction::from(vote), - packet_batch: PacketBatch::default(), - signature: Signature::from([1u8; 64]), - }]) - .unwrap(); - verified_vote_packets - .receive_and_process_vote_packets(r, true) - .unwrap(); - match verified_vote_packets.0.get(&vote_account_key).unwrap() { - SingleValidatorVotes::FullTowerVote(gossip_vote) => gossip_vote.clone(), - _ => panic!("Received incremental vote"), - } - } - - #[test] - fn test_latest_vote_tie_break_with_feature() { - let (s, r) = unbounded(); - let vote_account_key = solana_sdk::pubkey::new_rand(); - - // Send identical tower syncs with different timestamps - let mut vote = TowerSync::from(vec![(2, 4), (4, 3), (6, 2), (7, 1)]); - vote.timestamp = Some(5); - - let mut vote_later_ts = vote.clone(); - vote_later_ts.timestamp = Some(6); - - let mut vote_earlier_ts = vote.clone(); - vote_earlier_ts.timestamp = Some(4); - - let mut vote_no_ts = vote.clone(); - vote_no_ts.timestamp = None; - - let mut verified_vote_packets = VerifiedVotePackets(HashMap::new()); - - // Original vote - let GossipVote { - slot, timestamp, .. - } = send_tower_sync_and_process( - &s, - &r, - vote.clone(), - vote_account_key, - &mut verified_vote_packets, - ); - assert_eq!(slot, vote.last_voted_slot().unwrap()); - assert_eq!(timestamp, vote.timestamp); - - // Same vote with later timestamp should override - let GossipVote { - slot, timestamp, .. - } = send_tower_sync_and_process( - &s, - &r, - vote_later_ts.clone(), - vote_account_key, - &mut verified_vote_packets, - ); - assert_eq!(slot, vote_later_ts.last_voted_slot().unwrap()); - assert_eq!(timestamp, vote_later_ts.timestamp); - - // Same vote with earlier timestamp should not override - let GossipVote { - slot, timestamp, .. - } = send_tower_sync_and_process( - &s, - &r, - vote_earlier_ts, - vote_account_key, - &mut verified_vote_packets, - ); - assert_eq!(slot, vote_later_ts.last_voted_slot().unwrap()); - assert_eq!(timestamp, vote_later_ts.timestamp); - - // Same vote with no timestamp should not override - let GossipVote { - slot, timestamp, .. - } = send_tower_sync_and_process( - &s, - &r, - vote_no_ts, - vote_account_key, - &mut verified_vote_packets, - ); - assert_eq!(slot, vote_later_ts.last_voted_slot().unwrap()); - assert_eq!(timestamp, vote_later_ts.timestamp); - } - - #[test] - fn test_latest_vote_feature_upgrade() { - let (s, r) = unbounded(); - let vote_account_key = solana_sdk::pubkey::new_rand(); - - // Send incremental votes - for i in 0..100 { - let vote = VoteTransaction::from(Vote::new(vec![i], Hash::new_unique())); - s.send(vec![VerifiedVoteMetadata { - vote_account_key, - vote, - packet_batch: PacketBatch::default(), - signature: Signature::from([1u8; 64]), - }]) - .unwrap(); - } - - let mut verified_vote_packets = VerifiedVotePackets(HashMap::new()); - // Receive votes without the feature active - verified_vote_packets - .receive_and_process_vote_packets(&r, true) - .unwrap(); - assert_eq!( - 100, - verified_vote_packets - .0 - .get(&vote_account_key) - .unwrap() - .len() - ); - - // Now send some new votes - for i in 101..201 { - let slots = std::iter::zip((i - 30)..(i + 1), (1..32).rev()) - .map(|(slot, confirmation_count)| { - Lockout::new_with_confirmation_count(slot, confirmation_count) - }) - .collect::>(); - let vote = VoteTransaction::from(TowerSync::new( - slots, - Some(i - 32), - Hash::new_unique(), - Hash::new_unique(), - )); - s.send(vec![VerifiedVoteMetadata { - vote_account_key, - vote, - packet_batch: PacketBatch::default(), - signature: Signature::from([1u8; 64]), - }]) - .unwrap(); - } - - // Receive votes with the feature active - verified_vote_packets - .receive_and_process_vote_packets(&r, true) - .unwrap(); - if let FullTowerVote(vote) = verified_vote_packets.0.get(&vote_account_key).unwrap() { - assert_eq!(200, vote.slot); - } else { - panic!("Feature active but incremental votes are present"); - } - } - - #[test] - fn test_incremental_votes_with_feature_active() { - let (s, r) = unbounded(); - let vote_account_key = solana_sdk::pubkey::new_rand(); - let mut verified_vote_packets = VerifiedVotePackets(HashMap::new()); - - let hash = Hash::new_unique(); - let vote = VoteTransaction::from(Vote::new(vec![42], hash)); - s.send(vec![VerifiedVoteMetadata { - vote_account_key, - vote, - packet_batch: PacketBatch::default(), - signature: Signature::from([1u8; 64]), - }]) - .unwrap(); - - // Receive incremental votes with the feature active - verified_vote_packets - .receive_and_process_vote_packets(&r, true) - .unwrap(); - - // Should still store as incremental votes - if let IncrementalVotes(votes) = verified_vote_packets.0.get(&vote_account_key).unwrap() { - assert!(votes.contains_key(&(42, hash))); - } else { - panic!("Although feature is active, incremental votes should not be stored as full tower votes"); - } - } - - #[test] - fn test_latest_votes_downgrade_full_to_incremental() { - let (s, r) = unbounded(); - let vote_account_key = solana_sdk::pubkey::new_rand(); - let mut verified_vote_packets = VerifiedVotePackets(HashMap::new()); - - let vote = VoteTransaction::from(TowerSync::from(vec![(42, 1)])); - let hash_42 = vote.hash(); - s.send(vec![VerifiedVoteMetadata { - vote_account_key, - vote, - packet_batch: PacketBatch::default(), - signature: Signature::from([1u8; 64]), - }]) - .unwrap(); - - // Receive full votes - verified_vote_packets - .receive_and_process_vote_packets(&r, true) - .unwrap(); - assert_eq!( - 42, - verified_vote_packets - .0 - .get(&vote_account_key) - .unwrap() - .get_latest_gossip_slot() - ); - - // Try to send an old incremental vote from pre feature activation - let vote = VoteTransaction::from(Vote::new(vec![34], Hash::new_unique())); - s.send(vec![VerifiedVoteMetadata { - vote_account_key, - vote, - packet_batch: PacketBatch::default(), - signature: Signature::from([1u8; 64]), - }]) - .unwrap(); - - // Try to receive nothing should happen - verified_vote_packets - .receive_and_process_vote_packets(&r, true) - .unwrap(); - if let FullTowerVote(vote) = verified_vote_packets.0.get(&vote_account_key).unwrap() { - assert_eq!(42, vote.slot); - } else { - panic!("Old vote triggered a downgrade conversion"); - } - - // Now try to send an incremental vote - let vote = VoteTransaction::from(Vote::new(vec![43], Hash::new_unique())); - let hash_43 = vote.hash(); - s.send(vec![VerifiedVoteMetadata { - vote_account_key, - vote, - packet_batch: PacketBatch::default(), - signature: Signature::from([1u8; 64]), - }]) - .unwrap(); - - // Try to receive and vote lands as well as the conversion back to incremental votes - verified_vote_packets - .receive_and_process_vote_packets(&r, true) - .unwrap(); - if let IncrementalVotes(votes) = verified_vote_packets.0.get(&vote_account_key).unwrap() { - assert!(votes.contains_key(&(42, hash_42))); - assert!(votes.contains_key(&(43, hash_43))); - assert_eq!(2, votes.len()); - } else { - panic!("Conversion back to incremental votes failed"); - } - } -}