The returned instance can be used to cancel the task. Note that {@link Cancellable#cancel()}
+ * doesn't interrupt already running task.
+ *
+ *
Whenever the {@code runnable} throws exception it is notified to the {@code
+ * exceptionHandler} and the task recurring executions are not interrupted
+ */
+ default Cancellable runWithFixedDelay(
+ final ExceptionThrowingRunnable runnable,
+ final Duration delay,
+ final Consumer exceptionHandler) {
+ Preconditions.checkNotNull(exceptionHandler);
+
+ Cancellable cancellable = FutureUtil.createCancellable();
+ FutureUtil.runWithFixedDelay(this, runnable, cancellable, delay, exceptionHandler);
+ return cancellable;
+ }
+
+ /**
+ * Execute the future supplier until it completes normally up to some maximum number of retries.
+ *
+ * @param action The action to run
+ * @param retryDelay The time to wait before retrying
+ * @param maxRetries The maximum number of retries. A value of 0 means the action is run only once
+ * (no retries).
+ * @param The value returned by the action future
+ * @return A future that resolves with the first successful result, or else an error if the
+ * maximum retries are exhausted.
+ */
+ default SafeFuture runWithRetry(
+ final ExceptionThrowingFutureSupplier action,
+ final Duration retryDelay,
+ final int maxRetries) {
+
+ return SafeFuture.of(action)
+ .exceptionallyCompose(
+ err -> {
+ if (maxRetries > 0) {
+ // Retry after delay, decrementing the remaining available retries
+ final int remainingRetries = maxRetries - 1;
+ return runAfterDelay(
+ () -> runWithRetry(action, retryDelay, remainingRetries), retryDelay);
+ } else {
+ return SafeFuture.failedFuture(err);
+ }
+ });
+ }
+}
diff --git a/src/org/minima/system/network/base/Cancellable.java b/src/org/minima/system/network/base/Cancellable.java
new file mode 100644
index 000000000..9b65a8d7a
--- /dev/null
+++ b/src/org/minima/system/network/base/Cancellable.java
@@ -0,0 +1,21 @@
+/*
+ * Copyright 2020 ConsenSys AG.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+
+package org.minima.system.network.base;
+
+public interface Cancellable {
+
+ void cancel();
+
+ boolean isCancelled();
+}
diff --git a/src/org/minima/system/network/base/ConnectionManager.java b/src/org/minima/system/network/base/ConnectionManager.java
new file mode 100644
index 000000000..b412193c5
--- /dev/null
+++ b/src/org/minima/system/network/base/ConnectionManager.java
@@ -0,0 +1,236 @@
+/*
+ * Copyright 2020 ConsenSys AG.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+
+package org.minima.system.network.base;
+
+import static java.util.stream.Collectors.toList;
+
+import java.time.Duration;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.TimeUnit;
+import java.util.function.Predicate;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+import org.minima.system.network.base.metrics.Counter;
+import org.minima.system.network.base.metrics.LabelledMetric;
+import org.minima.system.network.base.metrics.MetricsSystem;
+import org.minima.system.network.base.metrics.TekuMetricCategory;
+import org.minima.system.network.base.peer.DisconnectReason;
+import org.minima.system.network.base.peer.DiscoveryPeer;
+import org.minima.system.network.base.peer.Peer;
+// // import org.hyperledger.besu.plugin.services.MetricsSystem;
+// // import org.hyperledger.besu.plugin.services.metrics.Counter;
+// // import org.hyperledger.besu.plugin.services.metrics.LabelledMetric;
+// import tech.pegasys.teku.infrastructure.async.AsyncRunner;
+// import tech.pegasys.teku.infrastructure.async.Cancellable;
+// import tech.pegasys.teku.infrastructure.async.SafeFuture;
+// import tech.pegasys.teku.infrastructure.metrics.TekuMetricCategory;
+// import tech.pegasys.teku.networking.p2p.connection.PeerPools.PeerPool;
+// import tech.pegasys.teku.networking.p2p.discovery.DiscoveryPeer;
+// import tech.pegasys.teku.networking.p2p.discovery.DiscoveryService;
+// import tech.pegasys.teku.networking.p2p.network.P2PNetwork;
+// import tech.pegasys.teku.networking.p2p.network.PeerAddress;
+// import tech.pegasys.teku.networking.p2p.peer.DisconnectReason;
+// import tech.pegasys.teku.networking.p2p.peer.Peer;
+// import tech.pegasys.teku.service.serviceutils.Service;
+import org.minima.system.network.base.peer.PeerAddress;
+import org.minima.system.network.base.peer.PeerPools;
+import org.minima.system.network.base.peer.PeerSelectionStrategy;
+import org.minima.system.network.base.peer.PeerPools.PeerPool;
+
+public class ConnectionManager extends Service {
+ private static final Logger LOG = LogManager.getLogger(ConnectionManager.class);
+ private static final Duration RECONNECT_TIMEOUT = Duration.ofSeconds(20);
+ private static final Duration DISCOVERY_INTERVAL = Duration.ofSeconds(30);
+ private final AsyncRunner asyncRunner;
+ private final P2PNetwork extends Peer> network;
+ private final Set staticPeers;
+ private final DiscoveryService discoveryService;
+ private final PeerSelectionStrategy peerSelectionStrategy;
+ private final Counter attemptedConnectionCounter;
+ private final Counter successfulConnectionCounter;
+ private final Counter failedConnectionCounter;
+ private final PeerPools peerPools = new PeerPools();
+ private final Collection> peerPredicates = new CopyOnWriteArrayList<>();
+
+ private volatile long peerConnectedSubscriptionId;
+ private volatile Cancellable periodicPeerSearch;
+
+ public ConnectionManager(
+ final MetricsSystem metricsSystem,
+ final DiscoveryService discoveryService,
+ final AsyncRunner asyncRunner,
+ final P2PNetwork extends Peer> network,
+ final PeerSelectionStrategy peerSelectionStrategy,
+ final List peerAddresses) {
+ this.asyncRunner = asyncRunner;
+ this.network = network;
+ this.staticPeers = new HashSet<>(peerAddresses);
+ this.discoveryService = discoveryService;
+ this.peerSelectionStrategy = peerSelectionStrategy;
+
+ final LabelledMetric connectionAttemptCounter =
+ metricsSystem.createLabelledCounter(
+ TekuMetricCategory.NETWORK,
+ "peer_connection_attempt_count",
+ "Total number of outbound connection attempts made",
+ "status");
+ attemptedConnectionCounter = connectionAttemptCounter.labels("attempted");
+ successfulConnectionCounter = connectionAttemptCounter.labels("successful");
+ failedConnectionCounter = connectionAttemptCounter.labels("failed");
+ }
+
+ @Override
+ protected SafeFuture> doStart() {
+ LOG.trace("Starting discovery manager");
+ synchronized (this) {
+ staticPeers.forEach(this::createPersistentConnection);
+ }
+ periodicPeerSearch =
+ asyncRunner.runWithFixedDelay(
+ this::searchForPeers,
+ DISCOVERY_INTERVAL,
+ error -> LOG.error("Error while searching for peers", error));
+ connectToKnownPeers();
+ searchForPeers();
+ peerConnectedSubscriptionId = network.subscribeConnect(this::onPeerConnected);
+ return SafeFuture.COMPLETE;
+ }
+
+ private void connectToKnownPeers() {
+ peerSelectionStrategy
+ .selectPeersToConnect(
+ network,
+ peerPools,
+ () -> discoveryService.streamKnownPeers().filter(this::isPeerValid).collect(toList()))
+ .forEach(this::attemptConnection);
+ }
+
+ private void searchForPeers() {
+ if (!isRunning()) {
+ LOG.debug("Not running so not searching for peers");
+ return;
+ }
+ LOG.debug("Searching for peers");
+ discoveryService
+ .searchForPeers()
+ .orTimeout(10, TimeUnit.SECONDS)
+ .finish(
+ this::connectToKnownPeers,
+ error -> {
+ LOG.debug("Discovery failed", error);
+ connectToKnownPeers();
+ });
+ }
+
+ private void attemptConnection(final PeerAddress peerAddress) {
+ LOG.debug("Attempting to connect to {}", peerAddress.getId());
+ attemptedConnectionCounter.inc();
+ network
+ .connect(peerAddress)
+ .finish(
+ peer -> {
+ LOG.debug("Successfully connected to peer {}", peer.getId());
+ successfulConnectionCounter.inc();
+ peer.subscribeDisconnect(
+ (reason, locallyInitiated) -> peerPools.forgetPeer(peer.getId()));
+ },
+ error -> {
+ LOG.debug(() -> "Failed to connect to peer: " + peerAddress.getId(), error);
+ failedConnectionCounter.inc();
+ peerPools.forgetPeer(peerAddress.getId());
+ });
+ }
+
+ private void onPeerConnected(final Peer peer) {
+ peerSelectionStrategy
+ .selectPeersToDisconnect(network, peerPools)
+ .forEach(
+ peerToDrop ->
+ peerToDrop.disconnectCleanly(DisconnectReason.TOO_MANY_PEERS).reportExceptions());
+ }
+
+ @Override
+ protected SafeFuture> doStop() {
+ network.unsubscribeConnect(peerConnectedSubscriptionId);
+ final Cancellable peerSearchTask = this.periodicPeerSearch;
+ if (peerSearchTask != null) {
+ peerSearchTask.cancel();
+ }
+ return SafeFuture.COMPLETE;
+ }
+
+ public synchronized void addStaticPeer(final PeerAddress peerAddress) {
+ if (!staticPeers.contains(peerAddress)) {
+ staticPeers.add(peerAddress);
+ createPersistentConnection(peerAddress);
+ }
+ }
+
+ private void createPersistentConnection(final PeerAddress peerAddress) {
+ maintainPersistentConnection(peerAddress).reportExceptions();
+ }
+
+ private SafeFuture maintainPersistentConnection(final PeerAddress peerAddress) {
+ if (!isRunning()) {
+ // We've been stopped so halt the process.
+ return new SafeFuture<>();
+ }
+ LOG.debug("Connecting to peer {}", peerAddress);
+ peerPools.addPeerToPool(peerAddress.getId(), PeerPool.STATIC);
+ attemptedConnectionCounter.inc();
+ return network
+ .connect(peerAddress)
+ .thenApply(
+ peer -> {
+ LOG.debug("Connection to peer {} was successful", peer.getId());
+ successfulConnectionCounter.inc();
+ peer.subscribeDisconnect(
+ (reason, locallyInitiated) -> {
+ LOG.debug(
+ "Peer {} disconnected. Will try to reconnect in {} sec",
+ peerAddress,
+ RECONNECT_TIMEOUT.toSeconds());
+ asyncRunner
+ .runAfterDelay(
+ () -> maintainPersistentConnection(peerAddress), RECONNECT_TIMEOUT)
+ .reportExceptions();
+ });
+ return peer;
+ })
+ .exceptionallyCompose(
+ error -> {
+ LOG.debug(
+ "Connection to {} failed: {}. Will retry in {} sec",
+ peerAddress,
+ error,
+ RECONNECT_TIMEOUT.toSeconds());
+ failedConnectionCounter.inc();
+ return asyncRunner.runAfterDelay(
+ () -> maintainPersistentConnection(peerAddress), RECONNECT_TIMEOUT);
+ });
+ }
+
+ public void addPeerPredicate(final Predicate predicate) {
+ peerPredicates.add(predicate);
+ }
+
+ private boolean isPeerValid(DiscoveryPeer peer) {
+ return !peer.getNodeAddress().getAddress().isAnyLocalAddress()
+ && peerPredicates.stream().allMatch(predicate -> predicate.test(peer));
+ }
+}
diff --git a/src/org/minima/system/network/base/DelayedExecutorAsyncRunner.java b/src/org/minima/system/network/base/DelayedExecutorAsyncRunner.java
new file mode 100644
index 000000000..eff7cf3b3
--- /dev/null
+++ b/src/org/minima/system/network/base/DelayedExecutorAsyncRunner.java
@@ -0,0 +1,82 @@
+/*
+ * Copyright 2020 ConsenSys AG.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+
+package org.minima.system.network.base;
+
+import com.google.common.annotations.VisibleForTesting;
+import java.time.Duration;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executor;
+import java.util.concurrent.RejectedExecutionException;
+import java.util.concurrent.TimeUnit;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
+/**
+ * An AsyncRunner that uses the common ForkJoinPool so that it is guaranteed not to leak threads
+ * even if the test doesn't shut it down.
+ */
+public class DelayedExecutorAsyncRunner implements AsyncRunner {
+ private static final Logger LOG = LogManager.getLogger();
+ private final ExecutorFactory executorFactory;
+
+ private DelayedExecutorAsyncRunner(ExecutorFactory executorFactory) {
+ this.executorFactory = executorFactory;
+ }
+
+ public static DelayedExecutorAsyncRunner create() {
+ return new DelayedExecutorAsyncRunner(CompletableFuture::delayedExecutor);
+ }
+
+ @Override
+ public SafeFuture runAsync(final ExceptionThrowingFutureSupplier action) {
+ final Executor executor = getAsyncExecutor();
+ return runAsync(action, executor);
+ }
+
+ @Override
+ public SafeFuture runAfterDelay(
+ ExceptionThrowingFutureSupplier action, Duration delay) {
+ final Executor executor = getDelayedExecutor(delay.toMillis(), TimeUnit.MILLISECONDS);
+ return runAsync(action, executor);
+ }
+
+ @Override
+ public void shutdown() {}
+
+ @VisibleForTesting
+ SafeFuture runAsync(
+ final ExceptionThrowingFutureSupplier action, final Executor executor) {
+ final SafeFuture result = new SafeFuture<>();
+ try {
+ executor.execute(() -> SafeFuture.of(action).propagateTo(result));
+ } catch (final RejectedExecutionException ex) {
+ LOG.debug("shutting down ", ex);
+ } catch (final Throwable t) {
+ result.completeExceptionally(t);
+ }
+ return result;
+ }
+
+ private Executor getAsyncExecutor() {
+ return getDelayedExecutor(-1, TimeUnit.SECONDS);
+ }
+
+ private Executor getDelayedExecutor(long delayAmount, TimeUnit delayUnit) {
+ return executorFactory.create(delayAmount, delayUnit);
+ }
+
+ private interface ExecutorFactory {
+ Executor create(long delayAmount, TimeUnit delayUnit);
+ }
+}
diff --git a/src/org/minima/system/network/base/DelegatingP2PNetwork.java b/src/org/minima/system/network/base/DelegatingP2PNetwork.java
new file mode 100644
index 000000000..7733df2bc
--- /dev/null
+++ b/src/org/minima/system/network/base/DelegatingP2PNetwork.java
@@ -0,0 +1,133 @@
+/*
+ * Copyright 2019 ConsenSys AG.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+
+package org.minima.system.network.base;
+
+import java.util.Collection;
+import java.util.Map;
+import java.util.Optional;
+// import org.apache.tuweni.bytes.Bytes;
+// import tech.pegasys.teku.infrastructure.async.SafeFuture;
+// import tech.pegasys.teku.networking.p2p.discovery.DiscoveryPeer;
+// import tech.pegasys.teku.networking.p2p.gossip.TopicChannel;
+// import tech.pegasys.teku.networking.p2p.gossip.TopicHandler;
+// import tech.pegasys.teku.networking.p2p.gossip.config.GossipTopicsScoringConfig;
+// import tech.pegasys.teku.networking.p2p.peer.NodeId;
+// import tech.pegasys.teku.networking.p2p.peer.Peer;
+
+import org.apache.tuweni.bytes.Bytes;
+import org.minima.system.network.base.gossip.TopicChannel;
+import org.minima.system.network.base.gossip.TopicHandler;
+import org.minima.system.network.base.gossip.config.GossipTopicsScoringConfig;
+import org.minima.system.network.base.peer.DiscoveryPeer;
+import org.minima.system.network.base.peer.NodeId;
+import org.minima.system.network.base.peer.Peer;
+import org.minima.system.network.base.peer.PeerAddress;
+
+public abstract class DelegatingP2PNetwork implements P2PNetwork {
+ private final P2PNetwork> network;
+
+ protected DelegatingP2PNetwork(final P2PNetwork> network) {
+ this.network = network;
+ }
+
+ @Override
+ public SafeFuture connect(final PeerAddress peer) {
+ return network.connect(peer);
+ }
+
+ @Override
+ public PeerAddress createPeerAddress(final DiscoveryPeer discoveryPeer) {
+ return network.createPeerAddress(discoveryPeer);
+ }
+
+ @Override
+ public NodeId parseNodeId(final String nodeId) {
+ return network.parseNodeId(nodeId);
+ }
+
+ @Override
+ public boolean isConnected(final PeerAddress peerAddress) {
+ return network.isConnected(peerAddress);
+ }
+
+ @Override
+ public Bytes getPrivateKey() {
+ return network.getPrivateKey();
+ }
+
+ @Override
+ public PeerAddress createPeerAddress(final String peerAddress) {
+ return network.createPeerAddress(peerAddress);
+ }
+
+ @Override
+ public int getPeerCount() {
+ return network.getPeerCount();
+ }
+
+ @Override
+ public String getNodeAddress() {
+ return network.getNodeAddress();
+ }
+
+ @Override
+ public NodeId getNodeId() {
+ return network.getNodeId();
+ }
+
+ @Override
+ public Optional getEnr() {
+ return network.getEnr();
+ }
+
+ @Override
+ public Optional getDiscoveryAddress() {
+ return network.getDiscoveryAddress();
+ }
+
+ @Override
+ public int getListenPort() {
+ return network.getListenPort();
+ }
+
+ @Override
+ public SafeFuture> start() {
+ return network.start();
+ }
+
+ @Override
+ public SafeFuture> stop() {
+ return network.stop();
+ }
+
+ @Override
+ public SafeFuture> gossip(final String topic, final Bytes data) {
+ return network.gossip(topic, data);
+ }
+
+ @Override
+ public TopicChannel subscribe(final String topic, final TopicHandler topicHandler) {
+ return network.subscribe(topic, topicHandler);
+ }
+
+ @Override
+ public Map> getSubscribersByTopic() {
+ return network.getSubscribersByTopic();
+ }
+
+ @Override
+ public void updateGossipTopicScoring(final GossipTopicsScoringConfig config) {
+ network.updateGossipTopicScoring(config);
+ }
+}
diff --git a/src/org/minima/system/network/base/DiscV5Service.java b/src/org/minima/system/network/base/DiscV5Service.java
new file mode 100644
index 000000000..3e85ae9cf
--- /dev/null
+++ b/src/org/minima/system/network/base/DiscV5Service.java
@@ -0,0 +1,174 @@
+/*
+ * Copyright 2020 ConsenSys AG.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+
+package org.minima.system.network.base;
+
+//import static tech.pegasys.teku.util.config.Constants.ATTESTATION_SUBNET_COUNT;
+
+import java.util.List;
+import java.util.Optional;
+import java.util.stream.Stream;
+
+
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
+import org.apache.tuweni.bytes.Bytes;
+import org.minima.system.network.base.peer.DiscoveryPeer;
+import org.minima.system.network.base.peer.MultiaddrUtil;
+import org.minima.system.network.base.ssz.SszBitvector;
+import org.apache.tuweni.units.bigints.UInt64;
+import org.ethereum.beacon.discovery.DiscoverySystem;
+import org.ethereum.beacon.discovery.DiscoverySystemBuilder;
+import org.ethereum.beacon.discovery.schema.EnrField;
+import org.ethereum.beacon.discovery.schema.NodeRecord;
+import org.ethereum.beacon.discovery.schema.NodeRecordBuilder;
+import org.ethereum.beacon.discovery.schema.NodeRecordInfo;
+import org.ethereum.beacon.discovery.schema.NodeStatus;
+import org.ethereum.beacon.discovery.storage.NewAddressHandler;
+//import tech.pegasys.teku.infrastructure.async.SafeFuture;
+
+// import tech.pegasys.teku.networking.p2p.discovery.DiscoveryConfig;
+// import tech.pegasys.teku.networking.p2p.discovery.DiscoveryPeer;
+// import tech.pegasys.teku.networking.p2p.discovery.DiscoveryService;
+// import tech.pegasys.teku.networking.p2p.libp2p.MultiaddrUtil;
+// import tech.pegasys.teku.networking.p2p.network.config.NetworkConfig;
+// import tech.pegasys.teku.service.serviceutils.Service;
+// import tech.pegasys.teku.ssz.collections.SszBitvector;
+// import tech.pegasys.teku.ssz.schema.collections.SszBitvectorSchema;
+// import tech.pegasys.teku.storage.store.KeyValueStore;
+import org.minima.system.network.base.ssz.SszBitvectorSchema;
+
+public class DiscV5Service extends Service implements DiscoveryService {
+ private static final String SEQ_NO_STORE_KEY = "local-enr-seqno";
+ static final SszBitvectorSchema SUBNET_SUBSCRIPTIONS_SCHEMA =
+ SszBitvectorSchema.create(DiscoveryNetwork.ATTESTATION_SUBNET_COUNT);
+
+ public static DiscoveryService create(
+ final DiscoveryConfig discoConfig,
+ final NetworkConfig p2pConfig,
+ final KeyValueStore kvStore,
+ final Bytes privateKey) {
+ return new DiscV5Service(discoConfig, p2pConfig, kvStore, privateKey);
+ }
+
+ private final DiscoverySystem discoverySystem;
+ private final KeyValueStore kvStore;
+
+ private static final Logger LOG = LogManager.getLogger(DiscV5Service.class);
+
+ private DiscV5Service(
+ final DiscoveryConfig discoConfig,
+ NetworkConfig p2pConfig,
+ KeyValueStore kvStore,
+ final Bytes privateKey) {
+ final String listenAddress = p2pConfig.getNetworkInterface();
+ final int listenPort = p2pConfig.getListenPort();
+ final String advertisedAddress = p2pConfig.getAdvertisedIp();
+ final int advertisedPort = p2pConfig.getAdvertisedPort();
+ final List bootnodes = discoConfig.getBootnodes();
+ final UInt64 seqNo =
+ kvStore.get(SEQ_NO_STORE_KEY).map(UInt64::fromBytes).orElse(UInt64.ZERO).add(1);
+ final NewAddressHandler maybeUpdateNodeRecordHandler =
+ maybeUpdateNodeRecord(p2pConfig.hasUserExplicitlySetAdvertisedIp());
+ discoverySystem =
+ new DiscoverySystemBuilder()
+ .listen(listenAddress, listenPort)
+ .privateKey(privateKey)
+ .bootnodes(bootnodes.toArray(new String[0]))
+ .localNodeRecord(
+ new NodeRecordBuilder()
+ .privateKey(privateKey)
+ .address(advertisedAddress, advertisedPort)
+ .seq(seqNo)
+ .build())
+ .newAddressHandler(maybeUpdateNodeRecordHandler)
+ .localNodeRecordListener(this::localNodeRecordUpdated)
+ .build();
+ this.kvStore = kvStore;
+ }
+
+ private NewAddressHandler maybeUpdateNodeRecord(boolean userExplicitlySetAdvertisedIpOrPort) {
+ return (oldRecord, proposedNewRecord) -> {
+ if (userExplicitlySetAdvertisedIpOrPort) {
+ return Optional.of(oldRecord);
+ } else {
+ return Optional.of(proposedNewRecord);
+ }
+ };
+ }
+
+ private void localNodeRecordUpdated(NodeRecord oldRecord, NodeRecord newRecord) {
+ LOG.info("Updating NodeRecord for " + newRecord.getNodeId() + "(" + newRecord.getTcpAddress() + ")");
+ kvStore.put(SEQ_NO_STORE_KEY, newRecord.getSeq().toBytes());
+ }
+
+ @Override
+ protected SafeFuture> doStart() {
+ LOG.info("Starting discovery system");
+ return SafeFuture.of(discoverySystem.start());
+ }
+
+ @Override
+ protected SafeFuture> doStop() {
+ LOG.info("Stopping discovery system");
+ discoverySystem.stop();
+ return SafeFuture.completedFuture(null);
+ }
+
+ @Override
+ public Stream streamKnownPeers() {
+// LOG.info("Returning all active nodes as known peers - " + activeNodes().count());
+ return activeNodes().map(NodeRecordConverter::convertToDiscoveryPeer).flatMap(Optional::stream);
+ }
+
+ @Override
+ public SafeFuture searchForPeers() {
+ LOG.info("Searching for new peers");
+ return SafeFuture.of(discoverySystem.searchForNewPeers());
+ }
+
+ @Override
+ public Optional getEnr() {
+ return Optional.of(discoverySystem.getLocalNodeRecord().asEnr());
+ }
+
+ @Override
+ public Optional getDiscoveryAddress() {
+ final NodeRecord nodeRecord = discoverySystem.getLocalNodeRecord();
+ if (nodeRecord.getUdpAddress().isEmpty()) {
+ return Optional.empty();
+ }
+ final DiscoveryPeer discoveryPeer =
+ new DiscoveryPeer(
+ (Bytes) nodeRecord.get(EnrField.PKEY_SECP256K1),
+ nodeRecord.getUdpAddress().get(),
+ Optional.empty(),
+ SUBNET_SUBSCRIPTIONS_SCHEMA.getDefault());
+
+ return Optional.of(MultiaddrUtil.fromDiscoveryPeerAsUdp(discoveryPeer).toString());
+ }
+
+ @Override
+ public void updateCustomENRField(String fieldName, Bytes value) {
+ discoverySystem.updateCustomFieldValue(fieldName, value);
+ }
+
+ private Stream activeNodes() {
+ // LOG.info("Returning all nodes known by discovery system and active");
+ return discoverySystem
+ .streamKnownNodes()
+ .filter(record -> record.getStatus() == NodeStatus.ACTIVE)
+ .map(NodeRecordInfo::getNode);
+ }
+}
diff --git a/src/org/minima/system/network/base/DiscoveryConfig.java b/src/org/minima/system/network/base/DiscoveryConfig.java
new file mode 100644
index 000000000..0acf98428
--- /dev/null
+++ b/src/org/minima/system/network/base/DiscoveryConfig.java
@@ -0,0 +1,123 @@
+/*
+ * Copyright 2021 ConsenSys AG.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+
+package org.minima.system.network.base;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+
+import java.util.Collections;
+import java.util.List;
+
+public class DiscoveryConfig {
+ private final boolean isDiscoveryEnabled;
+ private final List staticPeers;
+ private final List bootnodes;
+ private final int minPeers;
+ private final int maxPeers;
+ private final int minRandomlySelectedPeers;
+
+ private DiscoveryConfig(
+ final boolean isDiscoveryEnabled,
+ final List staticPeers,
+ final List bootnodes,
+ final int minPeers,
+ final int maxPeers,
+ final int minRandomlySelectedPeers) {
+ this.isDiscoveryEnabled = isDiscoveryEnabled;
+ this.staticPeers = staticPeers;
+ this.bootnodes = bootnodes;
+ this.minPeers = minPeers;
+ this.maxPeers = maxPeers;
+ this.minRandomlySelectedPeers = minRandomlySelectedPeers;
+ }
+
+ public static Builder builder() {
+ return new Builder();
+ }
+
+ public boolean isDiscoveryEnabled() {
+ return isDiscoveryEnabled;
+ }
+
+ public List getStaticPeers() {
+ return staticPeers;
+ }
+
+ public List getBootnodes() {
+ return bootnodes;
+ }
+
+ public int getMinPeers() {
+ return minPeers;
+ }
+
+ public int getMaxPeers() {
+ return maxPeers;
+ }
+
+ public int getMinRandomlySelectedPeers() {
+ return minRandomlySelectedPeers;
+ }
+
+ public static class Builder {
+ private Boolean isDiscoveryEnabled = true;
+ private List staticPeers = Collections.emptyList();
+ private List bootnodes = Collections.emptyList();
+ private int minPeers = 64;
+ private int maxPeers = 74;
+ private int minRandomlySelectedPeers = 2;
+
+ private Builder() {}
+
+ public Builder isDiscoveryEnabled(final Boolean discoveryEnabled) {
+ checkNotNull(discoveryEnabled);
+ isDiscoveryEnabled = discoveryEnabled;
+ return this;
+ }
+
+ public DiscoveryConfig build() {
+ return new DiscoveryConfig(
+ isDiscoveryEnabled, staticPeers, bootnodes, minPeers, maxPeers, minRandomlySelectedPeers);
+ }
+
+ public Builder staticPeers(final List staticPeers) {
+ checkNotNull(staticPeers);
+ this.staticPeers = staticPeers;
+ return this;
+ }
+
+ public Builder bootnodes(final List bootnodes) {
+ checkNotNull(bootnodes);
+ this.bootnodes = bootnodes;
+ return this;
+ }
+
+ public Builder minPeers(final Integer minPeers) {
+ checkNotNull(minPeers);
+ this.minPeers = minPeers;
+ return this;
+ }
+
+ public Builder maxPeers(final Integer maxPeers) {
+ checkNotNull(maxPeers);
+ this.maxPeers = maxPeers;
+ return this;
+ }
+
+ public Builder minRandomlySelectedPeers(final Integer minRandomlySelectedPeers) {
+ checkNotNull(minRandomlySelectedPeers);
+ this.minRandomlySelectedPeers = minRandomlySelectedPeers;
+ return this;
+ }
+ }
+}
diff --git a/src/org/minima/system/network/base/DiscoveryNetwork.java b/src/org/minima/system/network/base/DiscoveryNetwork.java
new file mode 100644
index 000000000..24ee7cfa8
--- /dev/null
+++ b/src/org/minima/system/network/base/DiscoveryNetwork.java
@@ -0,0 +1,255 @@
+
+/*
+ * Copyright 2020 ConsenSys AG.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+
+package org.minima.system.network.base;
+
+import static java.util.stream.Collectors.toList;
+//import static tech.pegasys.teku.util.config.Constants.ATTESTATION_SUBNET_COUNT;
+
+import java.util.Optional;
+import java.util.stream.Stream;
+
+import com.google.common.io.ByteSink;
+
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+import org.minima.system.network.base.metrics.MetricsSystem;
+import org.minima.system.network.base.peer.DiscoveryPeer;
+import org.minima.system.network.base.peer.NodeId;
+import org.apache.tuweni.bytes.Bytes;
+import org.apache.tuweni.bytes.Bytes32;
+// import org.hyperledger.besu.plugin.services.MetricsSystem;
+// import tech.pegasys.teku.infrastructure.async.AsyncRunner;
+// import tech.pegasys.teku.infrastructure.async.SafeFuture;
+// import tech.pegasys.teku.infrastructure.logging.StatusLogger;
+// import tech.pegasys.teku.infrastructure.unsigned.UInt64;
+// import tech.pegasys.teku.networking.p2p.connection.ConnectionManager;
+// import tech.pegasys.teku.networking.p2p.connection.PeerSelectionStrategy;
+// import tech.pegasys.teku.networking.p2p.discovery.discv5.DiscV5Service;
+// import tech.pegasys.teku.networking.p2p.discovery.noop.NoOpDiscoveryService;
+// import tech.pegasys.teku.networking.p2p.network.DelegatingP2PNetwork;
+// import tech.pegasys.teku.networking.p2p.network.P2PNetwork;
+// import tech.pegasys.teku.networking.p2p.network.config.NetworkConfig;
+// import tech.pegasys.teku.networking.p2p.peer.NodeId;
+// import tech.pegasys.teku.networking.p2p.peer.Peer;
+// import tech.pegasys.teku.networking.p2p.peer.PeerConnectedSubscriber;
+// import tech.pegasys.teku.ssz.schema.collections.SszBitvectorSchema;
+// import tech.pegasys.teku.ssz.type.Bytes4;
+import org.minima.system.network.base.peer.Peer;
+import org.minima.system.network.base.peer.PeerConnectedSubscriber;
+import org.minima.system.network.base.peer.PeerSelectionStrategy;
+import org.minima.system.network.base.ssz.SszBitvectorSchema;
+
+// import tech.pegasys.teku.storage.store.KeyValueStore;
+
+public class DiscoveryNetwork
extends DelegatingP2PNetwork
{
+ private static final Logger LOG = LogManager.getLogger(DiscoveryNetwork.class);
+
+ public static final String ATTESTATION_SUBNET_ENR_FIELD = "attnets";
+ public static final String ETH2_ENR_FIELD = "eth2";
+
+ // from tech.pegasys.teku.util.config.Constants.ATTESTATION_SUBNET_COUNT
+ public static final int ATTESTATION_SUBNET_COUNT = 64;
+
+ private final P2PNetwork
p2pNetwork;
+ private final DiscoveryService discoveryService;
+ private final ConnectionManager connectionManager;
+
+ private volatile Optional enrForkId = Optional.empty();
+
+ private String mENR;
+
+ DiscoveryNetwork(
+ final P2PNetwork
p2pNetwork,
+ final DiscoveryService discoveryService,
+ final ConnectionManager connectionManager) {
+ super(p2pNetwork);
+ this.p2pNetwork = p2pNetwork;
+ this.discoveryService = discoveryService;
+ this.connectionManager = connectionManager;
+ initialize();
+ }
+
+ public void initialize() {
+ //TODO: set ENR data set by setpregenesisforkinfo
+ //setPreGenesisForkInfo();
+ //TODO: log this in another way
+ // getEnr().ifPresent(StatusLogger.STATUS_LOG::listeningForDiscv5PreGenesis);
+
+ // Set connection manager peer predicate so that we don't attempt to connect peers with
+ // different fork digests
+// connectionManager.addPeerPredicate(this::dontConnectPeersWithDifferentForkDigests);
+ }
+
+ public static
DiscoveryNetwork
create(
+ final MetricsSystem metricsSystem,
+ final AsyncRunner asyncRunner,
+ final KeyValueStore kvStore,
+ final P2PNetwork
p2pNetwork,
+ final PeerSelectionStrategy peerSelectionStrategy,
+ final DiscoveryConfig discoveryConfig,
+ final NetworkConfig p2pConfig
+ ) {
+ final DiscoveryService discoveryService =
+ createDiscoveryService(discoveryConfig, p2pConfig, kvStore, Bytes.wrap(p2pNetwork.getPrivateKey()));
+ final ConnectionManager connectionManager =
+ new ConnectionManager(
+ metricsSystem,
+ discoveryService,
+ asyncRunner,
+ p2pNetwork,
+ peerSelectionStrategy,
+ discoveryConfig.getStaticPeers().stream()
+ .map(p2pNetwork::createPeerAddress)
+ .collect(toList()));
+ return new DiscoveryNetwork<>(p2pNetwork, discoveryService, connectionManager);
+ }
+
+ private static DiscoveryService createDiscoveryService(
+ final DiscoveryConfig discoConfig,
+ final NetworkConfig p2pConfig,
+ final KeyValueStore kvStore,
+ final Bytes privateKey) {
+ final DiscoveryService discoveryService;
+ if (discoConfig.isDiscoveryEnabled()) {
+ System.out.println("P2P: Starting DiscV5 service");
+ discoveryService = DiscV5Service.create(discoConfig, p2pConfig, kvStore, privateKey);
+ //discoveryService = new NoOpDiscoveryService();
+ } else {
+ System.out.println("P2P: Starting NoOp Disc service");
+ discoveryService = new NoOpDiscoveryService();
+ }
+ return discoveryService;
+ }
+
+ public String getENR() {
+ return mENR;
+ }
+
+ @Override
+ public SafeFuture> start() {
+ return SafeFuture.allOfFailFast(p2pNetwork.start(), discoveryService.start())
+ .thenCompose(__ -> connectionManager.start())
+ .thenRun(() -> getEnr().ifPresent(
+ enr -> {
+ LOG.warn("logwarn: listening for discv5: " + enr);
+ System.out.println("sysout: listening for discv5: " + enr);
+ mENR = enr;
+ }));
+ } //::listeningForDiscv5
+
+ @Override
+ public SafeFuture> stop() {
+ return connectionManager
+ .stop()
+ .handleComposed(
+ (__, err) -> {
+ if (err != null) {
+ LOG.warn("Error shutting down connection manager", err);
+ }
+ return SafeFuture.allOf(p2pNetwork.stop(), discoveryService.stop());
+ });
+ }
+
+ public void addStaticPeer(final String peerAddress) {
+ connectionManager.addStaticPeer(p2pNetwork.createPeerAddress(peerAddress));
+ }
+
+ @Override
+ public Optional getEnr() {
+ return discoveryService.getEnr();
+ }
+
+ @Override
+ public Optional getDiscoveryAddress() {
+ return discoveryService.getDiscoveryAddress();
+ }
+
+ public void setLongTermAttestationSubnetSubscriptions(Iterable subnetIds) {
+ discoveryService.updateCustomENRField(
+ ATTESTATION_SUBNET_ENR_FIELD,
+ SszBitvectorSchema.create(ATTESTATION_SUBNET_COUNT).ofBits(subnetIds).sszSerialize());
+ }
+
+ // public void setPreGenesisForkInfo() {
+ // final Bytes4 genesisForkVersion = spec.getGenesisSpecConfig().getGenesisForkVersion();
+ // final EnrForkId enrForkId =
+ // new EnrForkId(
+ // spec.getGenesisBeaconStateUtil().computeForkDigest(genesisForkVersion, Bytes32.ZERO),
+ // genesisForkVersion,
+ // SpecConfig.FAR_FUTURE_EPOCH);
+ // discoveryService.updateCustomENRField(ETH2_ENR_FIELD, enrForkId.sszSerialize());
+ // this.enrForkId = Optional.of(enrForkId);
+ // }
+
+ // public void setForkInfo(final ForkInfo currentForkInfo, final Optional nextForkInfo) {
+ // // If no future fork is planned, set next_fork_version = current_fork_version to signal this
+ // final Bytes4 nextVersion =
+ // nextForkInfo
+ // .map(Fork::getCurrent_version)
+ // .orElse(currentForkInfo.getFork().getCurrent_version());
+ // // If no future fork is planned, set next_fork_epoch = FAR_FUTURE_EPOCH to signal this
+ // final UInt64 nextForkEpoch =
+ // nextForkInfo.map(Fork::getEpoch).orElse(SpecConfig.FAR_FUTURE_EPOCH);
+
+ // final Bytes4 forkDigest = currentForkInfo.getForkDigest();
+ // final EnrForkId enrForkId = new EnrForkId(forkDigest, nextVersion, nextForkEpoch);
+ // final Bytes encodedEnrForkId = enrForkId.sszSerialize();
+
+ // discoveryService.updateCustomENRField(ETH2_ENR_FIELD, encodedEnrForkId);
+ // this.enrForkId = Optional.of(enrForkId);
+ // }
+
+ // private boolean dontConnectPeersWithDifferentForkDigests(DiscoveryPeer peer) {
+ // return enrForkId
+ // .map(EnrForkId::getForkDigest)
+ // .flatMap(
+ // localForkDigest ->
+ // peer.getEnrForkId()
+ // .map(EnrForkId::getForkDigest)
+ // .map(peerForkDigest -> peerForkDigest.equals(localForkDigest)))
+ // .orElse(false);
+ // }
+
+ @Override
+ public long subscribeConnect(final PeerConnectedSubscriber
subscriber) {
+ return p2pNetwork.subscribeConnect(subscriber);
+ }
+
+ @Override
+ public void unsubscribeConnect(final long subscriptionId) {
+ p2pNetwork.unsubscribeConnect(subscriptionId);
+ }
+
+ @Override
+ public Optional
streamPeers() {
+ return p2pNetwork.streamPeers();
+ }
+
+ public Stream streamKnownDiscoveryPeers() {
+ return discoveryService.streamKnownPeers();
+ }
+
+ public int getP2PPeerCount() {
+ LibP2PNetwork net = (LibP2PNetwork) p2pNetwork;
+ return net.getPeerCount();
+ }
+
+}
+
diff --git a/src/org/minima/system/network/base/DiscoveryNetworkFactory.java b/src/org/minima/system/network/base/DiscoveryNetworkFactory.java
new file mode 100644
index 000000000..c63c773f3
--- /dev/null
+++ b/src/org/minima/system/network/base/DiscoveryNetworkFactory.java
@@ -0,0 +1,164 @@
+/*
+ * Copyright 2020 ConsenSys AG.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+
+package org.minima.system.network.base;
+
+import java.net.BindException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Random;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
+import io.libp2p.core.crypto.PrivKey;
+
+// import org.hyperledger.besu.metrics.noop.NoOpMetricsSystem;
+// import tech.pegasys.teku.infrastructure.async.DelayedExecutorAsyncRunner;
+// import tech.pegasys.teku.infrastructure.async.SafeFuture;
+// import tech.pegasys.teku.infrastructure.async.Waiter;
+// import tech.pegasys.teku.infrastructure.time.StubTimeProvider;
+// import tech.pegasys.teku.network.p2p.jvmlibp2p.PrivateKeyGenerator;
+// import tech.pegasys.teku.network.p2p.peer.SimplePeerSelectionStrategy;
+// import tech.pegasys.teku.networking.p2p.connection.PeerSelectionStrategy;
+// import tech.pegasys.teku.networking.p2p.connection.TargetPeerRange;
+// import tech.pegasys.teku.networking.p2p.discovery.DiscoveryConfig;
+// import tech.pegasys.teku.networking.p2p.discovery.DiscoveryNetwork;
+// import tech.pegasys.teku.networking.p2p.libp2p.LibP2PNetwork;
+// import tech.pegasys.teku.networking.p2p.network.config.NetworkConfig;
+// import tech.pegasys.teku.networking.p2p.peer.Peer;
+// import tech.pegasys.teku.networking.p2p.reputation.ReputationManager;
+// import tech.pegasys.teku.spec.Spec;
+// import tech.pegasys.teku.spec.SpecFactory;
+// import tech.pegasys.teku.storage.store.MemKeyValueStore;
+// import tech.pegasys.teku.util.config.Constants;
+import org.minima.system.network.base.metrics.NoOpMetricsSystem;
+import org.minima.system.network.base.peer.Peer;
+import org.minima.system.network.base.peer.PeerSelectionStrategy;
+import org.minima.system.network.base.peer.ReputationManager;
+import org.minima.system.network.base.peer.SimplePeerSelectionStrategy;
+import org.minima.system.network.base.peer.TargetPeerRange;
+
+public class DiscoveryNetworkFactory {
+
+ protected static final Logger LOG = LogManager.getLogger(DiscoveryNetworkFactory.class);
+ protected static final NoOpMetricsSystem METRICS_SYSTEM = new NoOpMetricsSystem();
+ private static final int MIN_PORT = 9000;
+ private static final int MAX_PORT = 12000;
+
+ private final List> networks = new ArrayList<>();
+
+ // from tech.pegasys.teku.util.config.Constants
+ public final class Constants {
+ public static final int REPUTATION_MANAGER_CAPACITY = 1024;
+ public static final int ATTESTATION_SUBNET_COUNT = 64;
+ }
+
+ public DiscoveryNetworkBuilder builder() {
+ return new DiscoveryNetworkBuilder();
+ }
+
+ public void stopAll() throws InterruptedException, ExecutionException, TimeoutException {
+ Waiter.waitFor(
+ SafeFuture.allOf(networks.stream().map(DiscoveryNetwork::stop).toArray(SafeFuture[]::new)));
+ }
+
+ public class DiscoveryNetworkBuilder {
+ private final List staticPeers = new ArrayList<>();
+ private final List bootnodes = new ArrayList<>();
+ private PrivKey privKey;
+
+ private DiscoveryNetworkBuilder() {}
+
+ public DiscoveryNetworkBuilder staticPeer(final String staticPeer) {
+ this.staticPeers.add(staticPeer);
+ return this;
+ }
+
+ public DiscoveryNetworkBuilder bootnode(final String bootnode) {
+ this.bootnodes.add(bootnode);
+ return this;
+ }
+
+ public DiscoveryNetworkBuilder setPrivKey(final PrivKey privKey) {
+ this.privKey = privKey;
+ return this;
+ }
+ public DiscoveryNetwork buildAndStart(int _port) throws Exception {
+ int attempt = 1;
+ while (true) {
+ final int port;
+ if(_port == 0) {
+ final Random random = new Random();
+ port = MIN_PORT + random.nextInt(MAX_PORT - MIN_PORT);
+ } else {
+ port = _port;
+ }
+ final DiscoveryConfig discoveryConfig =
+ DiscoveryConfig.builder().staticPeers(staticPeers).bootnodes(bootnodes).build();
+ final NetworkConfig config =
+ NetworkConfig.builder().listenPort(port).networkInterface("0.0.0.0").build();
+ final NoOpMetricsSystem metricsSystem = new NoOpMetricsSystem();
+ final ReputationManager reputationManager =
+ new ReputationManager(
+ metricsSystem,
+ StubTimeProvider.withTimeInSeconds(1000),
+ Constants.REPUTATION_MANAGER_CAPACITY);
+ final PeerSelectionStrategy peerSelectionStrategy =
+ new SimplePeerSelectionStrategy(new TargetPeerRange(20, 30, 0));
+
+ final DiscoveryNetwork network =
+ DiscoveryNetwork.create(
+ metricsSystem,
+ DelayedExecutorAsyncRunner.create(),
+ new MemKeyValueStore<>(),
+ new LibP2PNetwork(
+ DelayedExecutorAsyncRunner.create(),
+ config,
+ privKey,
+ reputationManager,
+ METRICS_SYSTEM,
+ Collections.emptyList(),
+ Collections.emptyList(),
+ (__1, __2) -> {
+ throw new UnsupportedOperationException();
+ },
+ topic -> true),
+ peerSelectionStrategy,
+ discoveryConfig,
+ config);
+ try {
+ network.start().get(5, TimeUnit.SECONDS);
+ networks.add(network);
+ return network;
+ } catch (final ExecutionException e) {
+ if (e.getCause() instanceof BindException) {
+ if (attempt > 10) {
+ throw new RuntimeException("Failed to find a free port after multiple attempts", e);
+ }
+ LOG.info(
+ "Port conflict detected, retrying with a new port. Original message: {}",
+ e.getMessage());
+ attempt++;
+ Waiter.waitFor(network.stop());
+ } else {
+ throw e;
+ }
+ }
+ }
+ }
+ }
+}
diff --git a/src/org/minima/system/network/base/DiscoveryService.java b/src/org/minima/system/network/base/DiscoveryService.java
new file mode 100644
index 000000000..184eb9935
--- /dev/null
+++ b/src/org/minima/system/network/base/DiscoveryService.java
@@ -0,0 +1,37 @@
+/*
+ * Copyright 2020 ConsenSys AG.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+
+package org.minima.system.network.base;
+
+import java.util.Optional;
+import java.util.stream.Stream;
+import org.apache.tuweni.bytes.Bytes;
+//import tech.pegasys.teku.infrastructure.async.SafeFuture;
+import org.minima.system.network.base.peer.DiscoveryPeer;
+
+public interface DiscoveryService {
+
+ SafeFuture> start();
+
+ SafeFuture> stop();
+
+ Stream streamKnownPeers();
+
+ SafeFuture searchForPeers();
+
+ Optional getEnr();
+
+ Optional getDiscoveryAddress();
+
+ void updateCustomENRField(String fieldName, Bytes value);
+}
diff --git a/src/org/minima/system/network/base/EnrForkId.java b/src/org/minima/system/network/base/EnrForkId.java
new file mode 100644
index 000000000..08bf7670f
--- /dev/null
+++ b/src/org/minima/system/network/base/EnrForkId.java
@@ -0,0 +1,79 @@
+/*
+ * Copyright 2020 ConsenSys AG.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+
+package org.minima.system.network.base;
+
+import org.minima.system.network.base.ssz.Bytes4;
+import org.minima.system.network.base.ssz.Container3;
+import org.minima.system.network.base.ssz.ContainerSchema3;
+import org.minima.system.network.base.ssz.SszBytes4;
+import org.minima.system.network.base.ssz.SszPrimitiveSchemas;
+import org.minima.system.network.base.ssz.SszUInt64;
+import org.minima.system.network.base.ssz.UInt64;
+import org.minima.system.network.base.ssz.TreeNode;
+
+// import tech.pegasys.teku.infrastructure.unsigned.UInt64;
+// import tech.pegasys.teku.ssz.containers.Container3;
+// import tech.pegasys.teku.ssz.containers.ContainerSchema3;
+// import tech.pegasys.teku.ssz.primitive.SszBytes4;
+// import tech.pegasys.teku.ssz.primitive.SszUInt64;
+// import tech.pegasys.teku.ssz.schema.SszPrimitiveSchemas;
+// import tech.pegasys.teku.ssz.tree.TreeNode;
+// import tech.pegasys.teku.ssz.type.Bytes4;
+
+public class EnrForkId extends Container3 {
+
+ public static class EnrForkIdSchema
+ extends ContainerSchema3 {
+
+ public EnrForkIdSchema() {
+ super(
+ "EnrForkId",
+ namedSchema("forkDigest", SszPrimitiveSchemas.BYTES4_SCHEMA),
+ namedSchema("nextForkVersion", SszPrimitiveSchemas.BYTES4_SCHEMA),
+ namedSchema("nextForkEpoch", SszPrimitiveSchemas.UINT64_SCHEMA));
+ }
+
+ @Override
+ public EnrForkId createFromBackingNode(TreeNode node) {
+ return new EnrForkId(this, node);
+ }
+ }
+
+ public static final EnrForkIdSchema SSZ_SCHEMA = new EnrForkIdSchema();
+
+ private EnrForkId(EnrForkIdSchema type, TreeNode backingNode) {
+ super(type, backingNode);
+ }
+
+ public EnrForkId(
+ final Bytes4 forkDigest, final Bytes4 nextForkVersion, final UInt64 nextForkEpoch) {
+ super(
+ SSZ_SCHEMA,
+ SszBytes4.of(forkDigest),
+ SszBytes4.of(nextForkVersion),
+ SszUInt64.of(nextForkEpoch));
+ }
+
+ public Bytes4 getForkDigest() {
+ return getField0().get();
+ }
+
+ public Bytes4 getNextForkVersion() {
+ return getField1().get();
+ }
+
+ public UInt64 getNextForkEpoch() {
+ return getField2().get();
+ }
+}
diff --git a/src/org/minima/system/network/base/ExceptionThrowingConsumer.java b/src/org/minima/system/network/base/ExceptionThrowingConsumer.java
new file mode 100644
index 000000000..2cc4ce007
--- /dev/null
+++ b/src/org/minima/system/network/base/ExceptionThrowingConsumer.java
@@ -0,0 +1,19 @@
+/*
+ * Copyright 2021 ConsenSys AG.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+
+package org.minima.system.network.base;
+
+public interface ExceptionThrowingConsumer {
+
+ void accept(V value) throws Throwable;
+}
diff --git a/src/org/minima/system/network/base/ExceptionThrowingFunction.java b/src/org/minima/system/network/base/ExceptionThrowingFunction.java
new file mode 100644
index 000000000..58bee42d1
--- /dev/null
+++ b/src/org/minima/system/network/base/ExceptionThrowingFunction.java
@@ -0,0 +1,18 @@
+/*
+ * Copyright 2020 ConsenSys AG.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+
+package org.minima.system.network.base;
+
+public interface ExceptionThrowingFunction {
+ O apply(I value) throws Throwable;
+}
diff --git a/src/org/minima/system/network/base/ExceptionThrowingFutureSupplier.java b/src/org/minima/system/network/base/ExceptionThrowingFutureSupplier.java
new file mode 100644
index 000000000..1648b11b3
--- /dev/null
+++ b/src/org/minima/system/network/base/ExceptionThrowingFutureSupplier.java
@@ -0,0 +1,8 @@
+
+package org.minima.system.network.base;
+
+import java.util.concurrent.CompletionStage;
+
+public interface ExceptionThrowingFutureSupplier {
+ CompletionStage get() throws Throwable;
+}
diff --git a/src/org/minima/system/network/base/ExceptionThrowingRunnable.java b/src/org/minima/system/network/base/ExceptionThrowingRunnable.java
new file mode 100644
index 000000000..4ea79c94a
--- /dev/null
+++ b/src/org/minima/system/network/base/ExceptionThrowingRunnable.java
@@ -0,0 +1,18 @@
+/*
+ * Copyright 2020 ConsenSys AG.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+
+package org.minima.system.network.base;
+
+public interface ExceptionThrowingRunnable {
+ void run() throws Throwable;
+}
diff --git a/src/org/minima/system/network/base/ExceptionThrowingSupplier.java b/src/org/minima/system/network/base/ExceptionThrowingSupplier.java
new file mode 100644
index 000000000..b7ac9b693
--- /dev/null
+++ b/src/org/minima/system/network/base/ExceptionThrowingSupplier.java
@@ -0,0 +1,6 @@
+
+package org.minima.system.network.base;
+
+public interface ExceptionThrowingSupplier {
+ O get() throws Throwable;
+}
diff --git a/src/org/minima/system/network/base/Firewall.java b/src/org/minima/system/network/base/Firewall.java
new file mode 100644
index 000000000..91bcaf37e
--- /dev/null
+++ b/src/org/minima/system/network/base/Firewall.java
@@ -0,0 +1,66 @@
+/*
+ * Copyright 2020 ConsenSys AG.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+
+package org.minima.system.network.base;
+
+import io.netty.channel.ChannelHandler.Sharable;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.ChannelInboundHandlerAdapter;
+import io.netty.channel.WriteBufferWaterMark;
+import io.netty.handler.timeout.WriteTimeoutException;
+import io.netty.handler.timeout.WriteTimeoutHandler;
+import java.time.Duration;
+import java.util.concurrent.TimeUnit;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+//import tech.pegasys.teku.infrastructure.async.FutureUtil;
+
+/**
+ * The very first Netty handler in the Libp2p connection pipeline. Sets up Netty Channel options and
+ * doing other duties preventing DoS attacks
+ */
+@Sharable
+public class Firewall extends ChannelInboundHandlerAdapter {
+ private static final Logger LOG = LogManager.getLogger();
+
+ private final Duration writeTimeout;
+
+ public Firewall(Duration writeTimeout) {
+ this.writeTimeout = writeTimeout;
+ }
+
+ @Override
+ public void handlerAdded(ChannelHandlerContext ctx) {
+ ctx.channel().config().setWriteBufferWaterMark(new WriteBufferWaterMark(100, 1024));
+ ctx.pipeline().addLast(new WriteTimeoutHandler(writeTimeout.toMillis(), TimeUnit.MILLISECONDS));
+ ctx.pipeline().addLast(new FirewallExceptionHandler());
+ }
+
+ @Override
+ public void channelWritabilityChanged(ChannelHandlerContext ctx) {
+ ctx.channel().config().setAutoRead(ctx.channel().isWritable());
+ ctx.fireChannelWritabilityChanged();
+ }
+
+ class FirewallExceptionHandler extends ChannelInboundHandlerAdapter {
+ @Override
+ public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
+ if (cause instanceof WriteTimeoutException) {
+ LOG.debug("Firewall closed channel by write timeout. No writes during " + writeTimeout);
+ } else {
+ LOG.debug("Error in Firewall, disconnecting" + cause);
+ FutureUtil.ignoreFuture(ctx.close());
+ }
+ }
+ }
+}
diff --git a/src/org/minima/system/network/base/FutureUtil.java b/src/org/minima/system/network/base/FutureUtil.java
new file mode 100644
index 000000000..c06646e8d
--- /dev/null
+++ b/src/org/minima/system/network/base/FutureUtil.java
@@ -0,0 +1,70 @@
+/*
+ * Copyright 2019 ConsenSys AG.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+
+package org.minima.system.network.base;
+
+import java.time.Duration;
+import java.util.concurrent.Future;
+import java.util.function.Consumer;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
+public class FutureUtil {
+ private static final Logger LOG = LogManager.getLogger();
+
+ public static void ignoreFuture(final Future future) {}
+
+ static void runWithFixedDelay(
+ AsyncRunner runner,
+ ExceptionThrowingRunnable runnable,
+ Cancellable task,
+ final Duration duration,
+ Consumer exceptionHandler) {
+
+ runner
+ .runAfterDelay(
+ () -> {
+ if (!task.isCancelled()) {
+ try {
+ runnable.run();
+ } catch (Throwable throwable) {
+ try {
+ exceptionHandler.accept(throwable);
+ } catch (Exception e) {
+ LOG.warn("Exception in exception handler", e);
+ }
+ } finally {
+ runWithFixedDelay(runner, runnable, task, duration, exceptionHandler);
+ }
+ }
+ },
+ duration)
+ .finish(() -> {}, exceptionHandler);
+ }
+
+ static Cancellable createCancellable() {
+ return new Cancellable() {
+ private volatile boolean cancelled;
+
+ @Override
+ public void cancel() {
+ cancelled = true;
+ }
+
+ @Override
+ public boolean isCancelled() {
+ return cancelled;
+ }
+ };
+ }
+}
diff --git a/src/org/minima/system/network/base/InvalidConfigurationException.java b/src/org/minima/system/network/base/InvalidConfigurationException.java
new file mode 100644
index 000000000..b239e9db5
--- /dev/null
+++ b/src/org/minima/system/network/base/InvalidConfigurationException.java
@@ -0,0 +1,28 @@
+/*
+ * Copyright 2020 ConsenSys AG.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+
+package org.minima.system.network.base;
+
+public class InvalidConfigurationException extends RuntimeException {
+ public InvalidConfigurationException(final String message) {
+ super(message);
+ }
+
+ public InvalidConfigurationException(final String message, final Throwable cause) {
+ super(message, cause);
+ }
+
+ public InvalidConfigurationException(final Throwable cause) {
+ super(cause.getMessage(), cause);
+ }
+}
diff --git a/src/org/minima/system/network/base/KeyValueStore.java b/src/org/minima/system/network/base/KeyValueStore.java
new file mode 100644
index 000000000..95d07ca62
--- /dev/null
+++ b/src/org/minima/system/network/base/KeyValueStore.java
@@ -0,0 +1,98 @@
+/*
+ * Copyright 2020 ConsenSys AG.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+
+package org.minima.system.network.base;
+
+import java.util.Optional;
+import org.jetbrains.annotations.NotNull;
+
+/**
+ * Generic simple key-value store interface Both key and value are not allowed to be null
+ *
+ * @param key type
+ * @param value type
+ */
+public interface KeyValueStore {
+
+ /** Puts a new value. If the value is {@code null} then the entry is removed if exist */
+ void put(@NotNull TKey key, @NotNull TValue value);
+
+ /** Removes entry with the specified key */
+ void remove(@NotNull TKey key);
+
+ /**
+ * Returns a value corresponding to the key or {{@link Optional#empty()}} if entry doesn't exist
+ * in the store
+ */
+ Optional get(@NotNull TKey key);
+
+ /**
+ * Performs batch store update.
+ *
+ *
The implementation may override this default method and declare it to be an atomic store
+ * update. Though this generic interface makes no restrictions on atomicity of this method
+ */
+ default void updateAll(Iterable> data) {
+ data.forEach(
+ update -> {
+ if (update.getType() == UpdateType.UPDATE) {
+ put(update.getKey(), update.getValue());
+ } else if (update.getType() == UpdateType.REMOVE) {
+ remove(update.getKey());
+ } else {
+ throw new IllegalArgumentException("Unknown type: " + update.getType());
+ }
+ });
+ }
+
+ enum UpdateType {
+ UPDATE,
+ REMOVE
+ }
+
+ /** Represents a batched update entry */
+ class EntryUpdate {
+ private final UpdateType type;
+ private final K key;
+ private final V value;
+
+ public static EntryUpdate update(K key, V value) {
+ return new EntryUpdate<>(UpdateType.UPDATE, key, value);
+ }
+
+ public static EntryUpdate remove(K key) {
+ return new EntryUpdate<>(UpdateType.REMOVE, key, null);
+ }
+
+ private EntryUpdate(UpdateType type, K key, V value) {
+ this.type = type;
+ this.key = key;
+ this.value = value;
+ }
+
+ public UpdateType getType() {
+ return type;
+ }
+
+ public K getKey() {
+ return key;
+ }
+
+ public V getValue() {
+ if (getType() != UpdateType.UPDATE) {
+ throw new IllegalStateException("No value for this update");
+ }
+ return value;
+ }
+ }
+}
diff --git a/src/org/minima/system/network/base/LRUCache.java b/src/org/minima/system/network/base/LRUCache.java
new file mode 100644
index 000000000..34da71967
--- /dev/null
+++ b/src/org/minima/system/network/base/LRUCache.java
@@ -0,0 +1,101 @@
+/*
+ * Copyright 2020 ConsenSys AG.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+
+package org.minima.system.network.base;
+
+import java.util.Collections;
+import java.util.Map;
+import java.util.Optional;
+import java.util.function.Function;
+//import tech.pegasys.teku.infrastructure.collections.LimitedMap;
+
+import org.minima.system.network.base.ssz.Cache;
+
+/**
+ * Cache made around LRU-map with fixed size, removing eldest entries (by added) when the space is
+ * over
+ *
+ * @param Keys type
+ * @param Values type
+ */
+public class LRUCache implements Cache {
+
+ private final Map cacheData;
+ private final int maxCapacity;
+
+ /**
+ * Creates cache
+ *
+ * @param capacity Size of the cache
+ */
+ public LRUCache(int capacity) {
+ this(capacity, Collections.emptyMap());
+ }
+
+ private LRUCache(int capacity, Map initialCachedContent) {
+ this.maxCapacity = capacity;
+ Map cacheMap = LimitedMap.create(maxCapacity);
+ // copy safely, initialCachedContent is always a SynchronizedMap instance
+ synchronized (initialCachedContent) {
+ cacheMap.putAll(initialCachedContent);
+ }
+ this.cacheData = cacheMap;
+ }
+
+ @Override
+ public Cache copy() {
+ return new LRUCache<>(maxCapacity, cacheData);
+ }
+
+ /**
+ * Queries value from the cache. If it's not found there, fallback function is used to calculate
+ * value. After calculation result is put in cache and returned.
+ *
+ * @param key Key to query
+ * @param fallback Fallback function for calculation of the result in case of missed cache entry
+ * @return expected value result for provided key
+ */
+ @Override
+ public V get(K key, Function fallback) {
+ V result = cacheData.get(key);
+
+ if (result == null) {
+ result = fallback.apply(key);
+ if (result != null) {
+ cacheData.put(key, result);
+ }
+ }
+
+ return result;
+ }
+
+ @Override
+ public Optional getCached(K key) {
+ return Optional.ofNullable(cacheData.get(key));
+ }
+
+ @Override
+ public void invalidate(K key) {
+ cacheData.remove(key);
+ }
+
+ @Override
+ public void clear() {
+ cacheData.clear();
+ }
+
+ @Override
+ public int size() {
+ return cacheData.size();
+ }
+}
diff --git a/src/org/minima/system/network/base/LibP2PNetwork.java b/src/org/minima/system/network/base/LibP2PNetwork.java
new file mode 100644
index 000000000..51381dcfd
--- /dev/null
+++ b/src/org/minima/system/network/base/LibP2PNetwork.java
@@ -0,0 +1,332 @@
+/*
+ * Copyright 2019 ConsenSys AG.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+
+package org.minima.system.network.base;
+
+import static org.minima.system.network.base.SafeFuture.failedFuture;
+//import static tech.pegasys.teku.infrastructure.logging.StatusLogger.STATUS_LOG;
+
+import identify.pb.IdentifyOuterClass;
+import io.libp2p.core.Host;
+import io.libp2p.core.PeerId;
+import io.libp2p.core.crypto.PrivKey;
+import io.libp2p.core.dsl.Builder.Defaults;
+import io.libp2p.core.dsl.BuilderJKt;
+import io.libp2p.core.multiformats.Multiaddr;
+import io.libp2p.core.multistream.ProtocolBinding;
+import io.libp2p.core.mux.StreamMuxerProtocol;
+import io.libp2p.etc.types.ByteArrayExtKt;
+//import io.libp2p.etc.util.P2PService.PeerHandler;
+import io.libp2p.protocol.Identify;
+import io.libp2p.protocol.Ping;
+import io.libp2p.security.noise.NoiseXXSecureChannel;
+import io.libp2p.transport.tcp.TcpTransport;
+import io.netty.handler.logging.LogLevel;
+import java.net.InetSocketAddress;
+import java.time.Duration;
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.stream.Stream;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+import org.apache.tuweni.bytes.Bytes;
+import org.minima.system.network.base.gossip.PreparedGossipMessageFactory;
+import org.minima.system.network.base.gossip.TopicChannel;
+import org.minima.system.network.base.gossip.TopicHandler;
+import org.minima.system.network.base.gossip.config.GossipTopicsScoringConfig;
+import org.minima.system.network.base.libp2p.gossip.GossipTopicFilter;
+import org.minima.system.network.base.libp2p.gossip.LibP2PGossipNetwork;
+import org.minima.system.network.base.metrics.MetricsSystem;
+import org.minima.system.network.base.peer.DiscoveryPeer;
+import org.minima.system.network.base.peer.LibP2PNodeId;
+import org.minima.system.network.base.peer.MultiaddrPeerAddress;
+import org.minima.system.network.base.peer.MultiaddrUtil;
+import org.minima.system.network.base.peer.NodeId;
+//import org.hyperledger.besu.plugin.services.MetricsSystem;
+// import tech.pegasys.teku.infrastructure.async.AsyncRunner;
+// import tech.pegasys.teku.infrastructure.async.SafeFuture;
+// import tech.pegasys.teku.infrastructure.version.VersionProvider;
+// import tech.pegasys.teku.networking.p2p.discovery.DiscoveryPeer;
+// import tech.pegasys.teku.networking.p2p.gossip.PreparedGossipMessageFactory;
+// import tech.pegasys.teku.networking.p2p.gossip.TopicChannel;
+// import tech.pegasys.teku.networking.p2p.gossip.TopicHandler;
+// import tech.pegasys.teku.networking.p2p.gossip.config.GossipTopicsScoringConfig;
+// import tech.pegasys.teku.networking.p2p.libp2p.gossip.GossipTopicFilter;
+// import tech.pegasys.teku.networking.p2p.libp2p.gossip.LibP2PGossipNetwork;
+// import tech.pegasys.teku.networking.p2p.libp2p.rpc.RpcHandler;
+// import tech.pegasys.teku.networking.p2p.network.P2PNetwork;
+// import tech.pegasys.teku.networking.p2p.network.PeerAddress;
+// import tech.pegasys.teku.networking.p2p.network.PeerHandler;
+// import tech.pegasys.teku.networking.p2p.network.config.NetworkConfig;
+// import tech.pegasys.teku.networking.p2p.peer.NodeId;
+// import tech.pegasys.teku.networking.p2p.peer.Peer;
+// import tech.pegasys.teku.networking.p2p.peer.PeerConnectedSubscriber;
+// import tech.pegasys.teku.networking.p2p.reputation.ReputationManager;
+// import tech.pegasys.teku.networking.p2p.rpc.RpcMethod;
+import org.minima.system.network.base.peer.Peer;
+import org.minima.system.network.base.peer.PeerAddress;
+import org.minima.system.network.base.peer.PeerConnectedSubscriber;
+import org.minima.system.network.base.peer.PeerManager;
+import org.minima.system.network.base.peer.PeerHandler;
+import org.minima.system.network.base.peer.ReputationManager;
+import org.minima.system.network.base.peer.RpcHandler;
+import org.minima.system.network.base.peer.RpcMethod;
+
+public class LibP2PNetwork implements P2PNetwork {
+
+ private static final Logger LOG = LogManager.getLogger(LibP2PNetwork.class);
+ private static final int REMOTE_OPEN_STREAMS_RATE_LIMIT = 256;
+ private static final int REMOTE_PARALLEL_OPEN_STREAMS_COUNT_LIMIT = 256;
+
+ private final PrivKey privKey;
+ private final NodeId nodeId;
+
+ private final Host host;
+ private final PeerManager peerManager;
+ private final Multiaddr advertisedAddr;
+ private final LibP2PGossipNetwork gossipNetwork;
+
+ private final AtomicReference state = new AtomicReference<>(State.IDLE);
+ private final Map rpcHandlers = new ConcurrentHashMap<>();
+ private final int listenPort;
+
+ public LibP2PNetwork(
+ final AsyncRunner asyncRunner,
+ final NetworkConfig config,
+ final PrivKey privKey,
+ final ReputationManager reputationManager,
+ final MetricsSystem metricsSystem,
+ final List rpcMethods,
+ final List peerHandlers,
+ final PreparedGossipMessageFactory defaultMessageFactory,
+ final GossipTopicFilter gossipTopicFilter) {
+
+ this.privKey = privKey;
+ this.nodeId = new LibP2PNodeId(PeerId.fromPubKey(privKey.publicKey()));
+
+ System.out.println("LibP2PNetwork - privKey = " + privKey.toString());
+ System.out.println("LibP2PNetwork - nodeId = " + nodeId);
+ advertisedAddr =
+ MultiaddrUtil.fromInetSocketAddress(
+ new InetSocketAddress(config.getAdvertisedIp(), config.getAdvertisedPort()), nodeId);
+ this.listenPort = config.getListenPort();
+
+ // Setup gossip
+ gossipNetwork =
+ LibP2PGossipNetwork.create(
+ metricsSystem,
+ config.getGossipConfig(),
+ defaultMessageFactory,
+ gossipTopicFilter,
+ config.getWireLogsConfig().isLogWireGossip());
+
+ // Setup rpc methods
+ rpcMethods.forEach(method -> rpcHandlers.put(method, new RpcHandler(asyncRunner, method)));
+
+ // Setup peers
+ peerManager = new PeerManager(metricsSystem, reputationManager, peerHandlers, rpcHandlers);
+
+ final Multiaddr listenAddr =
+ MultiaddrUtil.fromInetSocketAddress(
+ new InetSocketAddress(config.getNetworkInterface(), config.getListenPort()));
+ host =
+ BuilderJKt.hostJ(
+ Defaults.None,
+ b -> {
+ b.getIdentity().setFactory(() -> privKey);
+ b.getTransports().add(TcpTransport::new);
+ b.getSecureChannels().add(NoiseXXSecureChannel::new);
+ b.getMuxers().add(StreamMuxerProtocol.getMplex());
+
+ b.getNetwork().listen(listenAddr.toString());
+
+ b.getProtocols().addAll(getDefaultProtocols());
+ b.getProtocols().addAll(rpcHandlers.values());
+
+ if (config.getWireLogsConfig().isLogWireCipher()) {
+ b.getDebug().getBeforeSecureHandler().addLogger(LogLevel.DEBUG, "wire.ciphered");
+ }
+ Firewall firewall = new Firewall(Duration.ofSeconds(30));
+ b.getDebug().getBeforeSecureHandler().addNettyHandler(firewall);
+
+ if (config.getWireLogsConfig().isLogWirePlain()) {
+ b.getDebug().getAfterSecureHandler().addLogger(LogLevel.DEBUG, "wire.plain");
+ }
+ if (config.getWireLogsConfig().isLogWireMuxFrames()) {
+ b.getDebug().getMuxFramesHandler().addLogger(LogLevel.DEBUG, "wire.mux");
+ }
+
+ b.getConnectionHandlers().add(peerManager);
+
+ MplexFirewall mplexFirewall =
+ new MplexFirewall(
+ REMOTE_OPEN_STREAMS_RATE_LIMIT, REMOTE_PARALLEL_OPEN_STREAMS_COUNT_LIMIT);
+ b.getDebug().getMuxFramesHandler().addHandler(mplexFirewall);
+ });
+ }
+
+ private List> getDefaultProtocols() {
+ final Ping ping = new Ping();
+ IdentifyOuterClass.Identify identifyMsg =
+ IdentifyOuterClass.Identify.newBuilder()
+ .setProtocolVersion("ipfs/0.1.0")
+ .setAgentVersion("Minima_0.98.0-testss-p2p-Zulu-OpenJDK-11-AARCH64")
+ .setPublicKey(ByteArrayExtKt.toProtobuf(privKey.publicKey().bytes()))
+ .addListenAddrs(ByteArrayExtKt.toProtobuf(advertisedAddr.getBytes()))
+ .setObservedAddr(ByteArrayExtKt.toProtobuf(advertisedAddr.getBytes()))
+ .addAllProtocols(ping.getProtocolDescriptor().getAnnounceProtocols())
+ .addAllProtocols(
+ gossipNetwork.getGossip().getProtocolDescriptor().getAnnounceProtocols())
+ .build();
+ return List.of(ping, new Identify(identifyMsg), gossipNetwork.getGossip());
+ }
+
+ @Override
+ public SafeFuture> start() {
+ if (!state.compareAndSet(State.IDLE, State.RUNNING)) {
+ return SafeFuture.failedFuture(new IllegalStateException("Network already started"));
+ }
+ LOG.info("Starting libp2p network...");
+ return SafeFuture.of(host.start())
+ .thenApply(
+ i -> {
+ //STATUS_LOG.listeningForLibP2P(getNodeAddress());
+ LOG.debug("Listening for LibP2P - " + getNodeAddress());
+ return null;
+ });
+ }
+
+ @Override
+ public String getNodeAddress() {
+ return advertisedAddr.toString();
+ }
+
+ @Override
+ public SafeFuture connect(final PeerAddress peer) {
+ return peer.as(MultiaddrPeerAddress.class)
+ .map(staticPeer -> peerManager.connect(staticPeer, host.getNetwork()))
+ .orElseGet(
+ () ->
+ failedFuture(
+ new IllegalArgumentException(
+ "Unsupported peer address: " + peer.getClass().getName())));
+ }
+
+ @Override
+ public PeerAddress createPeerAddress(final String peerAddress) {
+ return MultiaddrPeerAddress.fromAddress(peerAddress);
+ }
+
+ @Override
+ public PeerAddress createPeerAddress(final DiscoveryPeer discoveryPeer) {
+ return MultiaddrPeerAddress.fromDiscoveryPeer(discoveryPeer);
+ }
+
+ @Override
+ public long subscribeConnect(final PeerConnectedSubscriber subscriber) {
+ return peerManager.subscribeConnect(subscriber);
+ }
+
+ @Override
+ public void unsubscribeConnect(final long subscriptionId) {
+ peerManager.unsubscribeConnect(subscriptionId);
+ }
+
+ @Override
+ public boolean isConnected(final PeerAddress peerAddress) {
+ return peerManager.getPeer(peerAddress.getId()).isPresent();
+ }
+
+ @Override
+ public Bytes getPrivateKey() {
+ return Bytes.wrap(privKey.raw());
+ }
+
+ @Override
+ public Optional getPeer(final NodeId id) {
+ return peerManager.getPeer(id);
+ }
+
+ @Override
+ public Stream streamPeers() {
+ return peerManager.streamPeers();
+ }
+
+ @Override
+ public NodeId parseNodeId(final String nodeId) {
+ return new LibP2PNodeId(PeerId.fromBase58(nodeId));
+ }
+
+ @Override
+ public int getPeerCount() {
+ return peerManager.getPeerCount();
+ }
+
+ @Override
+ public int getListenPort() {
+ return listenPort;
+ }
+
+ @Override
+ public SafeFuture> stop() {
+ if (!state.compareAndSet(State.RUNNING, State.STOPPED)) {
+ return SafeFuture.COMPLETE;
+ }
+ LOG.debug("JvmLibP2PNetwork.stop()");
+ return SafeFuture.of(host.stop());
+ }
+
+ @Override
+ public NodeId getNodeId() {
+ return nodeId;
+ }
+
+ @Override
+ public Optional getEnr() {
+ return Optional.empty();
+ }
+
+ @Override
+ public Optional getDiscoveryAddress() {
+ return Optional.empty();
+ }
+
+ @Override
+ public SafeFuture> gossip(final String topic, final Bytes data) {
+ return gossipNetwork.gossip(topic, data);
+ }
+
+ @Override
+ public TopicChannel subscribe(final String topic, final TopicHandler topicHandler) {
+ return gossipNetwork.subscribe(topic, topicHandler);
+ }
+
+ @Override
+ public Map> getSubscribersByTopic() {
+ return gossipNetwork.getSubscribersByTopic();
+ }
+
+ @Override
+ public void updateGossipTopicScoring(final GossipTopicsScoringConfig config) {
+ gossipNetwork.updateGossipTopicScoring(config);
+ }
+
+ @FunctionalInterface
+ public interface PrivateKeyProvider {
+ PrivKey get();
+ }
+}
diff --git a/src/org/minima/system/network/base/LibP2PParamsFactory.java b/src/org/minima/system/network/base/LibP2PParamsFactory.java
new file mode 100644
index 000000000..1fe0cfbe7
--- /dev/null
+++ b/src/org/minima/system/network/base/LibP2PParamsFactory.java
@@ -0,0 +1,155 @@
+/*
+ * Copyright 2021 ConsenSys AG.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+
+package org.minima.system.network.base;
+
+import io.libp2p.core.PeerId;
+import io.libp2p.pubsub.gossip.GossipParams;
+import io.libp2p.pubsub.gossip.GossipPeerScoreParams;
+import io.libp2p.pubsub.gossip.GossipScoreParams;
+import io.libp2p.pubsub.gossip.GossipTopicScoreParams;
+import io.libp2p.pubsub.gossip.GossipTopicsScoreParams;
+import io.libp2p.pubsub.gossip.builders.GossipPeerScoreParamsBuilder;
+import java.util.Map;
+import java.util.stream.Collectors;
+import kotlin.jvm.functions.Function1;
+import org.minima.system.network.base.gossip.config.GossipConfig;
+import org.minima.system.network.base.gossip.config.GossipPeerScoringConfig;
+import org.minima.system.network.base.gossip.config.GossipScoringConfig;
+import org.minima.system.network.base.gossip.config.GossipTopicScoringConfig;
+import org.minima.system.network.base.peer.LibP2PNodeId;
+import org.minima.system.network.base.gossip.config.GossipTopicScoringConfig;
+
+// import tech.pegasys.teku.networking.p2p.gossip.config.GossipConfig;
+// import tech.pegasys.teku.networking.p2p.gossip.config.GossipPeerScoringConfig;
+// import tech.pegasys.teku.networking.p2p.gossip.config.GossipScoringConfig;
+// import tech.pegasys.teku.networking.p2p.gossip.config.GossipTopicScoringConfig;
+//import tech.pegasys.teku.networking.p2p.libp2p.LibP2PNodeId;
+
+public class LibP2PParamsFactory {
+ public static GossipParams createGossipParams(final GossipConfig gossipConfig) {
+ return GossipParams.builder()
+ .D(gossipConfig.getD())
+ .DLow(gossipConfig.getDLow())
+ .DHigh(gossipConfig.getDHigh())
+ .DLazy(gossipConfig.getDLazy())
+ // Calculate dScore and dOut based on other params
+ .DScore(gossipConfig.getD() * 2 / 3)
+ .DOut(Math.min(gossipConfig.getD() / 2, Math.max(0, gossipConfig.getDLow()) - 1))
+ .fanoutTTL(gossipConfig.getFanoutTTL())
+ .gossipSize(gossipConfig.getAdvertise())
+ .gossipHistoryLength(gossipConfig.getHistory())
+ .heartbeatInterval(gossipConfig.getHeartbeatInterval())
+ .floodPublish(true)
+ .seenTTL(gossipConfig.getSeenTTL())
+ .maxPublishedMessages(1000)
+ .maxTopicsPerPublishedMessage(1)
+ .maxSubscriptions(200)
+ .maxGraftMessages(200)
+ .maxPruneMessages(200)
+ .maxPeersPerPruneMessage(1000)
+ .maxIHaveLength(5000)
+ .maxIWantMessageIds(5000)
+ .build();
+ }
+
+ public static GossipScoreParams createGossipScoreParams(final GossipScoringConfig config) {
+ return GossipScoreParams.builder()
+ .peerScoreParams(createPeerScoreParams(config.getPeerScoringConfig()))
+ .topicsScoreParams(createTopicsScoreParams(config))
+ .gossipThreshold(config.getGossipThreshold())
+ .publishThreshold(config.getPublishThreshold())
+ .graylistThreshold(config.getGraylistThreshold())
+ .acceptPXThreshold(config.getAcceptPXThreshold())
+ .opportunisticGraftThreshold(config.getOpportunisticGraftThreshold())
+ .build();
+ }
+
+ public static GossipPeerScoreParams createPeerScoreParams(final GossipPeerScoringConfig config) {
+ final GossipPeerScoreParamsBuilder builder =
+ GossipPeerScoreParams.builder()
+ .topicScoreCap(config.getTopicScoreCap())
+ .appSpecificWeight(config.getAppSpecificWeight())
+ .ipColocationFactorWeight(config.getIpColocationFactorWeight())
+ .ipColocationFactorThreshold(config.getIpColocationFactorThreshold())
+ .behaviourPenaltyWeight(config.getBehaviourPenaltyWeight())
+ .behaviourPenaltyDecay(config.getBehaviourPenaltyDecay())
+ .behaviourPenaltyThreshold(config.getBehaviourPenaltyThreshold())
+ .decayInterval(config.getDecayInterval())
+ .decayToZero(config.getDecayToZero())
+ .retainScore(config.getRetainScore());
+
+ // Configure optional params
+ config
+ .getAppSpecificScorer()
+ .ifPresent(
+ scorer -> {
+ final Function1 super PeerId, Double> appSpecificScore =
+ peerId -> scorer.scorePeer(new LibP2PNodeId(peerId));
+ builder.appSpecificScore(appSpecificScore);
+ });
+
+ config
+ .getDirectPeerManager()
+ .ifPresent(
+ mgr -> {
+ final Function1 super PeerId, Boolean> isDirectPeer =
+ peerId -> mgr.isDirectPeer(new LibP2PNodeId(peerId));
+ builder.isDirect(isDirectPeer);
+ });
+
+ config
+ .getWhitelistManager()
+ .ifPresent(
+ mgr -> {
+ // Ip whitelisting
+ final Function1 super String, Boolean> isIpWhitelisted = mgr::isWhitelisted;
+ builder.ipWhitelisted(isIpWhitelisted);
+ });
+
+ return builder.build();
+ }
+
+ public static GossipTopicsScoreParams createTopicsScoreParams(final GossipScoringConfig config) {
+ final GossipTopicScoreParams defaultTopicParams =
+ createTopicScoreParams(config.getDefaultTopicScoringConfig());
+ final Map topicParams =
+ config.getTopicScoringConfig().entrySet().stream()
+ .collect(
+ Collectors.toMap(Map.Entry::getKey, e -> createTopicScoreParams(e.getValue())));
+ return new GossipTopicsScoreParams(defaultTopicParams, topicParams);
+ }
+
+ public static GossipTopicScoreParams createTopicScoreParams(
+ final GossipTopicScoringConfig config) {
+ return GossipTopicScoreParams.builder()
+ .topicWeight(config.getTopicWeight())
+ .timeInMeshWeight(config.getTimeInMeshWeight())
+ .timeInMeshQuantum(config.getTimeInMeshQuantum())
+ .timeInMeshCap(config.getTimeInMeshCap())
+ .firstMessageDeliveriesWeight(config.getFirstMessageDeliveriesWeight())
+ .firstMessageDeliveriesDecay(config.getFirstMessageDeliveriesDecay())
+ .firstMessageDeliveriesCap(config.getFirstMessageDeliveriesCap())
+ .meshMessageDeliveriesWeight(config.getMeshMessageDeliveriesWeight())
+ .meshMessageDeliveriesDecay(config.getMeshMessageDeliveriesDecay())
+ .meshMessageDeliveriesThreshold(config.getMeshMessageDeliveriesThreshold())
+ .meshMessageDeliveriesCap(config.getMeshMessageDeliveriesCap())
+ .meshMessageDeliveriesActivation(config.getMeshMessageDeliveriesActivation())
+ .meshMessageDeliveryWindow(config.getMeshMessageDeliveryWindow())
+ .meshFailurePenaltyWeight(config.getMeshFailurePenaltyWeight())
+ .meshFailurePenaltyDecay(config.getMeshFailurePenaltyDecay())
+ .invalidMessageDeliveriesWeight(config.getInvalidMessageDeliveriesWeight())
+ .invalidMessageDeliveriesDecay(config.getInvalidMessageDeliveriesDecay())
+ .build();
+ }
+}
diff --git a/src/org/minima/system/network/base/LimitedMap.java b/src/org/minima/system/network/base/LimitedMap.java
new file mode 100644
index 000000000..d009a3b2e
--- /dev/null
+++ b/src/org/minima/system/network/base/LimitedMap.java
@@ -0,0 +1,59 @@
+/*
+ * Copyright 2020 ConsenSys AG.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+
+package org.minima.system.network.base;
+
+import com.google.common.cache.CacheBuilder;
+import java.util.Map;
+
+
+/** Helper that creates a map with a maximum capacity. */
+public final class LimitedMap {
+ private LimitedMap() {}
+
+ /**
+ * Creates a limited map. The returned map is safe for concurrent access and evicts the least
+ * recently used items.
+ *
+ * @param maxSize The maximum number of elements to keep in the map.
+ * @param The key type of the map.
+ * @param The value type of the map.
+ * @return A map that will evict elements when the max size is exceeded.
+ */
+ public static Map create(final int maxSize) {
+ return defaultBuilder(maxSize).build().asMap();
+ }
+
+ /**
+ * Creates a limited map using soft references for values. The returned map is safe for concurrent
+ * access and evicts the least recently used items.
+ *
+ *
Items may be evicted before maxSize is reached if the garbage collector needs to free up
+ * memory.
+ *
+ * @param maxSize The maximum number of elements to keep in the map.
+ * @param The key type of the map.
+ * @param The value type of the map.
+ * @return A map that will evict elements when the max size is exceeded or when the GC evicts
+ * them.
+ */
+ public static Map createSoft(final int maxSize) {
+ return defaultBuilder(maxSize).softValues().build().asMap();
+ }
+
+ private static CacheBuilder