Commit 86942c1

behzadnouri authored and mergify[bot] committed
shares shreds' payload between window-service and retransmit-stage (#4803)
Shreds received from turbine are concurrently sent to window-service to be deserialized and inserted into blockstore, while their payload is sent to retransmit-stage. Using a shared payload between the two concurrent paths will reduce allocations and memcopies. (cherry picked from commit d590995)
1 parent cd3a8d4 commit 86942c1
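
The core idea of the change can be sketched outside the codebase: wrap the serialized shred bytes in an Arc so the retransmit path and the window-service path hold the same buffer instead of each owning a copy. The sketch below uses plain std types (mpsc channels and a stand-in Payload enum), not the actual agave shred::Payload; sizes and names are illustrative only.

// Minimal sketch (not the actual agave types): a payload that is either
// uniquely owned or shared behind an Arc, so the retransmit path and the
// window-service path can hold the same bytes without a second allocation.
use std::sync::{mpsc, Arc};
use std::thread;

#[derive(Clone, Debug)]
enum Payload {
    Shared(Arc<Vec<u8>>), // turbine shreds: cheaply cloned for both paths
    Unique(Vec<u8>),      // repaired shreds: only the window service needs them
}

fn main() {
    let (retransmit_tx, retransmit_rx) = mpsc::channel::<Payload>();
    let (window_tx, window_rx) = mpsc::channel::<Payload>();

    // Stand-in for a serialized shred received from turbine.
    let shred = Payload::Shared(Arc::new(vec![0u8; 1_203]));

    // Cloning Payload::Shared only bumps the Arc refcount; the byte buffer
    // itself is allocated exactly once.
    retransmit_tx.send(shred.clone()).unwrap();
    window_tx.send(shred).unwrap();

    // A repaired shred stays uniquely owned; it is never retransmitted.
    window_tx.send(Payload::Unique(vec![1u8; 1_203])).unwrap();

    let retransmit = thread::spawn(move || retransmit_rx.recv().unwrap());
    let window = thread::spawn(move || window_rx.recv().unwrap());

    if let (Payload::Shared(a), Payload::Shared(b)) =
        (retransmit.join().unwrap(), window.join().unwrap())
    {
        // Both stages observe the same underlying allocation.
        assert!(Arc::ptr_eq(&a, &b));
    }
}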

File tree

2 files changed: +46, -52 lines


core/src/window_service.rs (+15, -41)
@@ -15,6 +15,7 @@ use {
         },
         result::{Error, Result},
     },
+    assert_matches::debug_assert_matches,
     bytes::Bytes,
     crossbeam_channel::{unbounded, Receiver, RecvTimeoutError, Sender},
     rayon::{prelude::*, ThreadPool},
@@ -27,7 +28,6 @@ use {
     },
     solana_measure::measure::Measure,
     solana_metrics::inc_new_counter_error,
-    solana_perf::packet::{Packet, PacketBatch},
     solana_rayon_threadlimit::get_thread_count,
     solana_runtime::bank_forks::BankForks,
     solana_sdk::{
@@ -36,8 +36,6 @@ use {
     },
     solana_turbine::cluster_nodes,
     std::{
-        cmp::Reverse,
-        collections::HashMap,
         net::{SocketAddr, UdpSocket},
         sync::{
             atomic::{AtomicBool, AtomicUsize, Ordering},
@@ -55,7 +53,6 @@ pub(crate) type DuplicateSlotReceiver = Receiver<Slot>;
 #[derive(Default)]
 struct WindowServiceMetrics {
     run_insert_count: u64,
-    num_packets: usize,
     num_repairs: AtomicUsize,
     num_shreds_received: usize,
     handle_packets_elapsed_us: u64,
@@ -65,12 +62,10 @@ struct WindowServiceMetrics {
     num_errors_cross_beam_recv_timeout: u64,
     num_errors_other: u64,
     num_errors_try_crossbeam_send: u64,
-    addrs: HashMap</*source:*/ SocketAddr, /*num packets:*/ usize>,
 }
 
 impl WindowServiceMetrics {
     fn report_metrics(&self, metric_name: &'static str) {
-        const MAX_NUM_ADDRS: usize = 5;
         datapoint_info!(
             metric_name,
             (
@@ -79,7 +74,6 @@ impl WindowServiceMetrics {
                 i64
             ),
             ("run_insert_count", self.run_insert_count as i64, i64),
-            ("num_packets", self.num_packets, i64),
             ("num_repairs", self.num_repairs.load(Ordering::Relaxed), i64),
             ("num_shreds_received", self.num_shreds_received, i64),
             (
@@ -101,19 +95,6 @@ impl WindowServiceMetrics {
                 i64
             ),
         );
-
-        let mut addrs: Vec<_> = self.addrs.iter().collect();
-        let reverse_count = |(_addr, count): &_| Reverse(*count);
-        if addrs.len() > MAX_NUM_ADDRS {
-            addrs.select_nth_unstable_by_key(MAX_NUM_ADDRS, reverse_count);
-            addrs.truncate(MAX_NUM_ADDRS);
-        }
-        addrs.sort_unstable_by_key(reverse_count);
-        info!(
-            "num addresses: {}, top packets by source: {:?}",
-            self.addrs.len(),
-            addrs
-        );
     }
 
     fn record_error(&mut self, err: &Error) {
@@ -208,7 +189,7 @@ fn run_check_duplicate(
 #[allow(clippy::too_many_arguments)]
 fn run_insert<F>(
     thread_pool: &ThreadPool,
-    verified_receiver: &Receiver<Vec<PacketBatch>>,
+    verified_receiver: &Receiver<Vec<(shred::Payload, /*is_repaired:*/ bool)>>,
     blockstore: &Blockstore,
     leader_schedule_cache: &LeaderScheduleCache,
     handle_duplicate: F,
@@ -224,41 +205,34 @@ where
 {
     const RECV_TIMEOUT: Duration = Duration::from_millis(200);
     let mut shred_receiver_elapsed = Measure::start("shred_receiver_elapsed");
-    let mut packets = verified_receiver.recv_timeout(RECV_TIMEOUT)?;
-    packets.extend(verified_receiver.try_iter().flatten());
+    let mut shreds = verified_receiver.recv_timeout(RECV_TIMEOUT)?;
+    shreds.extend(verified_receiver.try_iter().flatten());
     shred_receiver_elapsed.stop();
     ws_metrics.shred_receiver_elapsed_us += shred_receiver_elapsed.as_us();
     ws_metrics.run_insert_count += 1;
-    let handle_packet = |packet: &Packet| {
-        if packet.meta().discard() {
+    let handle_shred = |(shred, repair): (shred::Payload, bool)| {
+        if accept_repairs_only && !repair {
             return None;
         }
-        let repair = packet.meta().repair();
         if repair {
             ws_metrics.num_repairs.fetch_add(1, Ordering::Relaxed);
+            debug_assert_matches!(shred, shred::Payload::Unique(_));
+        } else {
+            debug_assert_matches!(shred, shred::Payload::Shared(_));
         }
-        if accept_repairs_only && !repair {
-            return None;
-        }
-        let shred = shred::layout::get_shred(packet)?;
-        let shred = Shred::new_from_serialized_shred(shred.to_vec()).ok()?;
+        let shred = Shred::new_from_serialized_shred(shred).ok()?;
         Some((shred, repair))
     };
     let now = Instant::now();
     let shreds: Vec<_> = thread_pool.install(|| {
-        packets
-            .par_iter()
+        shreds
+            .into_par_iter()
             .with_min_len(32)
-            .flat_map_iter(|packets| packets.iter().filter_map(handle_packet))
+            .filter_map(handle_shred)
             .collect()
     });
     ws_metrics.handle_packets_elapsed_us += now.elapsed().as_micros() as u64;
-    ws_metrics.num_packets += packets.iter().map(PacketBatch::len).sum::<usize>();
     ws_metrics.num_shreds_received += shreds.len();
-    for packet in packets.iter().flat_map(PacketBatch::iter) {
-        let addr = packet.meta().socket_addr();
-        *ws_metrics.addrs.entry(addr).or_default() += 1;
-    }
     let completed_data_sets = blockstore.insert_shreds_handle_duplicate(
         shreds,
         Some(leader_schedule_cache),
@@ -286,7 +260,7 @@ impl WindowService {
     #[allow(clippy::too_many_arguments)]
     pub(crate) fn new(
         blockstore: Arc<Blockstore>,
-        verified_receiver: Receiver<Vec<PacketBatch>>,
+        verified_receiver: Receiver<Vec<(shred::Payload, /*is_repaired:*/ bool)>>,
         retransmit_sender: Sender<Vec<shred::Payload>>,
         repair_socket: Arc<UdpSocket>,
         ancestor_hashes_socket: Arc<UdpSocket>,
@@ -391,7 +365,7 @@ impl WindowService {
         exit: Arc<AtomicBool>,
         blockstore: Arc<Blockstore>,
         leader_schedule_cache: Arc<LeaderScheduleCache>,
-        verified_receiver: Receiver<Vec<PacketBatch>>,
+        verified_receiver: Receiver<Vec<(shred::Payload, /*is_repaired:*/ bool)>>,
         check_duplicate_sender: Sender<PossibleDuplicateShred>,
         completed_data_sets_sender: Option<CompletedDataSetsSender>,
         retransmit_sender: Sender<Vec<shred::Payload>>,
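
For reference, the new run_insert deserializes owned (payload, is_repaired) tuples in parallel with rayon's into_par_iter instead of iterating borrowed PacketBatches. A standalone sketch of that pattern follows; parse() is a hypothetical stand-in for Shred::new_from_serialized_shred, and only the rayon crate is assumed.

// Sketch of the parallel deserialization pattern, assuming the rayon crate
// and a hypothetical parse() in place of Shred::new_from_serialized_shred.
use rayon::prelude::*;

fn parse(bytes: Vec<u8>) -> Option<u64> {
    // Hypothetical stand-in: read the first 8 bytes as a little-endian id.
    bytes.get(..8)?.try_into().ok().map(u64::from_le_bytes)
}

fn main() {
    let shreds: Vec<(Vec<u8>, bool)> = (0u64..1_000)
        .map(|i| (i.to_le_bytes().to_vec(), i % 10 == 0))
        .collect();

    // Owning the items lets rayon consume them with into_par_iter(); there is
    // no borrowed PacketBatch to iterate and no intermediate copy of the tuples.
    let parsed: Vec<(u64, bool)> = shreds
        .into_par_iter()
        .with_min_len(32) // amortize per-task overhead across at least 32 items
        .filter_map(|(bytes, is_repaired)| Some((parse(bytes)?, is_repaired)))
        .collect();

    assert_eq!(parsed.len(), 1_000);
}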

turbine/src/sigverify_shreds.rs (+31, -11)
@@ -4,6 +4,7 @@ use {
         retransmit_stage::RetransmitStage,
     },
     crossbeam_channel::{Receiver, RecvTimeoutError, SendError, Sender},
+    itertools::{Either, Itertools},
     rayon::{prelude::*, ThreadPool, ThreadPoolBuilder},
     solana_feature_set as feature_set,
     solana_gossip::cluster_info::ClusterInfo,
@@ -65,7 +66,7 @@ pub fn spawn_shred_sigverify(
     leader_schedule_cache: Arc<LeaderScheduleCache>,
     shred_fetch_receiver: Receiver<PacketBatch>,
     retransmit_sender: Sender<Vec<shred::Payload>>,
-    verified_sender: Sender<Vec<PacketBatch>>,
+    verified_sender: Sender<Vec<(shred::Payload, /*is_repaired:*/ bool)>>,
     num_sigverify_threads: NonZeroUsize,
 ) -> JoinHandle<()> {
     let recycler_cache = RecyclerCache::warmed();
@@ -130,7 +131,7 @@ fn run_shred_sigverify<const K: usize>(
     deduper: &Deduper<K, [u8]>,
     shred_fetch_receiver: &Receiver<PacketBatch>,
     retransmit_sender: &Sender<Vec<shred::Payload>>,
-    verified_sender: &Sender<Vec<PacketBatch>>,
+    verified_sender: &Sender<Vec<(shred::Payload, /*is_repaired:*/ bool)>>,
     cluster_nodes_cache: &ClusterNodesCache<RetransmitStage>,
     cache: &RwLock<LruCache>,
     stats: &mut ShredSigVerifyStats,
@@ -242,18 +243,37 @@ fn run_shred_sigverify<const K: usize>(
         })
     });
     stats.resign_micros += resign_start.elapsed().as_micros() as u64;
-    // Exclude repair packets from retransmit.
-    let shreds: Vec<_> = packets
+    // Extract shred payload from packets, and separate out repaired shreds.
+    let (shreds, repairs): (Vec<_>, Vec<_>) = packets
         .iter()
         .flat_map(PacketBatch::iter)
-        .filter(|packet| !packet.meta().discard() && !packet.meta().repair())
-        .filter_map(shred::layout::get_shred)
-        .map(<[u8]>::to_vec)
-        .map(shred::Payload::from)
-        .collect();
+        .filter(|packet| !packet.meta().discard())
+        .filter_map(|packet| {
+            let shred = shred::layout::get_shred(packet)?.to_vec();
+            Some((shred, packet.meta().repair()))
+        })
+        .partition_map(|(shred, repair)| {
+            if repair {
+                // No need for Arc overhead here because repaired shreds are
+                // not retranmitted.
+                Either::Right(shred::Payload::from(shred))
+            } else {
+                // Share the payload between the retransmit-stage and the
+                // window-service.
+                Either::Left(shred::Payload::from(Arc::new(shred)))
+            }
+        });
+    // Repaired shreds are not retransmitted.
     stats.num_retransmit_shreds += shreds.len();
-    retransmit_sender.send(shreds)?;
-    verified_sender.send(packets)?;
+    retransmit_sender.send(shreds.clone())?;
+    // Send all shreds to window service to be inserted into blockstore.
+    let shreds = shreds
+        .into_iter()
+        .map(|shred| (shred, /*is_repaired:*/ false));
+    let repairs = repairs
+        .into_iter()
+        .map(|shred| (shred, /*is_repaired:*/ true));
+    verified_sender.send(shreds.chain(repairs).collect())?;
     stats.elapsed_micros += now.elapsed().as_micros() as u64;
     Ok(())
 }
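
The partition_map step above can be shown in isolation: turbine shreds go left as Arc-shared payloads (cloned cheaply for both the retransmit stage and the window service), while repaired shreds go right as uniquely owned buffers. The sketch below substitutes Vec<u8> / Arc<Vec<u8>> for shred::Payload and assumes only the itertools crate.

// Sketch of the partition_map pattern, with plain Vec<u8> / Arc<Vec<u8>>
// standing in for shred::Payload.
use itertools::{Either, Itertools};
use std::sync::Arc;

fn main() {
    let packets: Vec<(Vec<u8>, bool)> = vec![
        (vec![1, 2, 3], false), // turbine shred
        (vec![4, 5, 6], true),  // repaired shred
    ];

    // Turbine shreds are wrapped in an Arc so both downstream consumers can
    // share one allocation; repaired shreds stay uniquely owned because they
    // are never retransmitted.
    let (shared, unique): (Vec<Arc<Vec<u8>>>, Vec<Vec<u8>>) = packets
        .into_iter()
        .partition_map(|(bytes, is_repaired)| {
            if is_repaired {
                Either::Right(bytes)
            } else {
                Either::Left(Arc::new(bytes))
            }
        });

    assert_eq!(shared.len(), 1);
    assert_eq!(unique.len(), 1);
}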
