Skip to content

Commit 5f4a08c

Browse files
lijunwangsLijun Wang
and
Lijun Wang
authored
Make quic endpoints count configurable (#2035)
* Make number of quic_endpoints configurable * Fixed unit test errors * moved DEFAULT_QUIC_ENDPOINTS to quic::streamer * make tpu fwd to use the configured count of quic endpoints as well * show quic_endpoints count in stats * Addressed a few comments from Alessandro * use unwrap instead of expect as windows clippy complains about it * reverted changes to NonZeroUsize const init * argument name for tpu_num_quic_endpoints --> num_quic_endpoints --------- Co-authored-by: Lijun Wang <[email protected]>
1 parent a71c8f7 commit 5f4a08c

File tree

5 files changed

+55
-10
lines changed

5 files changed

+55
-10
lines changed

gossip/src/cluster_info.rs

+27-10
Original file line numberDiff line numberDiff line change
@@ -72,6 +72,7 @@ use {
7272
},
7373
solana_streamer::{
7474
packet,
75+
quic::DEFAULT_QUIC_ENDPOINTS,
7576
socket::SocketAddrSpace,
7677
streamer::{PacketBatchReceiver, PacketBatchSender},
7778
},
@@ -2897,11 +2898,10 @@ pub struct NodeConfig {
28972898
pub public_tpu_forwards_addr: Option<SocketAddr>,
28982899
/// The number of TVU sockets to create
28992900
pub num_tvu_sockets: NonZeroUsize,
2901+
/// The number of QUIC tpu endpoints
2902+
pub num_quic_endpoints: NonZeroUsize,
29002903
}
29012904

2902-
// This will be adjusted and parameterized in follow-on PRs.
2903-
const QUIC_ENDPOINTS: usize = 1;
2904-
29052905
#[derive(Debug)]
29062906
pub struct Node {
29072907
pub info: ContactInfo,
@@ -2913,7 +2913,15 @@ impl Node {
29132913
let pubkey = solana_sdk::pubkey::new_rand();
29142914
Self::new_localhost_with_pubkey(&pubkey)
29152915
}
2916+
29162917
pub fn new_localhost_with_pubkey(pubkey: &Pubkey) -> Self {
2918+
Self::new_localhost_with_pubkey_and_quic_endpoints(pubkey, DEFAULT_QUIC_ENDPOINTS)
2919+
}
2920+
2921+
pub fn new_localhost_with_pubkey_and_quic_endpoints(
2922+
pubkey: &Pubkey,
2923+
num_quic_endpoints: usize,
2924+
) -> Self {
29172925
let localhost_ip_addr = IpAddr::V4(Ipv4Addr::LOCALHOST);
29182926
let localhost_bind_addr = format!("{localhost_ip_addr:?}:0");
29192927
let unspecified_bind_addr = format!("{:?}:0", IpAddr::V4(Ipv4Addr::UNSPECIFIED));
@@ -2931,7 +2939,7 @@ impl Node {
29312939
)
29322940
.unwrap();
29332941
let tpu_quic =
2934-
bind_more_with_config(tpu_quic, QUIC_ENDPOINTS, quic_config.clone()).unwrap();
2942+
bind_more_with_config(tpu_quic, num_quic_endpoints, quic_config.clone()).unwrap();
29352943
let (gossip_port, (gossip, ip_echo)) =
29362944
bind_common_in_range(localhost_ip_addr, port_range).unwrap();
29372945
let gossip_addr = SocketAddr::new(localhost_ip_addr, gossip_port);
@@ -2947,7 +2955,7 @@ impl Node {
29472955
)
29482956
.unwrap();
29492957
let tpu_forwards_quic =
2950-
bind_more_with_config(tpu_forwards_quic, QUIC_ENDPOINTS, quic_config).unwrap();
2958+
bind_more_with_config(tpu_forwards_quic, num_quic_endpoints, quic_config).unwrap();
29512959
let tpu_vote = UdpSocket::bind(&localhost_bind_addr).unwrap();
29522960
let repair = UdpSocket::bind(&localhost_bind_addr).unwrap();
29532961
let rpc_port = find_available_port_in_range(localhost_ip_addr, port_range).unwrap();
@@ -3068,7 +3076,7 @@ impl Node {
30683076
)
30693077
.unwrap();
30703078
let tpu_quic =
3071-
bind_more_with_config(tpu_quic, QUIC_ENDPOINTS, quic_config.clone()).unwrap();
3079+
bind_more_with_config(tpu_quic, DEFAULT_QUIC_ENDPOINTS, quic_config.clone()).unwrap();
30723080
let ((tpu_forwards_port, tpu_forwards), (_tpu_forwards_quic_port, tpu_forwards_quic)) =
30733081
bind_two_in_range_with_offset_and_config(
30743082
bind_ip_addr,
@@ -3079,7 +3087,7 @@ impl Node {
30793087
)
30803088
.unwrap();
30813089
let tpu_forwards_quic =
3082-
bind_more_with_config(tpu_forwards_quic, QUIC_ENDPOINTS, quic_config).unwrap();
3090+
bind_more_with_config(tpu_forwards_quic, DEFAULT_QUIC_ENDPOINTS, quic_config).unwrap();
30833091
let (tpu_vote_port, tpu_vote) = Self::bind(bind_ip_addr, port_range);
30843092
let (_, retransmit_socket) = Self::bind(bind_ip_addr, port_range);
30853093
let (_, repair) = Self::bind(bind_ip_addr, port_range);
@@ -3151,6 +3159,7 @@ impl Node {
31513159
public_tpu_addr,
31523160
public_tpu_forwards_addr,
31533161
num_tvu_sockets,
3162+
num_quic_endpoints,
31543163
} = config;
31553164

31563165
let (gossip_port, (gossip, ip_echo)) =
@@ -3170,7 +3179,7 @@ impl Node {
31703179
quic_config.clone(),
31713180
);
31723181
let tpu_quic =
3173-
bind_more_with_config(tpu_quic, QUIC_ENDPOINTS, quic_config.clone()).unwrap();
3182+
bind_more_with_config(tpu_quic, num_quic_endpoints.get(), quic_config.clone()).unwrap();
31743183

31753184
let (tpu_forwards_port, tpu_forwards_sockets) =
31763185
multi_bind_in_range(bind_ip_addr, port_range, 8).expect("tpu_forwards multi_bind");
@@ -3183,8 +3192,12 @@ impl Node {
31833192
),
31843193
quic_config.clone(),
31853194
);
3186-
let tpu_forwards_quic =
3187-
bind_more_with_config(tpu_forwards_quic, QUIC_ENDPOINTS, quic_config.clone()).unwrap();
3195+
let tpu_forwards_quic = bind_more_with_config(
3196+
tpu_forwards_quic,
3197+
num_quic_endpoints.get(),
3198+
quic_config.clone(),
3199+
)
3200+
.unwrap();
31883201

31893202
let (tpu_vote_port, tpu_vote_sockets) =
31903203
multi_bind_in_range(bind_ip_addr, port_range, 1).expect("tpu_vote multi_bind");
@@ -3399,6 +3412,8 @@ mod tests {
33993412
sync::Arc,
34003413
},
34013414
};
3415+
const DEFAULT_NUM_QUIC_ENDPOINTS: NonZeroUsize =
3416+
unsafe { NonZeroUsize::new_unchecked(DEFAULT_QUIC_ENDPOINTS) };
34023417

34033418
#[test]
34043419
fn test_gossip_node() {
@@ -3813,6 +3828,7 @@ mod tests {
38133828
public_tpu_addr: None,
38143829
public_tpu_forwards_addr: None,
38153830
num_tvu_sockets: MINIMUM_NUM_TVU_SOCKETS,
3831+
num_quic_endpoints: DEFAULT_NUM_QUIC_ENDPOINTS,
38163832
};
38173833

38183834
let node = Node::new_with_external_ip(&solana_sdk::pubkey::new_rand(), config);
@@ -3835,6 +3851,7 @@ mod tests {
38353851
public_tpu_addr: None,
38363852
public_tpu_forwards_addr: None,
38373853
num_tvu_sockets: MINIMUM_NUM_TVU_SOCKETS,
3854+
num_quic_endpoints: DEFAULT_NUM_QUIC_ENDPOINTS,
38383855
};
38393856

38403857
let node = Node::new_with_external_ip(&solana_sdk::pubkey::new_rand(), config);

streamer/src/nonblocking/quic.rs

+3
Original file line numberDiff line numberDiff line change
@@ -266,6 +266,9 @@ async fn run_server(
266266
max_unstaked_connections,
267267
max_streams_per_ms,
268268
));
269+
stats
270+
.quic_endpoints_count
271+
.store(incoming.len(), Ordering::Relaxed);
269272
let staked_connection_table: Arc<Mutex<ConnectionTable>> =
270273
Arc::new(Mutex::new(ConnectionTable::new()));
271274
let (sender, receiver) = async_unbounded();

streamer/src/quic.rs

+9
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,9 @@ use {
2828
pub const MAX_STAKED_CONNECTIONS: usize = 2000;
2929
pub const MAX_UNSTAKED_CONNECTIONS: usize = 500;
3030

31+
// This will be adjusted and parameterized in follow-on PRs.
32+
pub const DEFAULT_QUIC_ENDPOINTS: usize = 1;
33+
3134
pub struct SkipClientVerification;
3235

3336
impl SkipClientVerification {
@@ -197,6 +200,7 @@ pub struct StreamerStats {
197200
pub(crate) connection_rate_limiter_length: AtomicUsize,
198201
pub(crate) outstanding_incoming_connection_attempts: AtomicUsize,
199202
pub(crate) total_incoming_connection_attempts: AtomicUsize,
203+
pub(crate) quic_endpoints_count: AtomicUsize,
200204
}
201205

202206
impl StreamerStats {
@@ -537,6 +541,11 @@ impl StreamerStats {
537541
.load(Ordering::Relaxed),
538542
i64
539543
),
544+
(
545+
"quic_endpoints_count",
546+
self.quic_endpoints_count.load(Ordering::Relaxed),
547+
i64
548+
),
540549
);
541550
}
542551
}

validator/src/cli.rs

+14
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,7 @@ use {
4747
solana_send_transaction_service::send_transaction_service::{
4848
self, MAX_BATCH_SEND_RATE_MS, MAX_TRANSACTION_BATCH_SIZE,
4949
},
50+
solana_streamer::quic::DEFAULT_QUIC_ENDPOINTS,
5051
solana_tpu_client::tpu_client::DEFAULT_TPU_CONNECTION_POOL_SIZE,
5152
solana_unified_scheduler_pool::DefaultSchedulerPool,
5253
std::{path::PathBuf, str::FromStr},
@@ -903,6 +904,17 @@ pub fn app<'a>(version: &'a str, default_args: &'a DefaultArgs) -> App<'a, 'a> {
903904
.hidden(hidden_unless_forced())
904905
.help("Controls the rate of the clients connections per IpAddr per minute."),
905906
)
907+
.arg(
908+
Arg::with_name("num_quic_endpoints")
909+
.long("num-quic-endpoints")
910+
.takes_value(true)
911+
.default_value(&default_args.num_quic_endpoints)
912+
.validator(is_parsable::<usize>)
913+
.hidden(hidden_unless_forced())
914+
.help("The number of QUIC endpoints used for TPU and TPU-Forward. It can be increased to \
915+
increase network ingest throughput, at the expense of higher CPU and general \
916+
validator load."),
917+
)
906918
.arg(
907919
Arg::with_name("staked_nodes_overrides")
908920
.long("staked-nodes-overrides")
@@ -2213,6 +2225,7 @@ pub struct DefaultArgs {
22132225
pub accounts_shrink_ratio: String,
22142226
pub tpu_connection_pool_size: String,
22152227
pub tpu_max_connections_per_ipaddr_per_minute: String,
2228+
pub num_quic_endpoints: String,
22162229

22172230
// Exit subcommand
22182231
pub exit_min_idle_time: String,
@@ -2304,6 +2317,7 @@ impl DefaultArgs {
23042317
tpu_connection_pool_size: DEFAULT_TPU_CONNECTION_POOL_SIZE.to_string(),
23052318
tpu_max_connections_per_ipaddr_per_minute:
23062319
DEFAULT_MAX_CONNECTIONS_PER_IPADDR_PER_MINUTE.to_string(),
2320+
num_quic_endpoints: DEFAULT_QUIC_ENDPOINTS.to_string(),
23072321
rpc_max_request_body_size: MAX_REQUEST_BODY_SIZE.to_string(),
23082322
exit_min_idle_time: "10".to_string(),
23092323
exit_max_delinquent_stake: "5".to_string(),

validator/src/main.rs

+2
Original file line numberDiff line numberDiff line change
@@ -1946,13 +1946,15 @@ pub fn main() {
19461946
})
19471947
});
19481948

1949+
let num_quic_endpoints = value_t_or_exit!(matches, "num_quic_endpoints", NonZeroUsize);
19491950
let node_config = NodeConfig {
19501951
gossip_addr,
19511952
port_range: dynamic_port_range,
19521953
bind_ip_addr: bind_address,
19531954
public_tpu_addr,
19541955
public_tpu_forwards_addr,
19551956
num_tvu_sockets: tvu_receive_threads,
1957+
num_quic_endpoints,
19561958
};
19571959

19581960
let cluster_entrypoints = entrypoint_addrs

0 commit comments

Comments
 (0)