Skip to content

Commit

Permalink
Merge pull request #4783 from driftluo/peer-store-optimization
Browse files Browse the repository at this point in the history
Peer store optimization
  • Loading branch information
driftluo authored Jan 17, 2025
2 parents b8f655a + e625cf0 commit 9b41c05
Show file tree
Hide file tree
Showing 18 changed files with 291 additions and 141 deletions.
3 changes: 2 additions & 1 deletion benches/benches/benchmarks/overall.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ use ckb_chain::{start_chain_services, ChainController};
use ckb_chain_spec::consensus::{ConsensusBuilder, ProposalWindow};
use ckb_dao_utils::genesis_dao_data;
use ckb_jsonrpc_types::JsonBytes;
use ckb_network::{Flags, NetworkController, NetworkService, NetworkState};
use ckb_network::{network::TransportType, Flags, NetworkController, NetworkService, NetworkState};
use ckb_shared::{Shared, SharedBuilder};
use ckb_store::ChainStore;
use ckb_types::{
Expand Down Expand Up @@ -77,6 +77,7 @@ fn dummy_network(shared: &Shared) -> NetworkController {
"test".to_string(),
Flags::COMPATIBILITY,
),
TransportType::Tcp,
)
.start(shared.async_handle())
.expect("Start network service failed")
Expand Down
3 changes: 2 additions & 1 deletion chain/src/tests/util.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ use ckb_app_config::{BlockAssemblerConfig, NetworkConfig};
use ckb_chain_spec::consensus::{Consensus, ConsensusBuilder};
use ckb_dao_utils::genesis_dao_data;
use ckb_jsonrpc_types::ScriptHashType;
use ckb_network::{Flags, NetworkController, NetworkService, NetworkState};
use ckb_network::{network::TransportType, Flags, NetworkController, NetworkService, NetworkState};
use ckb_shared::{Shared, SharedBuilder};
use ckb_store::ChainStore;
use ckb_test_chain_utils::{always_success_cell, create_always_success_tx};
Expand Down Expand Up @@ -123,6 +123,7 @@ pub(crate) fn dummy_network(shared: &Shared) -> NetworkController {
"test".to_string(),
Flags::COMPATIBILITY,
),
TransportType::Tcp,
)
.start(shared.async_handle())
.expect("Start network service failed")
Expand Down
61 changes: 33 additions & 28 deletions network/src/network.rs
Original file line number Diff line number Diff line change
Expand Up @@ -105,15 +105,14 @@ impl NetworkState {
.iter()
.chain(config.public_addresses.iter())
.cloned()
.filter_map(|mut addr| {
multiaddr_to_socketaddr(&addr)
.filter(|addr| is_reachable(addr.ip()))
.and({
if extract_peer_id(&addr).is_none() {
addr.push(Protocol::P2P(Cow::Borrowed(local_peer_id.as_bytes())));
}
Some(addr)
})
.filter_map(|mut addr| match multiaddr_to_socketaddr(&addr) {
Some(socket_addr) if !is_reachable(socket_addr.ip()) => None,
_ => {
if extract_peer_id(&addr).is_none() {
addr.push(Protocol::P2P(Cow::Borrowed(local_peer_id.as_bytes())));
}
Some(addr)
}
})
.collect();
info!("Loading the peer store. This process may take a few seconds to complete.");
Expand Down Expand Up @@ -158,15 +157,14 @@ impl NetworkState {
.iter()
.chain(config.public_addresses.iter())
.cloned()
.filter_map(|mut addr| {
multiaddr_to_socketaddr(&addr)
.filter(|addr| is_reachable(addr.ip()))
.and({
if extract_peer_id(&addr).is_none() {
addr.push(Protocol::P2P(Cow::Borrowed(local_peer_id.as_bytes())));
}
Some(addr)
})
.filter_map(|mut addr| match multiaddr_to_socketaddr(&addr) {
Some(socket_addr) if !is_reachable(socket_addr.ip()) => None,
_ => {
if extract_peer_id(&addr).is_none() {
addr.push(Protocol::P2P(Cow::Borrowed(local_peer_id.as_bytes())));
}
Some(addr)
}
})
.collect();
info!("Loading the peer store. This process may take a few seconds to complete.");
Expand Down Expand Up @@ -831,6 +829,7 @@ impl NetworkService {
required_protocol_ids: Vec<ProtocolId>,
// name, version, flags
identify_announce: (String, String, Flags),
transport_type: TransportType,
) -> Self {
let config = &network_state.config;

Expand Down Expand Up @@ -1017,7 +1016,7 @@ impl NetworkService {
service_builder = service_builder.tcp_config(bind_fn);
}
}
TransportType::Ws => {
TransportType::Ws | TransportType::Wss => {
// only bind once
if matches!(init, BindType::Ws) {
continue;
Expand Down Expand Up @@ -1074,6 +1073,7 @@ impl NetworkService {
Arc::clone(&network_state),
p2p_service.control().to_owned().into(),
Duration::from_secs(config.connect_outbound_interval_secs),
transport_type,
);
bg_services.push(Box::pin(outbound_peer_service) as Pin<Box<_>>);
};
Expand Down Expand Up @@ -1520,19 +1520,24 @@ pub(crate) async fn async_disconnect_with_message(
control.disconnect(peer_index).await
}

/// Transport type on ckb
#[derive(Clone, Copy, Debug, Eq, PartialEq, PartialOrd, Ord)]
pub(crate) enum TransportType {
pub enum TransportType {
/// Tcp
Tcp,
/// Ws
Ws,
/// Wss only on wasm
Wss,
}

pub(crate) fn find_type(addr: &Multiaddr) -> TransportType {
if addr
.iter()
.any(|proto| matches!(proto, Protocol::Ws | Protocol::Wss))
{
TransportType::Ws
} else {
TransportType::Tcp
}
let mut iter = addr.iter();

iter.find_map(|proto| match proto {
Protocol::Ws => Some(TransportType::Ws),
Protocol::Wss => Some(TransportType::Wss),
_ => None,
})
.unwrap_or(TransportType::Tcp)
}
25 changes: 21 additions & 4 deletions network/src/peer_registry.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
//! Peer registry
use crate::network_group::Group;
use crate::peer_store::PeerStore;
use crate::Flags;
use crate::{
errors::{Error, PeerError},
extract_peer_id, Peer, PeerId, SessionType,
Expand All @@ -24,7 +25,7 @@ pub struct PeerRegistry {
// Only whitelist peers or allow all peers.
whitelist_only: bool,
whitelist_peers: HashSet<PeerId>,
feeler_peers: HashSet<PeerId>,
feeler_peers: HashMap<PeerId, Flags>,
}

/// Global network connection status
Expand Down Expand Up @@ -63,7 +64,7 @@ impl PeerRegistry {
PeerRegistry {
peers: HashMap::with_capacity_and_hasher(20, Default::default()),
whitelist_peers: whitelist_peers.iter().filter_map(extract_peer_id).collect(),
feeler_peers: HashSet::default(),
feeler_peers: HashMap::default(),
max_inbound,
max_outbound,
whitelist_only,
Expand Down Expand Up @@ -191,10 +192,26 @@ impl PeerRegistry {
/// Add feeler dail task
pub fn add_feeler(&mut self, addr: &Multiaddr) {
if let Some(peer_id) = extract_peer_id(addr) {
self.feeler_peers.insert(peer_id);
self.feeler_peers.insert(peer_id, Flags::COMPATIBILITY);
}
}

/// Identify change feeler flags
pub fn change_feeler_flags(&mut self, addr: &Multiaddr, flags: Flags) -> bool {
if let Some(peer_id) = extract_peer_id(addr) {
if let Some(i) = self.feeler_peers.get_mut(&peer_id) {
*i = flags;
return true;
}
}
false
}

/// Get feeler session flags
pub fn feeler_flags(&self, addr: &Multiaddr) -> Option<Flags> {
extract_peer_id(addr).and_then(|peer_id| self.feeler_peers.get(&peer_id).cloned())
}

/// Remove feeler dail task on session disconnects or fails
pub fn remove_feeler(&mut self, addr: &Multiaddr) {
if let Some(peer_id) = extract_peer_id(addr) {
Expand All @@ -205,7 +222,7 @@ impl PeerRegistry {
/// Whether this session is feeler session
pub fn is_feeler(&self, addr: &Multiaddr) -> bool {
extract_peer_id(addr)
.map(|peer_id| self.feeler_peers.contains(&peer_id))
.map(|peer_id| self.feeler_peers.contains_key(&peer_id))
.unwrap_or_default()
}

Expand Down
Loading

0 comments on commit 9b41c05

Please sign in to comment.