shares shreds' payload between window-service and retransmit-stage
Shreds received from turbine are concurrently sent to the window-service, where they are
deserialized and inserted into the blockstore, while their payload is sent to the
retransmit-stage. Sharing the payload between the two concurrent paths reduces
allocations and memory copies.
behzadnouri committed Feb 7, 2025
1 parent 6a95359 commit fbaf03c
Showing 2 changed files with 46 additions and 52 deletions.
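The gist of the change, as a minimal standalone sketch: payloads bound for both the retransmit-stage and the window-service wrap their bytes in an Arc so each consumer clones a reference count rather than the buffer, while repaired shreds keep a plain Vec because they are never retransmitted. The Shared/Unique enum shape is taken from the diff below; everything else (names, sizes) is illustrative, not the crate's actual API.

use std::sync::Arc;

// Simplified sketch of the shared-payload pattern introduced by this commit.
// The real type is shred::Payload; this version only demonstrates the
// sharing semantics.
#[derive(Clone, Debug)]
enum Payload {
    // Cheap to clone: the retransmit path and the blockstore-insert path
    // hold the same underlying buffer.
    Shared(Arc<Vec<u8>>),
    // Repaired shreds are only inserted into the blockstore, so they skip
    // the Arc indirection.
    Unique(Vec<u8>),
}

impl AsRef<[u8]> for Payload {
    fn as_ref(&self) -> &[u8] {
        match self {
            Payload::Shared(bytes) => bytes.as_slice(),
            Payload::Unique(bytes) => bytes.as_slice(),
        }
    }
}

fn main() {
    let bytes = vec![0u8; 1228]; // stand-in for a serialized shred
    let payload = Payload::Shared(Arc::new(bytes));
    // One allocation, two logical consumers: cloning bumps a refcount
    // instead of copying the buffer.
    let for_retransmit = payload.clone();
    let for_window_service = payload;
    assert_eq!(
        for_retransmit.as_ref().len(),
        for_window_service.as_ref().len()
    );
}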
56 changes: 15 additions & 41 deletions core/src/window_service.rs
@@ -15,6 +15,7 @@ use {
},
result::{Error, Result},
},
assert_matches::debug_assert_matches,
bytes::Bytes,
crossbeam_channel::{unbounded, Receiver, RecvTimeoutError, Sender},
rayon::{prelude::*, ThreadPool},
@@ -27,7 +28,6 @@ use {
},
solana_measure::measure::Measure,
solana_metrics::inc_new_counter_error,
solana_perf::packet::{Packet, PacketBatch},
solana_rayon_threadlimit::get_thread_count,
solana_runtime::bank_forks::BankForks,
solana_sdk::{
@@ -36,8 +36,6 @@ use {
},
solana_turbine::cluster_nodes,
std::{
cmp::Reverse,
collections::HashMap,
net::{SocketAddr, UdpSocket},
sync::{
atomic::{AtomicBool, AtomicUsize, Ordering},
@@ -55,7 +53,6 @@ pub(crate) type DuplicateSlotReceiver = Receiver<Slot>;
#[derive(Default)]
struct WindowServiceMetrics {
run_insert_count: u64,
num_packets: usize,
num_repairs: AtomicUsize,
num_shreds_received: usize,
handle_packets_elapsed_us: u64,
@@ -65,12 +62,10 @@ struct WindowServiceMetrics {
num_errors_cross_beam_recv_timeout: u64,
num_errors_other: u64,
num_errors_try_crossbeam_send: u64,
addrs: HashMap</*source:*/ SocketAddr, /*num packets:*/ usize>,
}

impl WindowServiceMetrics {
fn report_metrics(&self, metric_name: &'static str) {
const MAX_NUM_ADDRS: usize = 5;
datapoint_info!(
metric_name,
(
@@ -79,7 +74,6 @@ impl WindowServiceMetrics {
i64
),
("run_insert_count", self.run_insert_count as i64, i64),
("num_packets", self.num_packets, i64),
("num_repairs", self.num_repairs.load(Ordering::Relaxed), i64),
("num_shreds_received", self.num_shreds_received, i64),
(
@@ -101,19 +95,6 @@ impl WindowServiceMetrics {
i64
),
);

let mut addrs: Vec<_> = self.addrs.iter().collect();
let reverse_count = |(_addr, count): &_| Reverse(*count);
if addrs.len() > MAX_NUM_ADDRS {
addrs.select_nth_unstable_by_key(MAX_NUM_ADDRS, reverse_count);
addrs.truncate(MAX_NUM_ADDRS);
}
addrs.sort_unstable_by_key(reverse_count);
info!(
"num addresses: {}, top packets by source: {:?}",
self.addrs.len(),
addrs
);
}

fn record_error(&mut self, err: &Error) {
@@ -208,7 +189,7 @@ fn run_check_duplicate(
#[allow(clippy::too_many_arguments)]
fn run_insert<F>(
thread_pool: &ThreadPool,
verified_receiver: &Receiver<Vec<PacketBatch>>,
verified_receiver: &Receiver<Vec<(shred::Payload, /*is_repaired:*/ bool)>>,
blockstore: &Blockstore,
leader_schedule_cache: &LeaderScheduleCache,
handle_duplicate: F,
@@ -224,41 +205,34 @@ where
{
const RECV_TIMEOUT: Duration = Duration::from_millis(200);
let mut shred_receiver_elapsed = Measure::start("shred_receiver_elapsed");
let mut packets = verified_receiver.recv_timeout(RECV_TIMEOUT)?;
packets.extend(verified_receiver.try_iter().flatten());
let mut shreds = verified_receiver.recv_timeout(RECV_TIMEOUT)?;
shreds.extend(verified_receiver.try_iter().flatten());
shred_receiver_elapsed.stop();
ws_metrics.shred_receiver_elapsed_us += shred_receiver_elapsed.as_us();
ws_metrics.run_insert_count += 1;
let handle_packet = |packet: &Packet| {
if packet.meta().discard() {
let handle_shred = |(shred, repair): (shred::Payload, bool)| {
if accept_repairs_only && !repair {
return None;
}
let repair = packet.meta().repair();
if repair {
ws_metrics.num_repairs.fetch_add(1, Ordering::Relaxed);
debug_assert_matches!(shred, shred::Payload::Unique(_));
} else {
debug_assert_matches!(shred, shred::Payload::Shared(_));
}
if accept_repairs_only && !repair {
return None;
}
let shred = shred::layout::get_shred(packet)?;
let shred = Shred::new_from_serialized_shred(shred.to_vec()).ok()?;
let shred = Shred::new_from_serialized_shred(shred).ok()?;
Some((shred, repair))
};
let now = Instant::now();
let shreds: Vec<_> = thread_pool.install(|| {
packets
.par_iter()
shreds
.into_par_iter()
.with_min_len(32)
.flat_map_iter(|packets| packets.iter().filter_map(handle_packet))
.filter_map(handle_shred)
.collect()
});
ws_metrics.handle_packets_elapsed_us += now.elapsed().as_micros() as u64;
ws_metrics.num_packets += packets.iter().map(PacketBatch::len).sum::<usize>();
ws_metrics.num_shreds_received += shreds.len();
for packet in packets.iter().flat_map(PacketBatch::iter) {
let addr = packet.meta().socket_addr();
*ws_metrics.addrs.entry(addr).or_default() += 1;
}
let completed_data_sets = blockstore.insert_shreds_handle_duplicate(
shreds,
Some(leader_schedule_cache),
@@ -286,7 +260,7 @@ impl WindowService {
#[allow(clippy::too_many_arguments)]
pub(crate) fn new(
blockstore: Arc<Blockstore>,
verified_receiver: Receiver<Vec<PacketBatch>>,
verified_receiver: Receiver<Vec<(shred::Payload, /*is_repaired:*/ bool)>>,
retransmit_sender: Sender<Vec<shred::Payload>>,
repair_socket: Arc<UdpSocket>,
ancestor_hashes_socket: Arc<UdpSocket>,
@@ -391,7 +365,7 @@ impl WindowService {
exit: Arc<AtomicBool>,
blockstore: Arc<Blockstore>,
leader_schedule_cache: Arc<LeaderScheduleCache>,
verified_receiver: Receiver<Vec<PacketBatch>>,
verified_receiver: Receiver<Vec<(shred::Payload, /*is_repaired:*/ bool)>>,
check_duplicate_sender: Sender<PossibleDuplicateShred>,
completed_data_sets_sender: Option<CompletedDataSetsSender>,
retransmit_sender: Sender<Vec<shred::Payload>>,
42 changes: 31 additions & 11 deletions turbine/src/sigverify_shreds.rs
@@ -4,6 +4,7 @@ use {
retransmit_stage::RetransmitStage,
},
crossbeam_channel::{Receiver, RecvTimeoutError, SendError, Sender},
itertools::{Either, Itertools},
rayon::{prelude::*, ThreadPool, ThreadPoolBuilder},
solana_feature_set as feature_set,
solana_gossip::cluster_info::ClusterInfo,
@@ -65,7 +66,7 @@ pub fn spawn_shred_sigverify(
leader_schedule_cache: Arc<LeaderScheduleCache>,
shred_fetch_receiver: Receiver<PacketBatch>,
retransmit_sender: Sender<Vec<shred::Payload>>,
verified_sender: Sender<Vec<PacketBatch>>,
verified_sender: Sender<Vec<(shred::Payload, /*is_repaired:*/ bool)>>,
num_sigverify_threads: NonZeroUsize,
) -> JoinHandle<()> {
let recycler_cache = RecyclerCache::warmed();
@@ -130,7 +131,7 @@ fn run_shred_sigverify<const K: usize>(
deduper: &Deduper<K, [u8]>,
shred_fetch_receiver: &Receiver<PacketBatch>,
retransmit_sender: &Sender<Vec<shred::Payload>>,
verified_sender: &Sender<Vec<PacketBatch>>,
verified_sender: &Sender<Vec<(shred::Payload, /*is_repaired:*/ bool)>>,
cluster_nodes_cache: &ClusterNodesCache<RetransmitStage>,
cache: &RwLock<LruCache>,
stats: &mut ShredSigVerifyStats,
@@ -242,18 +243,37 @@ fn run_shred_sigverify<const K: usize>(
})
});
stats.resign_micros += resign_start.elapsed().as_micros() as u64;
// Exclude repair packets from retransmit.
let shreds: Vec<_> = packets
// Extract shred payload from packets, and separate out repaired shreds.
let (shreds, repairs): (Vec<_>, Vec<_>) = packets
.iter()
.flat_map(PacketBatch::iter)
.filter(|packet| !packet.meta().discard() && !packet.meta().repair())
.filter_map(shred::layout::get_shred)
.map(<[u8]>::to_vec)
.map(shred::Payload::from)
.collect();
.filter(|packet| !packet.meta().discard())
.filter_map(|packet| {
let shred = shred::layout::get_shred(packet)?.to_vec();
Some((shred, packet.meta().repair()))
})
.partition_map(|(shred, repair)| {
if repair {
// No need for Arc overhead here because repaired shreds are
// not retransmitted.
Either::Right(shred::Payload::from(shred))
} else {
// Share the payload between the retransmit-stage and the
// window-service.
Either::Left(shred::Payload::from(Arc::new(shred)))
}
});
// Repaired shreds are not retransmitted.
stats.num_retransmit_shreds += shreds.len();
retransmit_sender.send(shreds)?;
verified_sender.send(packets)?;
retransmit_sender.send(shreds.clone())?;
// Send all shreds to window service to be inserted into blockstore.
let shreds = shreds
.into_iter()
.map(|shred| (shred, /*is_repaired:*/ false));
let repairs = repairs
.into_iter()
.map(|shred| (shred, /*is_repaired:*/ true));
verified_sender.send(shreds.chain(repairs).collect())?;
stats.elapsed_micros += now.elapsed().as_micros() as u64;
Ok(())
}
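For reference, a tiny standalone example (hypothetical data, simplified types) of the itertools partition_map pattern the new sigverify code above uses to split repaired from non-repaired shreds in a single pass. It only needs the itertools crate, which the diff already imports; the real code wraps each side in shred::Payload rather than bare Arc<Vec<u8>> / Vec<u8>.

use itertools::{Either, Itertools};
use std::sync::Arc;

fn main() {
    // (serialized shred bytes, is_repaired) pairs, as produced after sigverify.
    let shreds = vec![
        (vec![1u8, 2, 3], false),
        (vec![4u8, 5, 6], true),
        (vec![7u8, 8, 9], false),
    ];
    // Either::Left items land in the first collection, Either::Right in the
    // second, so retransmittable and repaired shreds are separated in one pass.
    let (shared, repairs): (Vec<Arc<Vec<u8>>>, Vec<Vec<u8>>) = shreds
        .into_iter()
        .partition_map(|(bytes, is_repaired)| {
            if is_repaired {
                Either::Right(bytes)
            } else {
                Either::Left(Arc::new(bytes))
            }
        });
    assert_eq!(shared.len(), 2);
    assert_eq!(repairs.len(), 1);
}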
