From 8445bd1e324fef60acfdae6ba92eaebf25f378b0 Mon Sep 17 00:00:00 2001 From: GRYE Date: Fri, 8 Nov 2019 15:43:56 +0300 Subject: [PATCH 1/6] Small NVS refactoring --- .../scala/encry/network/DeliveryManager.scala | 1 - .../encry/network/NodeViewSynchronizer.scala | 18 ++++++++++++------ 2 files changed, 12 insertions(+), 7 deletions(-) diff --git a/src/main/scala/encry/network/DeliveryManager.scala b/src/main/scala/encry/network/DeliveryManager.scala index fbe8380f6a..97fbc562cc 100644 --- a/src/main/scala/encry/network/DeliveryManager.scala +++ b/src/main/scala/encry/network/DeliveryManager.scala @@ -115,7 +115,6 @@ class DeliveryManager(influxRef: Option[ActorRef], case OtherNodeSyncingStatus(remote, status, extOpt) => status match { - case Unknown => logger.info("Peer status is still unknown.") case Younger | Fork if isBlockChainSynced => sendInvData(remote, status, extOpt) case _ => } diff --git a/src/main/scala/encry/network/NodeViewSynchronizer.scala b/src/main/scala/encry/network/NodeViewSynchronizer.scala index 7a1e0dd2a0..e1ceb5c9a1 100644 --- a/src/main/scala/encry/network/NodeViewSynchronizer.scala +++ b/src/main/scala/encry/network/NodeViewSynchronizer.scala @@ -112,13 +112,19 @@ class NodeViewSynchronizer(influxRef: Option[ActorRef], case DataFromPeer(message, remote) => message match { case SyncInfoNetworkMessage(syncInfo) => Option(history) match { case Some(historyReader) => - val ext: Seq[ModifierId] = historyReader.continuationIds(syncInfo, settings.network.syncPacketLength) val comparison: HistoryComparisonResult = historyReader.compare(syncInfo) - logger.info(s"Comparison with $remote having starting points ${idsToString(syncInfo.startingPoints)}. " + - s"Comparison result is $comparison. Sending extension of length ${ext.length}.") - if (!(ext.nonEmpty || comparison != Younger)) logger.warn("Extension is empty while comparison is younger") - deliveryManager ! OtherNodeSyncingStatus(remote, comparison, Some(ext.map(h => Header.modifierTypeId -> h))) - peersKeeper ! OtherNodeSyncingStatus(remote, comparison, Some(ext.map(h => Header.modifierTypeId -> h))) + peersKeeper ! OtherNodeSyncingStatus(remote, comparison, None) + + comparison match { + case Younger | Fork => + val ext: Seq[ModifierId] = historyReader.continuationIds(syncInfo, settings.network.syncPacketLength) + logger.info(s"Comparison with $remote having starting points ${idsToString(syncInfo.startingPoints)}. " + + s"Comparison result is $comparison. Sending extension of length ${ext.length}.") + if (ext.isEmpty && comparison == Younger) logger.warn("Extension is empty while comparison is younger") + deliveryManager ! OtherNodeSyncingStatus(remote, comparison, Some(ext.map(h => Header.modifierTypeId -> h))) + case Unknown => logger.info(s"Peer $remote status is still unknown.") + case _ => + } case _ => } case RequestModifiersNetworkMessage((typeId, requestedIds)) if chainSynced || settings.node.offlineGeneration => From 6e7fd2f38fcf3dd4e8aa53b61d9c913ccb6a9dc7 Mon Sep 17 00:00:00 2001 From: GRYE Date: Fri, 8 Nov 2019 17:22:31 +0300 Subject: [PATCH 2/6] Additional tests for History --- .../scala/encry/view/history/HistoryApi.scala | 2 +- .../view/history/ContinuationIdsTest.scala | 73 +++++++++++++++++++ 2 files changed, 74 insertions(+), 1 deletion(-) create mode 100644 src/test/scala/encry/view/history/ContinuationIdsTest.scala diff --git a/src/main/scala/encry/view/history/HistoryApi.scala b/src/main/scala/encry/view/history/HistoryApi.scala index 1c48406a14..c8df17e355 100644 --- a/src/main/scala/encry/view/history/HistoryApi.scala +++ b/src/main/scala/encry/view/history/HistoryApi.scala @@ -223,7 +223,7 @@ trait HistoryApi extends HistoryDBApi { //scalastyle:ignore } def continuationIds(info: SyncInfo, size: Int): Seq[ModifierId] = - if (getBestHeaderId.isEmpty) info.startingPoints.map(_._2) + if (getBestHeaderId.isEmpty) Seq.empty else if (info.lastHeaderIds.isEmpty) { val heightFrom: Int = Math.min(getBestHeaderHeight, size - 1) (for { diff --git a/src/test/scala/encry/view/history/ContinuationIdsTest.scala b/src/test/scala/encry/view/history/ContinuationIdsTest.scala new file mode 100644 index 0000000000..9fd3e41d0f --- /dev/null +++ b/src/test/scala/encry/view/history/ContinuationIdsTest.scala @@ -0,0 +1,73 @@ +package encry.view.history + +import encry.modifiers.InstanceFactory +import encry.network.DeliveryManagerTests.DMUtils.generateBlocks +import encry.settings.TestNetSettings +import org.encryfoundation.common.modifiers.history.Block +import org.encryfoundation.common.network.SyncInfo +import org.scalatest.{Matchers, OneInstancePerTest, WordSpecLike} + +class ContinuationIdsTest extends WordSpecLike + with Matchers + with InstanceFactory + with OneInstancePerTest + with TestNetSettings { + + "History Reader" should { + + "correctly compute continuation ids with empty history" in { + val history: History = generateDummyHistory(testNetSettings) + val blocks: List[Block] = generateBlocks(100, generateDummyHistory(testNetSettings))._2 + val syncInfo: SyncInfo = SyncInfo(blocks.map(_.header.id)) + + val ids = history.continuationIds(syncInfo, 100) + ids shouldBe Seq.empty + } + + "correctly compute continuation ids for empty SyncInfo" in { + val history: History = generateDummyHistory(testNetSettings) + val blocks: List[Block] = generateBlocks(100, generateDummyHistory(testNetSettings))._2 + val syncInfo: SyncInfo = SyncInfo(Seq.empty) + + val updatedHistory: History = blocks.foldLeft(history) { case (hst, block) => + hst.append(block.header) + hst.append(block.payload) + hst.reportModifierIsValid(block) + } + + val ids = updatedHistory.continuationIds(syncInfo, 100) + ids shouldBe blocks.map(_.header.id) + } + + "correctly compute continuation ids if our best height is higher than others best height" in { + val history: History = generateDummyHistory(testNetSettings) + val blocks: List[Block] = generateBlocks(100, generateDummyHistory(testNetSettings))._2 + val syncInfo: SyncInfo = SyncInfo(blocks.take(30).map(_.header.id)) + + val updatedHistory: History = blocks.foldLeft(history) { case (hst, block) => + hst.append(block.header) + hst.append(block.payload) + hst.reportModifierIsValid(block) + } + + val ids = updatedHistory.continuationIds(syncInfo, 100) + ids shouldBe blocks.map(_.header.id).drop(30) + } + + "correctly compute continuation ids if others best height is higher than our best height" in { + val history: History = generateDummyHistory(testNetSettings) + val blocks: List[Block] = generateBlocks(100, generateDummyHistory(testNetSettings))._2 + val syncInfo: SyncInfo = SyncInfo(blocks.map(_.header.id)) + + val updatedHistory: History = blocks.take(30).foldLeft(history) { case (hst, block) => + hst.append(block.header) + hst.append(block.payload) + hst.reportModifierIsValid(block) + } + + val ids = updatedHistory.continuationIds(syncInfo, 100) + ids shouldBe blocks.map(_.header.id).take(30) + } + + } +} From daca8688730a5bf412685231eb08be3d2c97a311 Mon Sep 17 00:00:00 2001 From: GRYE Date: Mon, 11 Nov 2019 14:47:14 +0300 Subject: [PATCH 3/6] Additional tests for DM --- .../DeliveryManagerRequestModifiesSpec.scala | 2 +- .../RequestModifiersSpec.scala | 89 +++++++++++++++++++ 2 files changed, 90 insertions(+), 1 deletion(-) create mode 100644 src/test/scala/encry/network/DeliveryManagerTests/RequestModifiersSpec.scala diff --git a/src/test/scala/encry/network/DeliveryManagerTests/DeliveryManagerRequestModifiesSpec.scala b/src/test/scala/encry/network/DeliveryManagerTests/DeliveryManagerRequestModifiesSpec.scala index 190f11c8d4..2e96bba6f6 100644 --- a/src/test/scala/encry/network/DeliveryManagerTests/DeliveryManagerRequestModifiesSpec.scala +++ b/src/test/scala/encry/network/DeliveryManagerTests/DeliveryManagerRequestModifiesSpec.scala @@ -50,7 +50,7 @@ class DeliveryManagerRequestModifiesSpec extends WordSpecLike with BeforeAndAfte } "RequestModifies" should { - "handle uniq modifiers from RequestFromLocal message correctly" in { + "handle unique modifiers from RequestFromLocal message correctly" in { val (deliveryManager, cp1, _, _, _, headersIds, headersAsKey) = initialiseState() val updatedPeersCollection: Map[InetSocketAddress, (ConnectedPeer, HistoryConsensus.Older.type, PeersPriorityStatus)] = Map(cp1.socketAddress -> (cp1, Older, InitialPriority)) diff --git a/src/test/scala/encry/network/DeliveryManagerTests/RequestModifiersSpec.scala b/src/test/scala/encry/network/DeliveryManagerTests/RequestModifiersSpec.scala new file mode 100644 index 0000000000..cc0f398508 --- /dev/null +++ b/src/test/scala/encry/network/DeliveryManagerTests/RequestModifiersSpec.scala @@ -0,0 +1,89 @@ +package encry.network.DeliveryManagerTests + +import java.net.InetSocketAddress + +import akka.actor.ActorSystem +import akka.testkit.{TestActorRef, TestKit, TestProbe} +import encry.consensus.HistoryConsensus.{Fork, Unknown} +import encry.modifiers.InstanceFactory +import encry.network.DeliveryManager.FullBlockChainIsSynced +import encry.network.DeliveryManagerTests.DMUtils.{createPeer, generateBlocks} +import encry.network.NetworkController.ReceivableMessages.DataFromPeer +import encry.network.NodeViewSynchronizer +import encry.network.NodeViewSynchronizer.ReceivableMessages.{ChangedHistory, RequestFromLocal} +import encry.network.PeerConnectionHandler.{ConnectedPeer, Incoming} +import encry.network.PeersKeeper.UpdatedPeersCollection +import encry.network.PrioritiesCalculator.PeersPriorityStatus.PeersPriorityStatus.InitialPriority +import encry.settings.TestNetSettings +import encry.view.history.History +import org.encryfoundation.common.modifiers.history.{Block, Header} +import org.encryfoundation.common.network.BasicMessagesRepo.{Handshake, InvNetworkMessage, RequestModifiersNetworkMessage, SyncInfoNetworkMessage} +import org.encryfoundation.common.network.SyncInfo +import org.scalatest.mockito.MockitoSugar +import org.scalatest.{BeforeAndAfterAll, Matchers, OneInstancePerTest, WordSpecLike} + +class RequestModifiersSpec extends TestKit(ActorSystem("RequestModifiersSpec")) + with WordSpecLike + with BeforeAndAfterAll + with MockitoSugar + with Matchers + with InstanceFactory + with OneInstancePerTest + with TestNetSettings { + + override def afterAll: Unit = { + TestKit.shutdownActorSystem(system) + } + + "NodeViewSynchronizer" should { + "correctly request modifiers from forked node" in { + + val nvs: TestActorRef[NodeViewSynchronizer] = + TestActorRef[NodeViewSynchronizer](NodeViewSynchronizer.props(None, TestProbe().ref, settings, TestProbe().ref, TestProbe().ref)) + + val address = new InetSocketAddress("123.123.123.124", 9001) + val handler: TestProbe = TestProbe() + val cp: ConnectedPeer = ConnectedPeer(address, handler.ref, Incoming, + Handshake(protocolToBytes(testNetSettings.network.appVersion), + "123.123.123.124", Some(address), System.currentTimeMillis())) + + val initHistory = generateDummyHistory(testNetSettings) + val otherHistory = generateDummyHistory(testNetSettings) + + val (_, blocks) = generateBlocks(100, generateDummyHistory(testNetSettings)) + + val ourHistory = blocks.foldLeft(initHistory) { case (hst, block) => + hst.append(block.header) + hst.append(block.payload) + hst.reportModifierIsValid(block) + } + + val commonHistory = blocks.take(60).foldLeft(otherHistory) { case (hst, block) => + hst.append(block.header) + hst.append(block.payload) + hst.reportModifierIsValid(block) + } + + val (forkedHistory, forkedBlocks) = generateBlocks(40, commonHistory) + + val syncInfo = SyncInfo((blocks.slice(50, 60) ++ forkedBlocks).map(_.id)) + + nvs ! ChangedHistory(ourHistory) + + nvs ! FullBlockChainIsSynced + + nvs ! UpdatedPeersCollection(Map(address -> (cp, Fork, InitialPriority))) + + nvs ! DataFromPeer(SyncInfoNetworkMessage(syncInfo), cp) + + handler.expectMsg(InvNetworkMessage(Header.modifierTypeId -> blocks.drop(60).map(_.id))) + + val idsForLocalRequest = forkedBlocks.map(_.id).filterNot(ourHistory.isModifierDefined) + + nvs ! RequestFromLocal(cp, Header.modifierTypeId, idsForLocalRequest) + + handler.expectMsg(RequestModifiersNetworkMessage(Header.modifierTypeId -> idsForLocalRequest)) + } + } + +} From 78cc9e8a589b16c10916826f61a7ddb9d37c9aac Mon Sep 17 00:00:00 2001 From: GRYE Date: Mon, 11 Nov 2019 16:52:19 +0300 Subject: [PATCH 4/6] Additional tests for DM --- ...rsSpec.scala => DeliveryManagerSpec.scala} | 40 ++++++++++++++++--- 1 file changed, 34 insertions(+), 6 deletions(-) rename src/test/scala/encry/network/DeliveryManagerTests/{RequestModifiersSpec.scala => DeliveryManagerSpec.scala} (65%) diff --git a/src/test/scala/encry/network/DeliveryManagerTests/RequestModifiersSpec.scala b/src/test/scala/encry/network/DeliveryManagerTests/DeliveryManagerSpec.scala similarity index 65% rename from src/test/scala/encry/network/DeliveryManagerTests/RequestModifiersSpec.scala rename to src/test/scala/encry/network/DeliveryManagerTests/DeliveryManagerSpec.scala index cc0f398508..4eca23c72c 100644 --- a/src/test/scala/encry/network/DeliveryManagerTests/RequestModifiersSpec.scala +++ b/src/test/scala/encry/network/DeliveryManagerTests/DeliveryManagerSpec.scala @@ -7,22 +7,25 @@ import akka.testkit.{TestActorRef, TestKit, TestProbe} import encry.consensus.HistoryConsensus.{Fork, Unknown} import encry.modifiers.InstanceFactory import encry.network.DeliveryManager.FullBlockChainIsSynced -import encry.network.DeliveryManagerTests.DMUtils.{createPeer, generateBlocks} +import encry.network.DeliveryManagerTests.DMUtils.{createPeer, generateBlocks, generateDummyHistory} import encry.network.NetworkController.ReceivableMessages.DataFromPeer -import encry.network.NodeViewSynchronizer -import encry.network.NodeViewSynchronizer.ReceivableMessages.{ChangedHistory, RequestFromLocal} +import encry.network.{DeliveryManager, NodeViewSynchronizer} +import encry.network.NodeViewSynchronizer.ReceivableMessages.{ChangedHistory, RequestFromLocal, UpdatedHistory} import encry.network.PeerConnectionHandler.{ConnectedPeer, Incoming} import encry.network.PeersKeeper.UpdatedPeersCollection import encry.network.PrioritiesCalculator.PeersPriorityStatus.PeersPriorityStatus.InitialPriority import encry.settings.TestNetSettings import encry.view.history.History +import encry.view.mempool.MemoryPool.StopTransactionsValidation import org.encryfoundation.common.modifiers.history.{Block, Header} -import org.encryfoundation.common.network.BasicMessagesRepo.{Handshake, InvNetworkMessage, RequestModifiersNetworkMessage, SyncInfoNetworkMessage} +import org.encryfoundation.common.modifiers.mempool.transaction.Transaction +import org.encryfoundation.common.network.BasicMessagesRepo.{Handshake, InvNetworkMessage, ModifiersNetworkMessage, RequestModifiersNetworkMessage, SyncInfoNetworkMessage} import org.encryfoundation.common.network.SyncInfo +import org.encryfoundation.common.utils.TaggedTypes.ModifierId import org.scalatest.mockito.MockitoSugar import org.scalatest.{BeforeAndAfterAll, Matchers, OneInstancePerTest, WordSpecLike} -class RequestModifiersSpec extends TestKit(ActorSystem("RequestModifiersSpec")) +class DeliveryManagerSpec extends TestKit(ActorSystem("RequestModifiersSpec")) with WordSpecLike with BeforeAndAfterAll with MockitoSugar @@ -35,7 +38,7 @@ class RequestModifiersSpec extends TestKit(ActorSystem("RequestModifiersSpec")) TestKit.shutdownActorSystem(system) } - "NodeViewSynchronizer" should { + "DeliveryManager" should { "correctly request modifiers from forked node" in { val nvs: TestActorRef[NodeViewSynchronizer] = @@ -84,6 +87,31 @@ class RequestModifiersSpec extends TestKit(ActorSystem("RequestModifiersSpec")) handler.expectMsg(RequestModifiersNetworkMessage(Header.modifierTypeId -> idsForLocalRequest)) } + + "not process transactions if number of transactions in mempool exceeds limit" in { + + val downloadedModifiersValidator = TestProbe() + val address = new InetSocketAddress("123.123.123.124", 9001) + val handler: TestProbe = TestProbe() + val cp: ConnectedPeer = ConnectedPeer(address, handler.ref, Incoming, + Handshake(protocolToBytes(testNetSettings.network.appVersion), + "123.123.123.124", Some(address), System.currentTimeMillis())) + + val deliveryManager: TestActorRef[DeliveryManager] = + TestActorRef[DeliveryManager](DeliveryManager + .props(None, TestProbe().ref, TestProbe().ref, TestProbe().ref, TestProbe().ref, downloadedModifiersValidator.ref, settings)) + + val history: History = generateDummyHistory(settings) + + deliveryManager ! UpdatedHistory(history) + deliveryManager ! StopTransactionsValidation + + val txs: Map[ModifierId, Array[Byte]] = genValidPaymentTxs(100).groupBy(_.id).mapValues(_.head.bytes) + val msg = ModifiersNetworkMessage(Transaction.modifierTypeId, txs) + + deliveryManager ! DataFromPeer(msg, cp) + downloadedModifiersValidator.expectNoMsg() + } } } From b2973b37f75c1cd22d70cfe64c25053f61bd0291 Mon Sep 17 00:00:00 2001 From: GRYE Date: Mon, 11 Nov 2019 17:53:44 +0300 Subject: [PATCH 5/6] Additional tests for DM --- .../network/DeliveryManagerTests/DeliveryManagerSpec.scala | 2 ++ 1 file changed, 2 insertions(+) diff --git a/src/test/scala/encry/network/DeliveryManagerTests/DeliveryManagerSpec.scala b/src/test/scala/encry/network/DeliveryManagerTests/DeliveryManagerSpec.scala index 4eca23c72c..6d5be27377 100644 --- a/src/test/scala/encry/network/DeliveryManagerTests/DeliveryManagerSpec.scala +++ b/src/test/scala/encry/network/DeliveryManagerTests/DeliveryManagerSpec.scala @@ -86,6 +86,7 @@ class DeliveryManagerSpec extends TestKit(ActorSystem("RequestModifiersSpec")) nvs ! RequestFromLocal(cp, Header.modifierTypeId, idsForLocalRequest) handler.expectMsg(RequestModifiersNetworkMessage(Header.modifierTypeId -> idsForLocalRequest)) + nvs.stop() } "not process transactions if number of transactions in mempool exceeds limit" in { @@ -111,6 +112,7 @@ class DeliveryManagerSpec extends TestKit(ActorSystem("RequestModifiersSpec")) deliveryManager ! DataFromPeer(msg, cp) downloadedModifiersValidator.expectNoMsg() + deliveryManager.stop() } } From 5b4932e5422bdeb8fa3b8e99ca2d377a6af7e49e Mon Sep 17 00:00:00 2001 From: GRYE Date: Mon, 18 Nov 2019 10:47:11 +0300 Subject: [PATCH 6/6] Small refactoring --- .../scala/encry/network/DeliveryManager.scala | 1 + .../encry/network/NodeViewSynchronizer.scala | 15 ++++++++------- 2 files changed, 9 insertions(+), 7 deletions(-) diff --git a/src/main/scala/encry/network/DeliveryManager.scala b/src/main/scala/encry/network/DeliveryManager.scala index 97fbc562cc..fbe8380f6a 100644 --- a/src/main/scala/encry/network/DeliveryManager.scala +++ b/src/main/scala/encry/network/DeliveryManager.scala @@ -115,6 +115,7 @@ class DeliveryManager(influxRef: Option[ActorRef], case OtherNodeSyncingStatus(remote, status, extOpt) => status match { + case Unknown => logger.info("Peer status is still unknown.") case Younger | Fork if isBlockChainSynced => sendInvData(remote, status, extOpt) case _ => } diff --git a/src/main/scala/encry/network/NodeViewSynchronizer.scala b/src/main/scala/encry/network/NodeViewSynchronizer.scala index a07a06c716..f081971ce7 100644 --- a/src/main/scala/encry/network/NodeViewSynchronizer.scala +++ b/src/main/scala/encry/network/NodeViewSynchronizer.scala @@ -113,20 +113,21 @@ class NodeViewSynchronizer(influxRef: Option[ActorRef], case SyncInfoNetworkMessage(syncInfo) => Option(history) match { case Some(historyReader) => val comparison: HistoryComparisonResult = historyReader.compare(syncInfo) - peersKeeper ! OtherNodeSyncingStatus(remote, comparison, None) - - comparison match { + val extOpt = comparison match { case Younger | Fork => val ext: Seq[ModifierId] = historyReader.continuationIds(syncInfo, settings.network.syncPacketLength) logger.info(s"Comparison with $remote having starting points ${idsToString(syncInfo.startingPoints)}. " + s"Comparison result is $comparison. Sending extension of length ${ext.length}.") if (ext.isEmpty && comparison == Younger) logger.warn("Extension is empty while comparison is younger") - deliveryManager ! OtherNodeSyncingStatus(remote, comparison, Some(ext.map(h => Header.modifierTypeId -> h))) - case Unknown => logger.info(s"Peer $remote status is still unknown.") - case _ => + Some(ext.map(h => Header.modifierTypeId -> h)) + case _ => None } - case _ => + + peersKeeper ! OtherNodeSyncingStatus(remote, comparison, None) + deliveryManager ! OtherNodeSyncingStatus(remote, comparison, extOpt) + } + case RequestModifiersNetworkMessage((typeId, requestedIds)) if chainSynced || settings.node.offlineGeneration => val modifiersFromCache: Map[ModifierId, Array[Byte]] = requestedIds .flatMap(id => modifiersRequestCache