shares shreds' payload between window-service and retransmit-stage #4803

Merged
56 changes: 15 additions & 41 deletions core/src/window_service.rs
@@ -15,6 +15,7 @@ use {
},
result::{Error, Result},
},
assert_matches::debug_assert_matches,
bytes::Bytes,
crossbeam_channel::{unbounded, Receiver, RecvTimeoutError, Sender},
rayon::{prelude::*, ThreadPool},
@@ -27,7 +28,6 @@ use {
},
solana_measure::measure::Measure,
solana_metrics::inc_new_counter_error,
solana_perf::packet::{Packet, PacketBatch},
solana_rayon_threadlimit::get_thread_count,
solana_runtime::bank_forks::BankForks,
solana_sdk::{
@@ -36,8 +36,6 @@ use {
},
solana_turbine::cluster_nodes,
std::{
cmp::Reverse,
collections::HashMap,
net::{SocketAddr, UdpSocket},
sync::{
atomic::{AtomicBool, AtomicUsize, Ordering},
@@ -55,7 +53,6 @@ pub(crate) type DuplicateSlotReceiver = Receiver<Slot>;
#[derive(Default)]
struct WindowServiceMetrics {
run_insert_count: u64,
num_packets: usize,
num_repairs: AtomicUsize,
num_shreds_received: usize,
handle_packets_elapsed_us: u64,
@@ -65,12 +62,10 @@ struct WindowServiceMetrics {
num_errors_cross_beam_recv_timeout: u64,
num_errors_other: u64,
num_errors_try_crossbeam_send: u64,
addrs: HashMap</*source:*/ SocketAddr, /*num packets:*/ usize>,
}

impl WindowServiceMetrics {
fn report_metrics(&self, metric_name: &'static str) {
const MAX_NUM_ADDRS: usize = 5;
datapoint_info!(
metric_name,
(
@@ -79,7 +74,6 @@ impl WindowServiceMetrics {
i64
),
("run_insert_count", self.run_insert_count as i64, i64),
("num_packets", self.num_packets, i64),
("num_repairs", self.num_repairs.load(Ordering::Relaxed), i64),
("num_shreds_received", self.num_shreds_received, i64),
(
@@ -101,19 +95,6 @@ impl WindowServiceMetrics {
i64
),
);

let mut addrs: Vec<_> = self.addrs.iter().collect();
let reverse_count = |(_addr, count): &_| Reverse(*count);
if addrs.len() > MAX_NUM_ADDRS {
addrs.select_nth_unstable_by_key(MAX_NUM_ADDRS, reverse_count);
addrs.truncate(MAX_NUM_ADDRS);
}
addrs.sort_unstable_by_key(reverse_count);
info!(
"num addresses: {}, top packets by source: {:?}",
self.addrs.len(),
addrs
);
Comment on lines -105 to -116
Author

Removing this info! log (and the associated addrs bookkeeping) here because it is pretty inefficient to collect and emit these logs here.
I will look into putting something similar elsewhere in the pipeline (maybe shred-fetch-stage or sigverify).
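For reference, here is the top-N-by-count selection that the deleted log performed, pulled out as a standalone sketch (assuming, as in the removed code, a `HashMap<SocketAddr, usize>` of per-source packet counts and a cap of five reported addresses):

```rust
use std::{cmp::Reverse, collections::HashMap, net::SocketAddr};

/// Returns up to `max_num_addrs` (address, count) pairs with the highest
/// counts, sorted in descending order of count.
fn top_packet_sources(
    addrs: &HashMap<SocketAddr, usize>,
    max_num_addrs: usize,
) -> Vec<(&SocketAddr, &usize)> {
    let mut addrs: Vec<_> = addrs.iter().collect();
    let reverse_count = |(_addr, count): &(&SocketAddr, &usize)| Reverse(**count);
    if addrs.len() > max_num_addrs {
        // Move the largest `max_num_addrs` counts to the front, then drop the rest.
        addrs.select_nth_unstable_by_key(max_num_addrs, reverse_count);
        addrs.truncate(max_num_addrs);
    }
    addrs.sort_unstable_by_key(reverse_count);
    addrs
}
```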

}

fn record_error(&mut self, err: &Error) {
@@ -208,7 +189,7 @@ fn run_check_duplicate(
#[allow(clippy::too_many_arguments)]
fn run_insert<F>(
thread_pool: &ThreadPool,
verified_receiver: &Receiver<Vec<PacketBatch>>,
verified_receiver: &Receiver<Vec<(shred::Payload, /*is_repaired:*/ bool)>>,
blockstore: &Blockstore,
leader_schedule_cache: &LeaderScheduleCache,
handle_duplicate: F,
@@ -224,41 +205,34 @@ where
{
const RECV_TIMEOUT: Duration = Duration::from_millis(200);
let mut shred_receiver_elapsed = Measure::start("shred_receiver_elapsed");
let mut packets = verified_receiver.recv_timeout(RECV_TIMEOUT)?;
packets.extend(verified_receiver.try_iter().flatten());
let mut shreds = verified_receiver.recv_timeout(RECV_TIMEOUT)?;
shreds.extend(verified_receiver.try_iter().flatten());
shred_receiver_elapsed.stop();
ws_metrics.shred_receiver_elapsed_us += shred_receiver_elapsed.as_us();
ws_metrics.run_insert_count += 1;
let handle_packet = |packet: &Packet| {
if packet.meta().discard() {
let handle_shred = |(shred, repair): (shred::Payload, bool)| {
if accept_repairs_only && !repair {
return None;
}
let repair = packet.meta().repair();
if repair {
ws_metrics.num_repairs.fetch_add(1, Ordering::Relaxed);
debug_assert_matches!(shred, shred::Payload::Unique(_));
} else {
debug_assert_matches!(shred, shred::Payload::Shared(_));
}
if accept_repairs_only && !repair {
return None;
}
let shred = shred::layout::get_shred(packet)?;
let shred = Shred::new_from_serialized_shred(shred.to_vec()).ok()?;
let shred = Shred::new_from_serialized_shred(shred).ok()?;
Some((shred, repair))
};
let now = Instant::now();
let shreds: Vec<_> = thread_pool.install(|| {
packets
.par_iter()
shreds
.into_par_iter()
.with_min_len(32)
.flat_map_iter(|packets| packets.iter().filter_map(handle_packet))
.filter_map(handle_shred)
.collect()
});
ws_metrics.handle_packets_elapsed_us += now.elapsed().as_micros() as u64;
ws_metrics.num_packets += packets.iter().map(PacketBatch::len).sum::<usize>();
ws_metrics.num_shreds_received += shreds.len();
for packet in packets.iter().flat_map(PacketBatch::iter) {
let addr = packet.meta().socket_addr();
*ws_metrics.addrs.entry(addr).or_default() += 1;
}
let completed_data_sets = blockstore.insert_shreds_handle_duplicate(
shreds,
Some(leader_schedule_cache),
@@ -286,7 +260,7 @@ impl WindowService {
#[allow(clippy::too_many_arguments)]
pub(crate) fn new(
blockstore: Arc<Blockstore>,
verified_receiver: Receiver<Vec<PacketBatch>>,
verified_receiver: Receiver<Vec<(shred::Payload, /*is_repaired:*/ bool)>>,
retransmit_sender: Sender<Vec<shred::Payload>>,
repair_socket: Arc<UdpSocket>,
ancestor_hashes_socket: Arc<UdpSocket>,
@@ -391,7 +365,7 @@ impl WindowService {
exit: Arc<AtomicBool>,
blockstore: Arc<Blockstore>,
leader_schedule_cache: Arc<LeaderScheduleCache>,
verified_receiver: Receiver<Vec<PacketBatch>>,
verified_receiver: Receiver<Vec<(shred::Payload, /*is_repaired:*/ bool)>>,
check_duplicate_sender: Sender<PossibleDuplicateShred>,
completed_data_sets_sender: Option<CompletedDataSetsSender>,
retransmit_sender: Sender<Vec<shred::Payload>>,
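The type threaded through these signatures is what makes the sharing work: turbine shreds arrive behind an `Arc`, while repaired shreds stay uniquely owned, which is what the `debug_assert_matches!` calls in `run_insert` check. A minimal sketch of such a payload type, assuming it is roughly an enum over an owned buffer and an `Arc`-shared one (the real `shred::Payload` may differ in detail):

```rust
use std::sync::Arc;

/// Sketch: a shred payload that is either uniquely owned (repaired shreds)
/// or shared behind an `Arc` (turbine shreds that are also retransmitted).
#[derive(Clone, Debug)]
enum Payload {
    Unique(Vec<u8>),
    Shared(Arc<Vec<u8>>),
}

impl From<Vec<u8>> for Payload {
    fn from(bytes: Vec<u8>) -> Self {
        Self::Unique(bytes)
    }
}

impl From<Arc<Vec<u8>>> for Payload {
    fn from(bytes: Arc<Vec<u8>>) -> Self {
        Self::Shared(bytes)
    }
}

impl AsRef<[u8]> for Payload {
    fn as_ref(&self) -> &[u8] {
        match self {
            Self::Unique(bytes) => bytes.as_slice(),
            Self::Shared(bytes) => bytes.as_slice(),
        }
    }
}
```

Cloning the `Shared` variant only bumps a reference count, so the same bytes can go to both the retransmit-stage and the window-service without copying; repaired shreds skip the `Arc` because only the window-service consumes them.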
42 changes: 31 additions & 11 deletions turbine/src/sigverify_shreds.rs
@@ -4,6 +4,7 @@ use {
retransmit_stage::RetransmitStage,
},
crossbeam_channel::{Receiver, RecvTimeoutError, SendError, Sender},
itertools::{Either, Itertools},
rayon::{prelude::*, ThreadPool, ThreadPoolBuilder},
solana_feature_set as feature_set,
solana_gossip::cluster_info::ClusterInfo,
@@ -65,7 +66,7 @@ pub fn spawn_shred_sigverify(
leader_schedule_cache: Arc<LeaderScheduleCache>,
shred_fetch_receiver: Receiver<PacketBatch>,
retransmit_sender: Sender<Vec<shred::Payload>>,
verified_sender: Sender<Vec<PacketBatch>>,
verified_sender: Sender<Vec<(shred::Payload, /*is_repaired:*/ bool)>>,
num_sigverify_threads: NonZeroUsize,
) -> JoinHandle<()> {
let recycler_cache = RecyclerCache::warmed();
@@ -130,7 +131,7 @@ fn run_shred_sigverify<const K: usize>(
deduper: &Deduper<K, [u8]>,
shred_fetch_receiver: &Receiver<PacketBatch>,
retransmit_sender: &Sender<Vec<shred::Payload>>,
verified_sender: &Sender<Vec<PacketBatch>>,
verified_sender: &Sender<Vec<(shred::Payload, /*is_repaired:*/ bool)>>,
cluster_nodes_cache: &ClusterNodesCache<RetransmitStage>,
cache: &RwLock<LruCache>,
stats: &mut ShredSigVerifyStats,
@@ -242,18 +243,37 @@
})
});
stats.resign_micros += resign_start.elapsed().as_micros() as u64;
// Exclude repair packets from retransmit.
let shreds: Vec<_> = packets
// Extract shred payload from packets, and separate out repaired shreds.
let (shreds, repairs): (Vec<_>, Vec<_>) = packets
.iter()
.flat_map(PacketBatch::iter)
.filter(|packet| !packet.meta().discard() && !packet.meta().repair())
.filter_map(shred::layout::get_shred)
.map(<[u8]>::to_vec)
.map(shred::Payload::from)
.collect();
.filter(|packet| !packet.meta().discard())
.filter_map(|packet| {
let shred = shred::layout::get_shred(packet)?.to_vec();
Some((shred, packet.meta().repair()))
})
.partition_map(|(shred, repair)| {
if repair {
// No need for Arc overhead here because repaired shreds are
// not retransmitted.
Either::Right(shred::Payload::from(shred))
} else {
// Share the payload between the retransmit-stage and the
// window-service.
Either::Left(shred::Payload::from(Arc::new(shred)))
}
});
// Repaired shreds are not retransmitted.
stats.num_retransmit_shreds += shreds.len();
retransmit_sender.send(shreds)?;
verified_sender.send(packets)?;
retransmit_sender.send(shreds.clone())?;
// Send all shreds to window service to be inserted into blockstore.
let shreds = shreds
.into_iter()
.map(|shred| (shred, /*is_repaired:*/ false));
let repairs = repairs
.into_iter()
.map(|shred| (shred, /*is_repaired:*/ true));
verified_sender.send(shreds.chain(repairs).collect())?;
stats.elapsed_micros += now.elapsed().as_micros() as u64;
Ok(())
}
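To make the split above concrete, here is a small self-contained example of the same `partition_map`/`Either` pattern used to separate repaired shreds from turbine shreds in one pass (toy data, not the real packet or payload types):

```rust
use itertools::{Either, Itertools};

fn main() {
    // (payload bytes, is_repaired) pairs standing in for verified shreds.
    let shreds = vec![
        (vec![1u8, 2, 3], false),
        (vec![4, 5], true),
        (vec![6], false),
    ];
    // Split in a single pass: turbine shreds go left, repaired shreds go right.
    let (turbine, repairs): (Vec<Vec<u8>>, Vec<Vec<u8>>) =
        shreds.into_iter().partition_map(|(bytes, is_repaired)| {
            if is_repaired {
                Either::Right(bytes)
            } else {
                Either::Left(bytes)
            }
        });
    assert_eq!(turbine.len(), 2);
    assert_eq!(repairs.len(), 1);
}
```

Because the turbine-side payloads in the real code are `Arc`-backed, the subsequent `shreds.clone()` before `retransmit_sender.send` only copies reference-counted pointers, not the shred bytes.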