Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 9 additions & 4 deletions docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -10,13 +10,14 @@ services:
stdin_open: true
tty: true
networks:
- onion-network
onion-network:
ipv4_address: 10.12.137.21

config-node:
build: .
container_name: config-node
environment:
- BOOTSTRAP_HOST=bootstrap-node
- BOOTSTRAP_HOST=10.12.137.21
- NODE_PORT=12128
ports:
- "12128:12128"
Expand All @@ -30,7 +31,7 @@ services:
peer:
build: .
environment:
- BOOTSTRAP_HOST=bootstrap-node
- BOOTSTRAP_HOST=10.12.137.21
depends_on:
- bootstrap-node
stdin_open: true
Expand All @@ -40,4 +41,8 @@ services:

networks:
onion-network:
driver: bridge
driver: bridge
ipam:
config:
- subnet: 10.12.137.0/24
gateway: 10.12.137.1
43 changes: 27 additions & 16 deletions src/main/java/dev/network/NetworkManager.java
Original file line number Diff line number Diff line change
Expand Up @@ -98,8 +98,9 @@ public void registerPeer(Peer peer) {

if (getConnectedPeerCount() >= config.getMaxConnections()) {
logger.warn("Max peers reached. Cannot register new peer: {}", peer.getPeerId());
Collections.shuffle(getKnownPeers());
peer.send(MessageBuilder.buildPeerResponseMessage(getKnownPeers().stream().limit(5).toList()));
List<PeerInfo> sample = new ArrayList<>(getKnownPeers());
Collections.shuffle(sample);
peer.send(MessageBuilder.buildPeerResponseMessage(sample.stream().limit(config.getMinConnections()).toList()));
peer.disconnect();
return;
}
Expand All @@ -118,40 +119,50 @@ public void startPeerMaintenance() {
logger.info(" >--> Connected: {} | Known: {} <--<", getConnectedPeers().size(), getKnownPeers().size());
if (getConnectedPeerCount() >= config.getMaxConnections()) return;

List<PeerInfo> candidates = new ArrayList<>(knownPeers
.stream()
.filter(peer -> !connectedPeers
.containsKey(peer.getPublicKey()) && !peer.getPublicKey()
.equals(encodedPublicKey))
.toList());
if (!config.isBootstrapNode() && getConnectedPeerCount() == 0) {
logger.info("Isolated peer. Retrying bootstrap on {}:{}", config.getBootstrapNodeHost(), config.getBootstrapNodePort());
peerExecutor.submit(() -> connectToPeer(config.getBootstrapNodeHost(), config.getBootstrapNodePort()));
return;
}

int targetConnections = config.getMinConnections();
int missing = targetConnections - getConnectedPeerCount();
if (missing <= 0) return;

List<PeerInfo> candidates = new ArrayList<>(getKnownPeers());

candidates.removeIf(peer ->
connectedPeers.containsKey(peer.getPublicKey()) ||
peer.getPublicKey().equals(encodedPublicKey));

Collections.shuffle(candidates);

for (PeerInfo info : candidates) {
logger.info("Trying to connect to peer: {}:{}", info.host, info.port);
if (connectedPeers.size() > config.getMaxConnections()) break;
for (PeerInfo info : candidates.stream().limit(missing).toList()) {
peerExecutor.submit(() -> connectToPeer(info.host, info.port));
}
}

// GPT helper understand the exponential back-off, specifically how to define timeout interval
public void connectToPeer(String ip, int port) {
logger.info("Trying to connect to peer: {}:{}", ip, port);
int attempts = config.getConnectionRetryAttempts();
int retries = 0;
while (retries++ < attempts) {

for (int retries = 1; retries < attempts; retries++) {
try {
Socket socket = new Socket();
socket.connect(new InetSocketAddress(ip, port), config.getConnectionTimeoutInMilliseconds());
logger.info("Connected to node: {}", socket.getRemoteSocketAddress());
peerExecutor.submit(new Peer(socket, queue, this, PeerDirection.OUTBOUND));
return;
} catch (IOException e) {
// if (retries == attempts) break;
long timeout = (1L << (retries - 1)) * 100;
logger.warn(e, "Failed connecting to {}:{} (attempt {}/{}). Retrying in {}ms...", ip, port, retries, attempts, timeout);
try {
long timeout = (1L << (retries - 1)) * 100;
logger.warn("Failed connecting to {}:{} (attempt {}/{}). Retrying in {}ms...", ip, port, retries, attempts, timeout);
if (retries == attempts) continue;
Thread.sleep(timeout);
} catch (InterruptedException ex) {
logger.error("Interrupted while waiting for outbound node.", ex);
// Thread.currentThread().interrupt(); // figure out why
return;
}
}
Expand Down
19 changes: 14 additions & 5 deletions src/main/java/dev/network/Server.java
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
import java.net.ServerSocket;
import java.net.Socket;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ThreadLocalRandom;

public class Server extends Thread {
private final Logger logger;
Expand All @@ -30,16 +31,18 @@ public Server(Config config, MessageQueue queue, NetworkManager networkManager,

@Override
public void run() {
try (ServerSocket serverSocket = new ServerSocket(config.getNodePort())) {
int backlog = config.getMaxConnections();
try (ServerSocket serverSocket = new ServerSocket(config.getNodePort(), backlog)) {
logger.info("Server started and waiting for connections on port " + config.getNodePort());
if (!config.isBootstrapNode()) peerExecutor.submit(this::connectToBootstrapNodes);

while (!this.isInterrupted()) {
Socket clientSocket = serverSocket.accept();
logger.info("======= New connection: =======");
logger.info(" -> Remote IP: " + clientSocket.getInetAddress().getHostAddress());
logger.info(" -> Remote Port: " + clientSocket.getPort());
logger.info("===============================");
logger.info("""
=========== New connection: ===========
-> Remote IP: {}:{}
=======================================
""", clientSocket.getInetAddress().getHostAddress(), clientSocket.getPort());
peerExecutor.submit(new Peer(clientSocket, queue, networkManager, PeerDirection.INBOUND));
}
} catch (BindException e) {
Expand All @@ -54,6 +57,12 @@ public void run() {
}

private void connectToBootstrapNodes() {
try {
Thread.sleep(ThreadLocalRandom.current().nextInt(0, 1000 * 30));
} catch (InterruptedException e) {
logger.alert(e, "Connection initialization cooldown timeout interrupted...");
}

logger.info("Connecting to the bootstrap node at {}:{}", config.getBootstrapNodeHost(), config.getBootstrapNodePort());
networkManager.connectToPeer(config.getBootstrapNodeHost(), config.getBootstrapNodePort());
}
Expand Down
32 changes: 0 additions & 32 deletions src/main/resources/logback.xml

This file was deleted.

2 changes: 1 addition & 1 deletion src/main/resources/node.bootstrap.properties
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ node.port=12137
node.bootstrap=true

node.connections.max=100
node.connections.min=3
node.connections.min=10

peer.discovery.init=5
peer.discovery.delay=30
Expand Down
2 changes: 1 addition & 1 deletion src/main/resources/node.peer.properties
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ node.port=12138
node.bootstrap=false

node.connections.max=100
node.connections.min=3
node.connections.min=10

peer.discovery.init=5
peer.discovery.delay=30
Expand Down
4 changes: 2 additions & 2 deletions src/main/resources/node.peer.test1.properties
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
node.port=12141
node.bootstrap=false

node.connections.max=5
node.connections.min=3
node.connections.max=15
node.connections.min=10

peer.discovery.init=5
peer.discovery.delay=30
Expand Down
4 changes: 2 additions & 2 deletions src/main/resources/node.peer.test2.properties
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
node.port=12142
node.bootstrap=false

node.connections.max=5
node.connections.min=3
node.connections.max=15
node.connections.min=10

peer.discovery.init=5
peer.discovery.delay=30
Expand Down
4 changes: 2 additions & 2 deletions src/main/resources/node.peer.test3.properties
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
node.port=12143
node.bootstrap=false

node.connections.max=5
node.connections.min=3
node.connections.max=15
node.connections.min=10

peer.discovery.init=5
peer.discovery.delay=30
Expand Down