From 7815ff668ba438ade4f304b879b6b2f85127a7d6 Mon Sep 17 00:00:00 2001
From: "mergify[bot]" <37929162+mergify[bot]@users.noreply.github.com>
Date: Wed, 12 Feb 2025 01:17:45 +0000
Subject: [PATCH] v2.2: shares shreds' payload between window-service and retransmit-stage (backport of #4803) (#4936)

---
 core/src/window_service.rs      | 56 +++++++++------------------------
 turbine/src/sigverify_shreds.rs | 42 ++++++++++++++++++-------
 2 files changed, 46 insertions(+), 52 deletions(-)

diff --git a/core/src/window_service.rs b/core/src/window_service.rs
index 77110f44634cda..07adae2d8a18e4 100644
--- a/core/src/window_service.rs
+++ b/core/src/window_service.rs
@@ -15,6 +15,7 @@ use {
         },
         result::{Error, Result},
     },
+    assert_matches::debug_assert_matches,
     bytes::Bytes,
     crossbeam_channel::{unbounded, Receiver, RecvTimeoutError, Sender},
     rayon::{prelude::*, ThreadPool},
@@ -27,7 +28,6 @@ use {
     },
     solana_measure::measure::Measure,
     solana_metrics::inc_new_counter_error,
-    solana_perf::packet::{Packet, PacketBatch},
    solana_rayon_threadlimit::get_thread_count,
     solana_runtime::bank_forks::BankForks,
     solana_sdk::{
@@ -36,8 +36,6 @@ use {
     },
     solana_turbine::cluster_nodes,
     std::{
-        cmp::Reverse,
-        collections::HashMap,
         net::{SocketAddr, UdpSocket},
         sync::{
             atomic::{AtomicBool, AtomicUsize, Ordering},
@@ -55,7 +53,6 @@ pub(crate) type DuplicateSlotReceiver = Receiver<Slot>;
 #[derive(Default)]
 struct WindowServiceMetrics {
     run_insert_count: u64,
-    num_packets: usize,
     num_repairs: AtomicUsize,
     num_shreds_received: usize,
     handle_packets_elapsed_us: u64,
@@ -65,12 +62,10 @@ struct WindowServiceMetrics {
     num_errors_cross_beam_recv_timeout: u64,
     num_errors_other: u64,
     num_errors_try_crossbeam_send: u64,
-    addrs: HashMap<SocketAddr, usize>,
 }
 
 impl WindowServiceMetrics {
     fn report_metrics(&self, metric_name: &'static str) {
-        const MAX_NUM_ADDRS: usize = 5;
         datapoint_info!(
             metric_name,
             (
@@ -79,7 +74,6 @@ impl WindowServiceMetrics {
                 i64
             ),
             ("run_insert_count", self.run_insert_count as i64, i64),
-            ("num_packets", self.num_packets, i64),
             ("num_repairs", self.num_repairs.load(Ordering::Relaxed), i64),
             ("num_shreds_received", self.num_shreds_received, i64),
             (
@@ -101,19 +95,6 @@ impl WindowServiceMetrics {
                 i64
             ),
         );
-
-        let mut addrs: Vec<_> = self.addrs.iter().collect();
-        let reverse_count = |(_addr, count): &_| Reverse(*count);
-        if addrs.len() > MAX_NUM_ADDRS {
-            addrs.select_nth_unstable_by_key(MAX_NUM_ADDRS, reverse_count);
-            addrs.truncate(MAX_NUM_ADDRS);
-        }
-        addrs.sort_unstable_by_key(reverse_count);
-        info!(
-            "num addresses: {}, top packets by source: {:?}",
-            self.addrs.len(),
-            addrs
-        );
     }
 
     fn record_error(&mut self, err: &Error) {
@@ -208,7 +189,7 @@ fn run_check_duplicate(
 #[allow(clippy::too_many_arguments)]
 fn run_insert<F>(
     thread_pool: &ThreadPool,
-    verified_receiver: &Receiver<Vec<PacketBatch>>,
+    verified_receiver: &Receiver<Vec<(shred::Payload, /*is_repaired:*/ bool)>>,
     blockstore: &Blockstore,
     leader_schedule_cache: &LeaderScheduleCache,
     handle_duplicate: F,
@@ -224,41 +205,34 @@ where
 {
     const RECV_TIMEOUT: Duration = Duration::from_millis(200);
     let mut shred_receiver_elapsed = Measure::start("shred_receiver_elapsed");
-    let mut packets = verified_receiver.recv_timeout(RECV_TIMEOUT)?;
-    packets.extend(verified_receiver.try_iter().flatten());
+    let mut shreds = verified_receiver.recv_timeout(RECV_TIMEOUT)?;
+    shreds.extend(verified_receiver.try_iter().flatten());
     shred_receiver_elapsed.stop();
     ws_metrics.shred_receiver_elapsed_us += shred_receiver_elapsed.as_us();
     ws_metrics.run_insert_count += 1;
-    let handle_packet = |packet: &Packet| {
-        if packet.meta().discard() {
+    let handle_shred = |(shred, repair): (shred::Payload, bool)| {
+        if accept_repairs_only && !repair {
             return None;
         }
-        let repair = packet.meta().repair();
         if repair {
             ws_metrics.num_repairs.fetch_add(1, Ordering::Relaxed);
+            debug_assert_matches!(shred, shred::Payload::Unique(_));
+        } else {
+            debug_assert_matches!(shred, shred::Payload::Shared(_));
         }
-        if accept_repairs_only && !repair {
-            return None;
-        }
-        let shred = shred::layout::get_shred(packet)?;
-        let shred = Shred::new_from_serialized_shred(shred.to_vec()).ok()?;
+        let shred = Shred::new_from_serialized_shred(shred).ok()?;
         Some((shred, repair))
     };
     let now = Instant::now();
     let shreds: Vec<_> = thread_pool.install(|| {
-        packets
-            .par_iter()
+        shreds
+            .into_par_iter()
             .with_min_len(32)
-            .flat_map_iter(|packets| packets.iter().filter_map(handle_packet))
+            .filter_map(handle_shred)
             .collect()
     });
     ws_metrics.handle_packets_elapsed_us += now.elapsed().as_micros() as u64;
-    ws_metrics.num_packets += packets.iter().map(PacketBatch::len).sum::<usize>();
     ws_metrics.num_shreds_received += shreds.len();
-    for packet in packets.iter().flat_map(PacketBatch::iter) {
-        let addr = packet.meta().socket_addr();
-        *ws_metrics.addrs.entry(addr).or_default() += 1;
-    }
     let completed_data_sets = blockstore.insert_shreds_handle_duplicate(
         shreds,
         Some(leader_schedule_cache),
@@ -286,7 +260,7 @@ impl WindowService {
     #[allow(clippy::too_many_arguments)]
     pub(crate) fn new(
         blockstore: Arc<Blockstore>,
-        verified_receiver: Receiver<Vec<PacketBatch>>,
+        verified_receiver: Receiver<Vec<(shred::Payload, /*is_repaired:*/ bool)>>,
         retransmit_sender: Sender<Vec<shred::Payload>>,
         repair_socket: Arc<UdpSocket>,
         ancestor_hashes_socket: Arc<UdpSocket>,
@@ -391,7 +365,7 @@ impl WindowService {
         exit: Arc<AtomicBool>,
         blockstore: Arc<Blockstore>,
         leader_schedule_cache: Arc<LeaderScheduleCache>,
-        verified_receiver: Receiver<Vec<PacketBatch>>,
+        verified_receiver: Receiver<Vec<(shred::Payload, /*is_repaired:*/ bool)>>,
         check_duplicate_sender: Sender,
         completed_data_sets_sender: Option<CompletedDataSetsSender>,
         retransmit_sender: Sender<Vec<shred::Payload>>,
diff --git a/turbine/src/sigverify_shreds.rs b/turbine/src/sigverify_shreds.rs
index a7b1edd254d1db..a7355e5f819250 100644
--- a/turbine/src/sigverify_shreds.rs
+++ b/turbine/src/sigverify_shreds.rs
@@ -4,6 +4,7 @@ use {
         retransmit_stage::RetransmitStage,
     },
     crossbeam_channel::{Receiver, RecvTimeoutError, SendError, Sender},
+    itertools::{Either, Itertools},
     rayon::{prelude::*, ThreadPool, ThreadPoolBuilder},
     solana_feature_set as feature_set,
     solana_gossip::cluster_info::ClusterInfo,
@@ -65,7 +66,7 @@ pub fn spawn_shred_sigverify(
     leader_schedule_cache: Arc<LeaderScheduleCache>,
     shred_fetch_receiver: Receiver<PacketBatch>,
     retransmit_sender: Sender<Vec<shred::Payload>>,
-    verified_sender: Sender<Vec<PacketBatch>>,
+    verified_sender: Sender<Vec<(shred::Payload, /*is_repaired:*/ bool)>>,
     num_sigverify_threads: NonZeroUsize,
 ) -> JoinHandle<()> {
     let recycler_cache = RecyclerCache::warmed();
@@ -130,7 +131,7 @@ fn run_shred_sigverify(
     deduper: &Deduper,
     shred_fetch_receiver: &Receiver<PacketBatch>,
     retransmit_sender: &Sender<Vec<shred::Payload>>,
-    verified_sender: &Sender<Vec<PacketBatch>>,
+    verified_sender: &Sender<Vec<(shred::Payload, /*is_repaired:*/ bool)>>,
     cluster_nodes_cache: &ClusterNodesCache<RetransmitStage>,
     cache: &RwLock,
     stats: &mut ShredSigVerifyStats,
@@ -242,18 +243,37 @@ fn run_shred_sigverify(
         })
     });
     stats.resign_micros += resign_start.elapsed().as_micros() as u64;
-    // Exclude repair packets from retransmit.
-    let shreds: Vec<_> = packets
+    // Extract shred payload from packets, and separate out repaired shreds.
+    let (shreds, repairs): (Vec<_>, Vec<_>) = packets
         .iter()
         .flat_map(PacketBatch::iter)
-        .filter(|packet| !packet.meta().discard() && !packet.meta().repair())
-        .filter_map(shred::layout::get_shred)
-        .map(<[u8]>::to_vec)
-        .map(shred::Payload::from)
-        .collect();
+        .filter(|packet| !packet.meta().discard())
+        .filter_map(|packet| {
+            let shred = shred::layout::get_shred(packet)?.to_vec();
+            Some((shred, packet.meta().repair()))
+        })
+        .partition_map(|(shred, repair)| {
+            if repair {
+                // No need for Arc overhead here because repaired shreds are
+                // not retransmitted.
+                Either::Right(shred::Payload::from(shred))
+            } else {
+                // Share the payload between the retransmit-stage and the
+                // window-service.
+                Either::Left(shred::Payload::from(Arc::new(shred)))
+            }
+        });
+    // Repaired shreds are not retransmitted.
     stats.num_retransmit_shreds += shreds.len();
-    retransmit_sender.send(shreds)?;
-    verified_sender.send(packets)?;
+    retransmit_sender.send(shreds.clone())?;
+    // Send all shreds to window service to be inserted into blockstore.
+    let shreds = shreds
+        .into_iter()
+        .map(|shred| (shred, /*is_repaired:*/ false));
+    let repairs = repairs
+        .into_iter()
+        .map(|shred| (shred, /*is_repaired:*/ true));
+    verified_sender.send(shreds.chain(repairs).collect())?;
     stats.elapsed_micros += now.elapsed().as_micros() as u64;
     Ok(())
 }
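
Note on the change above: sigverify now wraps each turbine shred's bytes in an Arc exactly once, so the retransmit-stage and the window-service pass around cheap clones of the same buffer instead of each copying the payload, while repaired shreds, which are never retransmitted, stay plainly owned byte vectors. The standalone Rust sketch below illustrates that ownership split under simplified assumptions; the Payload enum, the retransmit/insert vectors, and the buffer sizes are illustrative stand-ins, not the actual shred::Payload API touched by the patch.

// Standalone sketch (std only); NOT the agave implementation.
use std::sync::Arc;

#[derive(Clone)]
enum Payload {
    // Owned bytes: used for repaired shreds, which are only inserted into
    // the blockstore and never retransmitted.
    Unique(Vec<u8>),
    // Reference-counted bytes: shared by the retransmit and insert paths,
    // so cloning the handle does not copy the shred payload.
    Shared(Arc<Vec<u8>>),
}

impl AsRef<[u8]> for Payload {
    fn as_ref(&self) -> &[u8] {
        match self {
            Self::Unique(bytes) => bytes.as_slice(),
            Self::Shared(bytes) => bytes.as_slice(),
        }
    }
}

fn main() {
    // Pretend these came out of sigverify: (payload bytes, is_repaired).
    let verified: Vec<(Vec<u8>, bool)> = vec![
        (vec![1u8; 1203], false), // turbine shred: retransmit + insert
        (vec![2u8; 1203], true),  // repaired shred: insert only
    ];

    let mut retransmit: Vec<Payload> = Vec::new();
    let mut insert: Vec<(Payload, /*is_repaired:*/ bool)> = Vec::new();

    for (bytes, repair) in verified {
        if repair {
            // Single consumer, so skip the Arc overhead.
            insert.push((Payload::Unique(bytes), true));
        } else {
            // One allocation, two lightweight handles.
            let shred = Payload::Shared(Arc::new(bytes));
            retransmit.push(shred.clone()); // refcount bump, no memcpy
            insert.push((shred, false));
        }
    }

    assert_eq!(retransmit.len(), 1);
    assert_eq!(insert.len(), 2);
    // Both handles reference the same underlying buffer.
    assert_eq!(
        retransmit[0].as_ref().as_ptr(),
        insert[0].0.as_ref().as_ptr(),
    );
}

In the sketch, cloning a Shared payload only bumps the Arc refcount, which is the saving the patch is after; the Unique variant keeps the single-consumer repair path free of that indirection.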