Skip to content

Commit d01029d

Browse files
sanityclaude
andcommitted
fix: resolve API mismatches after rebase
- Add ObservedAddr type to transport/mod.rs for NAT address tracking - Update add_subscriber wrapper in ring/mod.rs to accept upstream_addr param - Add upstream_addr (None) to all local subscription call sites - Remove duplicate get_peer_location_by_addr functions The add_subscriber method now takes an optional ObservedAddr parameter to track the transport-level address for NAT peers. For local subscriptions (no remote upstream), None is passed. 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude <[email protected]>
1 parent 14f867c commit d01029d

File tree

6 files changed

+125
-111
lines changed

6 files changed

+125
-111
lines changed

crates/core/src/node/network_bridge/p2p_protoc.rs

Lines changed: 69 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -464,7 +464,9 @@ impl P2pConnManager {
464464
);
465465
match peer_connection {
466466
Some(peer_connection) => {
467-
if let Err(e) = peer_connection.sender.send(Left(msg.clone())).await {
467+
if let Err(e) =
468+
peer_connection.sender.send(Left(msg.clone())).await
469+
{
468470
tracing::error!(
469471
tx = %msg.id(),
470472
"Failed to send message to peer: {}", e
@@ -624,10 +626,11 @@ impl P2pConnManager {
624626
);
625627

626628
// Clean up all active connections
627-
let peers_to_cleanup: Vec<_> =
628-
ctx.connections.iter().map(|(addr, entry)| {
629-
(*addr, entry.pub_key.clone())
630-
}).collect();
629+
let peers_to_cleanup: Vec<_> = ctx
630+
.connections
631+
.iter()
632+
.map(|(addr, entry)| (*addr, entry.pub_key.clone()))
633+
.collect();
631634
for (peer_addr, pub_key_opt) in peers_to_cleanup {
632635
tracing::debug!(%peer_addr, "Cleaning up active connection due to critical channel closure");
633636

@@ -636,7 +639,16 @@ impl P2pConnManager {
636639
PeerId::new(peer_addr, pub_key)
637640
} else {
638641
// Use our own pub_key as placeholder if we don't know the peer's
639-
PeerId::new(peer_addr, (*ctx.bridge.op_manager.ring.connection_manager.pub_key).clone())
642+
PeerId::new(
643+
peer_addr,
644+
(*ctx
645+
.bridge
646+
.op_manager
647+
.ring
648+
.connection_manager
649+
.pub_key)
650+
.clone(),
651+
)
640652
};
641653
ctx.bridge
642654
.op_manager
@@ -696,7 +708,16 @@ impl P2pConnManager {
696708
let peer = if let Some(ref pub_key) = entry.pub_key {
697709
PeerId::new(peer_addr, pub_key.clone())
698710
} else {
699-
PeerId::new(peer_addr, (*ctx.bridge.op_manager.ring.connection_manager.pub_key).clone())
711+
PeerId::new(
712+
peer_addr,
713+
(*ctx
714+
.bridge
715+
.op_manager
716+
.ring
717+
.connection_manager
718+
.pub_key)
719+
.clone(),
720+
)
700721
};
701722
let pub_key_to_remove = entry.pub_key.clone();
702723

@@ -795,13 +816,24 @@ impl P2pConnManager {
795816
}
796817
NodeEvent::QueryConnections { callback } => {
797818
// Reconstruct PeerIds from stored connections
798-
let connections: Vec<PeerId> = ctx.connections.iter()
819+
let connections: Vec<PeerId> = ctx
820+
.connections
821+
.iter()
799822
.map(|(addr, entry)| {
800823
if let Some(ref pub_key) = entry.pub_key {
801824
PeerId::new(*addr, pub_key.clone())
802825
} else {
803826
// Use our own pub_key as placeholder if we don't know the peer's
804-
PeerId::new(*addr, (*ctx.bridge.op_manager.ring.connection_manager.pub_key).clone())
827+
PeerId::new(
828+
*addr,
829+
(*ctx
830+
.bridge
831+
.op_manager
832+
.ring
833+
.connection_manager
834+
.pub_key)
835+
.clone(),
836+
)
805837
}
806838
})
807839
.collect();
@@ -861,12 +893,23 @@ impl P2pConnManager {
861893
}
862894

863895
// Reconstruct PeerIds from stored connections
864-
let connections: Vec<PeerId> = ctx.connections.iter()
896+
let connections: Vec<PeerId> = ctx
897+
.connections
898+
.iter()
865899
.map(|(addr, entry)| {
866900
if let Some(ref pub_key) = entry.pub_key {
867901
PeerId::new(*addr, pub_key.clone())
868902
} else {
869-
PeerId::new(*addr, (*ctx.bridge.op_manager.ring.connection_manager.pub_key).clone())
903+
PeerId::new(
904+
*addr,
905+
(*ctx
906+
.bridge
907+
.op_manager
908+
.ring
909+
.connection_manager
910+
.pub_key)
911+
.clone(),
912+
)
870913
}
871914
})
872915
.collect();
@@ -1894,12 +1937,16 @@ impl P2pConnManager {
18941937
}
18951938
let (tx, rx) = mpsc::channel(10);
18961939
tracing::debug!(self_peer = %self.bridge.op_manager.ring.connection_manager.pub_key, %peer_id, conn_map_size = self.connections.len(), "[CONN_TRACK] INSERT: OutboundConnectionSuccessful - adding to connections HashMap");
1897-
self.connections.insert(peer_id.addr, ConnectionEntry {
1898-
sender: tx,
1899-
pub_key: Some(peer_id.pub_key.clone()),
1900-
});
1940+
self.connections.insert(
1941+
peer_id.addr,
1942+
ConnectionEntry {
1943+
sender: tx,
1944+
pub_key: Some(peer_id.pub_key.clone()),
1945+
},
1946+
);
19011947
// Add to reverse lookup
1902-
self.addr_by_pub_key.insert(peer_id.pub_key.clone(), peer_id.addr);
1948+
self.addr_by_pub_key
1949+
.insert(peer_id.pub_key.clone(), peer_id.addr);
19031950
let Some(conn_events) = self.conn_event_tx.as_ref().cloned() else {
19041951
anyhow::bail!("Connection event channel not initialized");
19051952
};
@@ -2077,7 +2124,8 @@ impl P2pConnManager {
20772124
entry.pub_key = Some(new_peer_id.pub_key.clone());
20782125
}
20792126
// Add new reverse lookup
2080-
self.addr_by_pub_key.insert(new_peer_id.pub_key.clone(), remote_addr);
2127+
self.addr_by_pub_key
2128+
.insert(new_peer_id.pub_key.clone(), remote_addr);
20812129
}
20822130
}
20832131
}
@@ -2106,7 +2154,10 @@ impl P2pConnManager {
21062154
let peer = if let Some(ref pub_key) = entry.pub_key {
21072155
PeerId::new(remote_addr, pub_key.clone())
21082156
} else {
2109-
PeerId::new(remote_addr, (*self.bridge.op_manager.ring.connection_manager.pub_key).clone())
2157+
PeerId::new(
2158+
remote_addr,
2159+
(*self.bridge.op_manager.ring.connection_manager.pub_key).clone(),
2160+
)
21102161
};
21112162
// Remove from reverse lookup
21122163
if let Some(pub_key) = entry.pub_key {

crates/core/src/operations/subscribe.rs

Lines changed: 17 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -274,7 +274,11 @@ async fn complete_local_subscription(
274274
key: ContractKey,
275275
) -> Result<(), OpError> {
276276
let subscriber = op_manager.ring.connection_manager.own_location();
277-
if let Err(err) = op_manager.ring.add_subscriber(&key, subscriber.clone()) {
277+
// Local subscription - no upstream NAT address
278+
if let Err(err) = op_manager
279+
.ring
280+
.add_subscriber(&key, subscriber.clone(), None)
281+
{
278282
tracing::warn!(
279283
%key,
280284
tx = %id,
@@ -451,9 +455,10 @@ impl Operation for SubscribeOp {
451455
"subscribe: handling RequestSub locally (contract available)"
452456
);
453457

458+
// Local registration - no upstream NAT address
454459
if op_manager
455460
.ring
456-
.add_subscriber(key, subscriber.clone())
461+
.add_subscriber(key, subscriber.clone(), None)
457462
.is_err()
458463
{
459464
tracing::warn!(
@@ -722,9 +727,10 @@ impl Operation for SubscribeOp {
722727
subscribers_before = ?before_direct,
723728
"subscribe: attempting to register direct subscriber"
724729
);
730+
// Local registration - no upstream NAT address
725731
if op_manager
726732
.ring
727-
.add_subscriber(key, subscriber.clone())
733+
.add_subscriber(key, subscriber.clone(), None)
728734
.is_err()
729735
{
730736
tracing::warn!(
@@ -872,9 +878,10 @@ impl Operation for SubscribeOp {
872878
subscribers_before = ?before_upstream,
873879
"subscribe: attempting to register upstream link"
874880
);
881+
// Local registration - no upstream NAT address
875882
if op_manager
876883
.ring
877-
.add_subscriber(key, upstream_subscriber.clone())
884+
.add_subscriber(key, upstream_subscriber.clone(), None)
878885
.is_err()
879886
{
880887
tracing::warn!(
@@ -904,7 +911,12 @@ impl Operation for SubscribeOp {
904911
subscribers_before = ?before_provider,
905912
"subscribe: registering provider/subscription source"
906913
);
907-
if op_manager.ring.add_subscriber(key, sender.clone()).is_err() {
914+
// Local registration - no upstream NAT address
915+
if op_manager
916+
.ring
917+
.add_subscriber(key, sender.clone(), None)
918+
.is_err()
919+
{
908920
// concurrently it reached max number of subscribers for this contract
909921
tracing::debug!(
910922
tx = %id,

crates/core/src/operations/update.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1005,10 +1005,10 @@ pub(crate) async fn request_update(
10051005
.closest_potentially_caching(&key, [sender.peer().clone()].as_slice());
10061006

10071007
if let Some(target) = remote_target {
1008-
// Subscribe to the contract
1008+
// Subscribe to the contract (local subscription, no upstream NAT addr)
10091009
op_manager
10101010
.ring
1011-
.add_subscriber(&key, sender.clone())
1011+
.add_subscriber(&key, sender.clone(), None)
10121012
.map_err(|_| RingError::NoCachingPeers(key))?;
10131013

10141014
target

crates/core/src/ring/connection_manager.rs

Lines changed: 0 additions & 84 deletions
Original file line numberDiff line numberDiff line change
@@ -373,90 +373,6 @@ impl ConnectionManager {
373373
None
374374
}
375375

376-
/// Look up a PeerKeyLocation by socket address from connections_by_location or transient connections.
377-
/// Used for connection-based routing when we need full peer info from just an address.
378-
pub fn get_peer_location_by_addr(&self, addr: SocketAddr) -> Option<PeerKeyLocation> {
379-
// Check connections by location
380-
let connections = self.connections_by_location.read();
381-
for conns in connections.values() {
382-
for conn in conns {
383-
if conn.location.addr() == addr {
384-
return Some(conn.location.clone());
385-
}
386-
}
387-
}
388-
drop(connections);
389-
390-
// Check transient connections - construct PeerKeyLocation from PeerId
391-
if let Some((peer, entry)) = self
392-
.transient_connections
393-
.iter()
394-
.find(|e| e.key().addr == addr)
395-
.map(|e| (e.key().clone(), e.value().clone()))
396-
{
397-
let mut pkl = PeerKeyLocation::new(peer.pub_key, peer.addr);
398-
pkl.location = entry.location;
399-
return Some(pkl);
400-
}
401-
None
402-
}
403-
404-
/// Look up a PeerKeyLocation by socket address from connections_by_location or transient connections.
405-
/// Used for connection-based routing when we need full peer info from just an address.
406-
pub fn get_peer_location_by_addr(&self, addr: SocketAddr) -> Option<PeerKeyLocation> {
407-
// Check connections by location
408-
let connections = self.connections_by_location.read();
409-
for conns in connections.values() {
410-
for conn in conns {
411-
if conn.location.addr() == addr {
412-
return Some(conn.location.clone());
413-
}
414-
}
415-
}
416-
drop(connections);
417-
418-
// Check transient connections - construct PeerKeyLocation from PeerId
419-
if let Some((peer, entry)) = self
420-
.transient_connections
421-
.iter()
422-
.find(|e| e.key().addr == addr)
423-
.map(|e| (e.key().clone(), e.value().clone()))
424-
{
425-
let mut pkl = PeerKeyLocation::new(peer.pub_key, peer.addr);
426-
pkl.location = entry.location;
427-
return Some(pkl);
428-
}
429-
None
430-
}
431-
432-
/// Look up a PeerKeyLocation by socket address from connections_by_location or transient connections.
433-
/// Used for connection-based routing when we need full peer info from just an address.
434-
pub fn get_peer_location_by_addr(&self, addr: SocketAddr) -> Option<PeerKeyLocation> {
435-
// Check connections by location
436-
let connections = self.connections_by_location.read();
437-
for conns in connections.values() {
438-
for conn in conns {
439-
if conn.location.addr() == addr {
440-
return Some(conn.location.clone());
441-
}
442-
}
443-
}
444-
drop(connections);
445-
446-
// Check transient connections - construct PeerKeyLocation from PeerId
447-
if let Some((peer, entry)) = self
448-
.transient_connections
449-
.iter()
450-
.find(|e| e.key().addr == addr)
451-
.map(|e| (e.key().clone(), e.value().clone()))
452-
{
453-
let mut pkl = PeerKeyLocation::new(peer.pub_key, peer.addr);
454-
pkl.location = entry.location;
455-
return Some(pkl);
456-
}
457-
None
458-
}
459-
460376
pub fn is_gateway(&self) -> bool {
461377
self.is_gateway
462378
}

crates/core/src/ring/mod.rs

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@ use crate::topology::rate::Rate;
2020
use crate::topology::TopologyAdjustment;
2121
use crate::tracing::{NetEventLog, NetEventRegister};
2222

23-
use crate::transport::TransportPublicKey;
23+
use crate::transport::{ObservedAddr, TransportPublicKey};
2424
use crate::util::Contains;
2525
use crate::{
2626
config::GlobalExecutor,
@@ -327,12 +327,19 @@ impl Ring {
327327
}
328328

329329
/// Will return an error in case the max number of subscribers has been added.
330+
///
331+
/// The `upstream_addr` parameter is the transport-level address from which the subscribe
332+
/// message was received. This is used instead of the address embedded in `subscriber`
333+
/// because NAT peers may embed incorrect (e.g., loopback) addresses in their messages.
334+
/// The transport address is the only reliable way to route back to them.
330335
pub fn add_subscriber(
331336
&self,
332337
contract: &ContractKey,
333338
subscriber: PeerKeyLocation,
339+
upstream_addr: Option<ObservedAddr>,
334340
) -> Result<(), ()> {
335-
self.seeding_manager.add_subscriber(contract, subscriber)
341+
self.seeding_manager
342+
.add_subscriber(contract, subscriber, upstream_addr)
336343
}
337344

338345
/// Remove a subscriber by peer ID from a specific contract

crates/core/src/transport/mod.rs

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,34 @@ type MessagePayload = Vec<u8>;
2525

2626
type PacketId = u32;
2727

28+
/// A wrapper around SocketAddr that represents an address observed at the transport layer.
29+
/// This is the "ground truth" for NAT scenarios - it's the actual address we see
30+
/// at the network layer, not what the peer claims in protocol messages.
31+
///
32+
/// Using a newtype instead of raw `SocketAddr` makes the address semantics explicit
33+
/// and prevents accidental confusion with advertised/claimed addresses.
34+
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
35+
pub struct ObservedAddr(SocketAddr);
36+
37+
impl ObservedAddr {
38+
/// Get the underlying socket address.
39+
pub fn socket_addr(&self) -> SocketAddr {
40+
self.0
41+
}
42+
}
43+
44+
impl std::fmt::Display for ObservedAddr {
45+
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
46+
write!(f, "{}", self.0)
47+
}
48+
}
49+
50+
impl From<SocketAddr> for ObservedAddr {
51+
fn from(addr: SocketAddr) -> Self {
52+
Self(addr)
53+
}
54+
}
55+
2856
pub use self::crypto::{TransportKeypair, TransportPublicKey};
2957
pub(crate) use self::{
3058
connection_handler::{

0 commit comments

Comments
 (0)