Skip to content
Open
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion src/main/scala/encry/network/DeliveryManager.scala
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,6 @@ class DeliveryManager(influxRef: Option[ActorRef],

case OtherNodeSyncingStatus(remote, status, extOpt) =>
status match {
case Unknown => logger.info("Peer status is still unknown.")
case Younger | Fork if isBlockChainSynced => sendInvData(remote, status, extOpt)
case _ =>
}
Expand Down
18 changes: 12 additions & 6 deletions src/main/scala/encry/network/NodeViewSynchronizer.scala
Original file line number Diff line number Diff line change
Expand Up @@ -112,13 +112,19 @@ class NodeViewSynchronizer(influxRef: Option[ActorRef],
case DataFromPeer(message, remote) => message match {
case SyncInfoNetworkMessage(syncInfo) => Option(history) match {
case Some(historyReader) =>
val ext: Seq[ModifierId] = historyReader.continuationIds(syncInfo, settings.network.syncPacketLength)
val comparison: HistoryComparisonResult = historyReader.compare(syncInfo)
logger.info(s"Comparison with $remote having starting points ${idsToString(syncInfo.startingPoints)}. " +
s"Comparison result is $comparison. Sending extension of length ${ext.length}.")
if (!(ext.nonEmpty || comparison != Younger)) logger.warn("Extension is empty while comparison is younger")
deliveryManager ! OtherNodeSyncingStatus(remote, comparison, Some(ext.map(h => Header.modifierTypeId -> h)))
peersKeeper ! OtherNodeSyncingStatus(remote, comparison, Some(ext.map(h => Header.modifierTypeId -> h)))
peersKeeper ! OtherNodeSyncingStatus(remote, comparison, None)

comparison match {
case Younger | Fork =>
val ext: Seq[ModifierId] = historyReader.continuationIds(syncInfo, settings.network.syncPacketLength)
logger.info(s"Comparison with $remote having starting points ${idsToString(syncInfo.startingPoints)}. " +
s"Comparison result is $comparison. Sending extension of length ${ext.length}.")
if (ext.isEmpty && comparison == Younger) logger.warn("Extension is empty while comparison is younger")
deliveryManager ! OtherNodeSyncingStatus(remote, comparison, Some(ext.map(h => Header.modifierTypeId -> h)))
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why do you sending OtherNodeSyncingStatus only when compr result is Younger | Fork?

case Unknown => logger.info(s"Peer $remote status is still unknown.")
case _ =>
}
case _ =>
}
case RequestModifiersNetworkMessage((typeId, requestedIds)) if chainSynced || settings.node.offlineGeneration =>
Expand Down
2 changes: 1 addition & 1 deletion src/main/scala/encry/view/history/HistoryApi.scala
Original file line number Diff line number Diff line change
Expand Up @@ -223,7 +223,7 @@ trait HistoryApi extends HistoryDBApi { //scalastyle:ignore
}

def continuationIds(info: SyncInfo, size: Int): Seq[ModifierId] =
if (getBestHeaderId.isEmpty) info.startingPoints.map(_._2)
if (getBestHeaderId.isEmpty) Seq.empty
else if (info.lastHeaderIds.isEmpty) {
val heightFrom: Int = Math.min(getBestHeaderHeight, size - 1)
(for {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ class DeliveryManagerRequestModifiesSpec extends WordSpecLike with BeforeAndAfte
}

"RequestModifies" should {
"handle uniq modifiers from RequestFromLocal message correctly" in {
"handle unique modifiers from RequestFromLocal message correctly" in {
val (deliveryManager, cp1, _, _, _, headersIds, headersAsKey) = initialiseState()
val updatedPeersCollection: Map[InetSocketAddress, (ConnectedPeer, HistoryConsensus.Older.type, PeersPriorityStatus)] =
Map(cp1.socketAddress -> (cp1, Older, InitialPriority))
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,119 @@
package encry.network.DeliveryManagerTests

import java.net.InetSocketAddress

import akka.actor.ActorSystem
import akka.testkit.{TestActorRef, TestKit, TestProbe}
import encry.consensus.HistoryConsensus.{Fork, Unknown}
import encry.modifiers.InstanceFactory
import encry.network.DeliveryManager.FullBlockChainIsSynced
import encry.network.DeliveryManagerTests.DMUtils.{createPeer, generateBlocks, generateDummyHistory}
import encry.network.NetworkController.ReceivableMessages.DataFromPeer
import encry.network.{DeliveryManager, NodeViewSynchronizer}
import encry.network.NodeViewSynchronizer.ReceivableMessages.{ChangedHistory, RequestFromLocal, UpdatedHistory}
import encry.network.PeerConnectionHandler.{ConnectedPeer, Incoming}
import encry.network.PeersKeeper.UpdatedPeersCollection
import encry.network.PrioritiesCalculator.PeersPriorityStatus.PeersPriorityStatus.InitialPriority
import encry.settings.TestNetSettings
import encry.view.history.History
import encry.view.mempool.MemoryPool.StopTransactionsValidation
import org.encryfoundation.common.modifiers.history.{Block, Header}
import org.encryfoundation.common.modifiers.mempool.transaction.Transaction
import org.encryfoundation.common.network.BasicMessagesRepo.{Handshake, InvNetworkMessage, ModifiersNetworkMessage, RequestModifiersNetworkMessage, SyncInfoNetworkMessage}
import org.encryfoundation.common.network.SyncInfo
import org.encryfoundation.common.utils.TaggedTypes.ModifierId
import org.scalatest.mockito.MockitoSugar
import org.scalatest.{BeforeAndAfterAll, Matchers, OneInstancePerTest, WordSpecLike}

class DeliveryManagerSpec extends TestKit(ActorSystem("RequestModifiersSpec"))
with WordSpecLike
with BeforeAndAfterAll
with MockitoSugar
with Matchers
with InstanceFactory
with OneInstancePerTest
with TestNetSettings {

override def afterAll: Unit = {
TestKit.shutdownActorSystem(system)
}

"DeliveryManager" should {
"correctly request modifiers from forked node" in {

val nvs: TestActorRef[NodeViewSynchronizer] =
TestActorRef[NodeViewSynchronizer](NodeViewSynchronizer.props(None, TestProbe().ref, settings, TestProbe().ref, TestProbe().ref))

val address = new InetSocketAddress("123.123.123.124", 9001)
val handler: TestProbe = TestProbe()
val cp: ConnectedPeer = ConnectedPeer(address, handler.ref, Incoming,
Handshake(protocolToBytes(testNetSettings.network.appVersion),
"123.123.123.124", Some(address), System.currentTimeMillis()))

val initHistory = generateDummyHistory(testNetSettings)
val otherHistory = generateDummyHistory(testNetSettings)

val (_, blocks) = generateBlocks(100, generateDummyHistory(testNetSettings))

val ourHistory = blocks.foldLeft(initHistory) { case (hst, block) =>
hst.append(block.header)
hst.append(block.payload)
hst.reportModifierIsValid(block)
}

val commonHistory = blocks.take(60).foldLeft(otherHistory) { case (hst, block) =>
hst.append(block.header)
hst.append(block.payload)
hst.reportModifierIsValid(block)
}

val (forkedHistory, forkedBlocks) = generateBlocks(40, commonHistory)

val syncInfo = SyncInfo((blocks.slice(50, 60) ++ forkedBlocks).map(_.id))

nvs ! ChangedHistory(ourHistory)

nvs ! FullBlockChainIsSynced

nvs ! UpdatedPeersCollection(Map(address -> (cp, Fork, InitialPriority)))

nvs ! DataFromPeer(SyncInfoNetworkMessage(syncInfo), cp)

handler.expectMsg(InvNetworkMessage(Header.modifierTypeId -> blocks.drop(60).map(_.id)))

val idsForLocalRequest = forkedBlocks.map(_.id).filterNot(ourHistory.isModifierDefined)

nvs ! RequestFromLocal(cp, Header.modifierTypeId, idsForLocalRequest)

handler.expectMsg(RequestModifiersNetworkMessage(Header.modifierTypeId -> idsForLocalRequest))
nvs.stop()
}

"not process transactions if number of transactions in mempool exceeds limit" in {

val downloadedModifiersValidator = TestProbe()
val address = new InetSocketAddress("123.123.123.124", 9001)
val handler: TestProbe = TestProbe()
val cp: ConnectedPeer = ConnectedPeer(address, handler.ref, Incoming,
Handshake(protocolToBytes(testNetSettings.network.appVersion),
"123.123.123.124", Some(address), System.currentTimeMillis()))

val deliveryManager: TestActorRef[DeliveryManager] =
TestActorRef[DeliveryManager](DeliveryManager
.props(None, TestProbe().ref, TestProbe().ref, TestProbe().ref, TestProbe().ref, downloadedModifiersValidator.ref, settings))

val history: History = generateDummyHistory(settings)

deliveryManager ! UpdatedHistory(history)
deliveryManager ! StopTransactionsValidation

val txs: Map[ModifierId, Array[Byte]] = genValidPaymentTxs(100).groupBy(_.id).mapValues(_.head.bytes)
val msg = ModifiersNetworkMessage(Transaction.modifierTypeId, txs)

deliveryManager ! DataFromPeer(msg, cp)
downloadedModifiersValidator.expectNoMsg()
deliveryManager.stop()
}
}

}
73 changes: 73 additions & 0 deletions src/test/scala/encry/view/history/ContinuationIdsTest.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
package encry.view.history

import encry.modifiers.InstanceFactory
import encry.network.DeliveryManagerTests.DMUtils.generateBlocks
import encry.settings.TestNetSettings
import org.encryfoundation.common.modifiers.history.Block
import org.encryfoundation.common.network.SyncInfo
import org.scalatest.{Matchers, OneInstancePerTest, WordSpecLike}

class ContinuationIdsTest extends WordSpecLike
with Matchers
with InstanceFactory
with OneInstancePerTest
with TestNetSettings {

"History Reader" should {

"correctly compute continuation ids with empty history" in {
val history: History = generateDummyHistory(testNetSettings)
val blocks: List[Block] = generateBlocks(100, generateDummyHistory(testNetSettings))._2
val syncInfo: SyncInfo = SyncInfo(blocks.map(_.header.id))

val ids = history.continuationIds(syncInfo, 100)
ids shouldBe Seq.empty
}

"correctly compute continuation ids for empty SyncInfo" in {
val history: History = generateDummyHistory(testNetSettings)
val blocks: List[Block] = generateBlocks(100, generateDummyHistory(testNetSettings))._2
val syncInfo: SyncInfo = SyncInfo(Seq.empty)

val updatedHistory: History = blocks.foldLeft(history) { case (hst, block) =>
hst.append(block.header)
hst.append(block.payload)
hst.reportModifierIsValid(block)
}

val ids = updatedHistory.continuationIds(syncInfo, 100)
ids shouldBe blocks.map(_.header.id)
}

"correctly compute continuation ids if our best height is higher than others best height" in {
val history: History = generateDummyHistory(testNetSettings)
val blocks: List[Block] = generateBlocks(100, generateDummyHistory(testNetSettings))._2
val syncInfo: SyncInfo = SyncInfo(blocks.take(30).map(_.header.id))

val updatedHistory: History = blocks.foldLeft(history) { case (hst, block) =>
hst.append(block.header)
hst.append(block.payload)
hst.reportModifierIsValid(block)
}

val ids = updatedHistory.continuationIds(syncInfo, 100)
ids shouldBe blocks.map(_.header.id).drop(30)
}

"correctly compute continuation ids if others best height is higher than our best height" in {
val history: History = generateDummyHistory(testNetSettings)
val blocks: List[Block] = generateBlocks(100, generateDummyHistory(testNetSettings))._2
val syncInfo: SyncInfo = SyncInfo(blocks.map(_.header.id))

val updatedHistory: History = blocks.take(30).foldLeft(history) { case (hst, block) =>
hst.append(block.header)
hst.append(block.payload)
hst.reportModifierIsValid(block)
}

val ids = updatedHistory.continuationIds(syncInfo, 100)
ids shouldBe blocks.map(_.header.id).take(30)
}

}
}