Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion src/net.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -390,8 +390,9 @@ CNode* CConnman::ConnectNode(CAddress addrConnect,
}
}

LogDebug(BCLog::NET, "trying %s connection %s lastseen=%.1fhrs\n",
LogDebug(BCLog::NET, "trying %s connection (%s) to %s, lastseen=%.1fhrs\n",
use_v2transport ? "v2" : "v1",
ConnectionTypeAsString(conn_type),
pszDest ? pszDest : addrConnect.ToStringAddrPort(),
Ticks<HoursDouble>(pszDest ? 0h : Now<NodeSeconds>() - addrConnect.nTime));

Expand Down
110 changes: 86 additions & 24 deletions test/functional/p2p_private_broadcast.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
Test how locally submitted transactions are sent to the network when private broadcast is used.
"""

import re
import time
import threading

Expand Down Expand Up @@ -46,9 +47,6 @@
MiniWallet,
)

MAX_OUTBOUND_FULL_RELAY_CONNECTIONS = 8
MAX_BLOCK_RELAY_ONLY_CONNECTIONS = 2
NUM_INITIAL_CONNECTIONS = MAX_OUTBOUND_FULL_RELAY_CONNECTIONS + MAX_BLOCK_RELAY_ONLY_CONNECTIONS
NUM_PRIVATE_BROADCAST_PER_TX = 3

# Fill addrman with these addresses. Must have enough Tor addresses, so that even
Expand Down Expand Up @@ -184,24 +182,56 @@ def setup_nodes(self):

self.destinations_lock = threading.Lock()

def find_connection_type_in_debug_log(to_addr, to_port):
"""
Scan the debug log of tx_originator for a connection attempt to to_addr:to_port.
Return the connection type (outbound-full-relay, private-broadcast, etc) or
None if there is no connection attempt to to_addr:to_port.
"""
with open(self.tx_originator_debug_log_path, mode="r", encoding="utf-8") as debug_log:
for line in debug_log.readlines():
match = re.match(f".*trying v. connection \\((.+)\\) to \\[?{to_addr}]?:{to_port},.*", line)
if match:
return match.group(1)
return None

def destinations_factory(requested_to_addr, requested_to_port):
"""
Instruct the SOCKS5 proxy to redirect connections:
* The first automatic outbound connection -> P2PDataStore
* The first private broadcast connection -> nodes[1]
* Anything else -> P2PInterface
"""
conn_type = None
def found_connection_in_debug_log():
nonlocal conn_type
conn_type = find_connection_type_in_debug_log(requested_to_addr, requested_to_port)
return conn_type is not None

self.wait_until(found_connection_in_debug_log)

with self.destinations_lock:
i = len(self.destinations)
actual_to_addr = ""
actual_to_port = 0
listener = None
if i == NUM_INITIAL_CONNECTIONS:
target_name = ""
if conn_type == "private-broadcast" and not any(dest["conn_type"] == "private-broadcast" for dest in self.destinations):
# Instruct the SOCKS5 server to redirect the first private
# broadcast connection from nodes[0] to nodes[1]
actual_to_addr = "127.0.0.1" # nodes[1] listen address
actual_to_port = tor_port(1) # nodes[1] listen port for Tor
self.log.debug(f"Instructing the SOCKS5 proxy to redirect connection i={i} to "
f"{format_addr_port(actual_to_addr, actual_to_port)} (nodes[1])")
target_name = "nodes[1]"
else:
# Create a Python P2P listening node and instruct the SOCKS5 proxy to
# redirect the connection to it. The first outbound connection is used
# later to serve GETDATA, thus make it P2PDataStore().
listener = P2PDataStore() if i == 0 else P2PInterface()
if conn_type == "outbound-full-relay" and not any(dest["conn_type"] == "outbound-full-relay" for dest in self.destinations):
listener = P2PDataStore()
target_name = "Python P2PDataStore"
else:
listener = P2PInterface()
target_name = "Python P2PInterface"
listener.peer_connect_helper(dstaddr="0.0.0.0", dstport=0, net=self.chain, timeout_factor=self.options.timeout_factor)
listener.peer_connect_send_version(services=P2P_SERVICES)

Expand All @@ -221,11 +251,14 @@ def on_listen_done(addr, port):
callback=on_listen_done)
# Wait until the callback has been called.
self.wait_until(lambda: actual_to_port != 0)
self.log.debug(f"Instructing the SOCKS5 proxy to redirect connection i={i} to "
f"{format_addr_port(actual_to_addr, actual_to_port)} (a Python node)")

self.log.debug(f"Instructing the SOCKS5 proxy to redirect connection i={i} ({conn_type}) for "
f"{format_addr_port(requested_to_addr, requested_to_port)} to "
f"{format_addr_port(actual_to_addr, actual_to_port)} ({target_name})")

self.destinations.append({
"requested_to": format_addr_port(requested_to_addr, requested_to_port),
"conn_type": conn_type,
"node": listener,
})
assert_equal(len(self.destinations), i + 1)
Expand Down Expand Up @@ -263,16 +296,26 @@ def setup_network(self):
self.setup_nodes()

def check_broadcasts(self, label, tx, broadcasts_to_expect, skip_destinations):
def wait_and_get_destination(n):
"""Wait for self.destinations[] to have at least n elements and return the 'n'th."""
def get_destinations_len():
with self.destinations_lock:
return len(self.destinations)
self.wait_until(lambda: get_destinations_len() > n)
with self.destinations_lock:
return self.destinations[n]

broadcasts_done = 0
i = skip_destinations - 1
while broadcasts_done < broadcasts_to_expect:
i += 1
self.log.debug(f"{label}: waiting for outbound connection i={i}")
# At this point the connection may not yet have been established (A),
# may be active (B), or may have already been closed (C).
self.wait_until(lambda: len(self.destinations) > i)
dest = self.destinations[i]
dest = wait_and_get_destination(i)
peer = dest["node"]
if peer is None:
continue # That is the first private broadcast connection, redirected to nodes[1]
peer.wait_until(lambda: peer.message_count["version"] == 1, check_connected=False)
# Now it is either (B) or (C).
if peer.last_message["version"].nServices != 0:
Expand Down Expand Up @@ -314,37 +357,32 @@ def check_broadcasts(self, label, tx, broadcasts_to_expect, skip_destinations):

def run_test(self):
tx_originator = self.nodes[0]
self.tx_originator_debug_log_path = tx_originator.debug_log_path
tx_receiver = self.nodes[1]
far_observer = tx_receiver.add_p2p_connection(P2PInterface())

wallet = MiniWallet(tx_originator)

# Fill tx_originator's addrman.
for addr in ADDRMAN_ADDRESSES:
res = tx_originator.addpeeraddress(address=addr, port=8333, tried=False)
res = tx_originator.addpeeraddress(address=addr, port=0 if addr.endswith(".i2p") else 8333, tried=False)
if not res["success"]:
self.log.debug(f"Could not add {addr} to tx_originator's addrman (collision?)")

self.wait_until(lambda: len(self.destinations) == NUM_INITIAL_CONNECTIONS)

# The next opened connection by tx_originator should be "private broadcast"
# for sending the transaction. The SOCKS5 proxy should redirect it to tx_receiver.

txs = wallet.create_self_transfer_chain(chain_length=3)
self.log.info(f"Created txid={txs[0]['txid']}: for basic test")
self.log.info(f"Created txid={txs[1]['txid']}: for broadcast with dependency in mempool + rebroadcast")
self.log.info(f"Created txid={txs[2]['txid']}: for broadcast with dependency not in mempool")
tx_originator.sendrawtransaction(hexstring=txs[0]["hex"], maxfeerate=0.1)

self.log.debug(f"Waiting for outbound connection i={NUM_INITIAL_CONNECTIONS}, "
"must be the first private broadcast connection")
self.log.info("First private broadcast: waiting for the transaction to reach the recipient")
self.wait_until(lambda: len(tx_receiver.getrawmempool()) > 0)
self.log.info("First private broadcast: the recipient received the transaction")
far_observer.wait_for_tx(txs[0]["txid"])
self.log.info(f"Outbound connection i={NUM_INITIAL_CONNECTIONS}: "
"the private broadcast target received and further relayed the transaction")
self.log.info("First private broadcast: the recipient further relayed the transaction")

# One already checked above, check the other NUM_PRIVATE_BROADCAST_PER_TX - 1 broadcasts.
self.check_broadcasts("Basic", txs[0], NUM_PRIVATE_BROADCAST_PER_TX - 1, NUM_INITIAL_CONNECTIONS + 1)
self.check_broadcasts("Basic", txs[0], NUM_PRIVATE_BROADCAST_PER_TX - 1, 0)

self.log.info("Resending the same transaction via RPC again (it is not in the mempool yet)")
ignoring_msg = f"Ignoring unnecessary request to schedule an already scheduled transaction: txid={txs[0]['txid']}, wtxid={txs[0]['wtxid']}"
Expand All @@ -364,8 +402,32 @@ def run_test(self):
wtxid_int = int(txs[0]["wtxid"], 16)
inv = CInv(MSG_WTX, wtxid_int)

tx_returner = None # First outbound-full-relay, will be P2PDataStore.
other_peer = None # Any other outbound-full-relay, we use the second one.

def set_tx_returner_and_other():
nonlocal tx_returner
nonlocal other_peer
tx_returner = None
other_peer = None
with self.destinations_lock:
for dest in self.destinations:
if dest["conn_type"] == "outbound-full-relay" and dest["node"] is not None:
if tx_returner is None:
assert(type(dest["node"]) is P2PDataStore)
tx_returner = dest["node"]
else:
assert(type(dest["node"]) is P2PInterface)
other_peer = dest["node"]
return True
return False

self.wait_until(set_tx_returner_and_other)

tx_returner.wait_for_connect()
other_peer.wait_for_connect()

self.log.info("Sending INV and waiting for GETDATA from node")
tx_returner = self.destinations[0]["node"] # Will return the transaction back to the originator.
tx_returner.tx_store[wtxid_int] = txs[0]["tx"]
assert "getdata" not in tx_returner.last_message
received_back_msg = f"Received our privately broadcast transaction (txid={txs[0]['txid']}) from the network"
Expand All @@ -375,7 +437,7 @@ def run_test(self):
self.wait_until(lambda: len(tx_originator.getrawmempool()) > 0)

self.log.info("Waiting for normal broadcast to another peer")
self.destinations[1]["node"].wait_for_inv([inv])
other_peer.wait_for_inv([inv])

self.log.info("Checking getprivatebroadcastinfo no longer reports the transaction after it is received back")
pbinfo = tx_originator.getprivatebroadcastinfo()
Expand Down
Loading