Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 14 additions & 7 deletions src/main/scala/encry/network/NodeViewSynchronizer.scala
Original file line number Diff line number Diff line change
Expand Up @@ -112,15 +112,22 @@ class NodeViewSynchronizer(influxRef: Option[ActorRef],
case DataFromPeer(message, remote) => message match {
case SyncInfoNetworkMessage(syncInfo) => Option(history) match {
case Some(historyReader) =>
val ext: Seq[ModifierId] = historyReader.continuationIds(syncInfo, settings.network.syncPacketLength)
val comparison: HistoryComparisonResult = historyReader.compare(syncInfo)
logger.info(s"Comparison with $remote having starting points ${idsToString(syncInfo.startingPoints)}. " +
s"Comparison result is $comparison. Sending extension of length ${ext.length}.")
if (!(ext.nonEmpty || comparison != Younger)) logger.warn("Extension is empty while comparison is younger")
deliveryManager ! OtherNodeSyncingStatus(remote, comparison, Some(ext.map(h => Header.modifierTypeId -> h)))
peersKeeper ! OtherNodeSyncingStatus(remote, comparison, Some(ext.map(h => Header.modifierTypeId -> h)))
case _ =>
val extOpt = comparison match {
case Younger | Fork =>
val ext: Seq[ModifierId] = historyReader.continuationIds(syncInfo, settings.network.syncPacketLength)
logger.info(s"Comparison with $remote having starting points ${idsToString(syncInfo.startingPoints)}. " +
s"Comparison result is $comparison. Sending extension of length ${ext.length}.")
if (ext.isEmpty && comparison == Younger) logger.warn("Extension is empty while comparison is younger")
Some(ext.map(h => Header.modifierTypeId -> h))
case _ => None
}

peersKeeper ! OtherNodeSyncingStatus(remote, comparison, None)
deliveryManager ! OtherNodeSyncingStatus(remote, comparison, extOpt)

}

case RequestModifiersNetworkMessage((typeId, requestedIds)) if chainSynced || settings.node.offlineGeneration =>
val modifiersFromCache: Map[ModifierId, Array[Byte]] = requestedIds
.flatMap(id => modifiersRequestCache
Expand Down
2 changes: 1 addition & 1 deletion src/main/scala/encry/view/history/HistoryApi.scala
Original file line number Diff line number Diff line change
Expand Up @@ -223,7 +223,7 @@ trait HistoryApi extends HistoryDBApi { //scalastyle:ignore
}

def continuationIds(info: SyncInfo, size: Int): Seq[ModifierId] =
if (getBestHeaderId.isEmpty) info.startingPoints.map(_._2)
if (getBestHeaderId.isEmpty) Seq.empty
else if (info.lastHeaderIds.isEmpty) {
val heightFrom: Int = Math.min(getBestHeaderHeight, size - 1)
(for {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ class DeliveryManagerRequestModifiesSpec extends WordSpecLike with BeforeAndAfte
}

"RequestModifies" should {
"handle uniq modifiers from RequestFromLocal message correctly" in {
"handle unique modifiers from RequestFromLocal message correctly" in {
val (deliveryManager, cp1, _, _, _, headersIds, headersAsKey) = initialiseState()
val updatedPeersCollection: Map[InetSocketAddress, (ConnectedPeer, HistoryConsensus.Older.type, PeersPriorityStatus)] =
Map(cp1.socketAddress -> (cp1, Older, InitialPriority))
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,119 @@
package encry.network.DeliveryManagerTests

import java.net.InetSocketAddress

import akka.actor.ActorSystem
import akka.testkit.{TestActorRef, TestKit, TestProbe}
import encry.consensus.HistoryConsensus.{Fork, Unknown}
import encry.modifiers.InstanceFactory
import encry.network.DeliveryManager.FullBlockChainIsSynced
import encry.network.DeliveryManagerTests.DMUtils.{createPeer, generateBlocks, generateDummyHistory}
import encry.network.NetworkController.ReceivableMessages.DataFromPeer
import encry.network.{DeliveryManager, NodeViewSynchronizer}
import encry.network.NodeViewSynchronizer.ReceivableMessages.{ChangedHistory, RequestFromLocal, UpdatedHistory}
import encry.network.PeerConnectionHandler.{ConnectedPeer, Incoming}
import encry.network.PeersKeeper.UpdatedPeersCollection
import encry.network.PrioritiesCalculator.PeersPriorityStatus.PeersPriorityStatus.InitialPriority
import encry.settings.TestNetSettings
import encry.view.history.History
import encry.view.mempool.MemoryPool.StopTransactionsValidation
import org.encryfoundation.common.modifiers.history.{Block, Header}
import org.encryfoundation.common.modifiers.mempool.transaction.Transaction
import org.encryfoundation.common.network.BasicMessagesRepo.{Handshake, InvNetworkMessage, ModifiersNetworkMessage, RequestModifiersNetworkMessage, SyncInfoNetworkMessage}
import org.encryfoundation.common.network.SyncInfo
import org.encryfoundation.common.utils.TaggedTypes.ModifierId
import org.scalatest.mockito.MockitoSugar
import org.scalatest.{BeforeAndAfterAll, Matchers, OneInstancePerTest, WordSpecLike}

class DeliveryManagerSpec extends TestKit(ActorSystem("RequestModifiersSpec"))
with WordSpecLike
with BeforeAndAfterAll
with MockitoSugar
with Matchers
with InstanceFactory
with OneInstancePerTest
with TestNetSettings {

override def afterAll: Unit = {
TestKit.shutdownActorSystem(system)
}

"DeliveryManager" should {
"correctly request modifiers from forked node" in {

val nvs: TestActorRef[NodeViewSynchronizer] =
TestActorRef[NodeViewSynchronizer](NodeViewSynchronizer.props(None, TestProbe().ref, settings, TestProbe().ref, TestProbe().ref))

val address = new InetSocketAddress("123.123.123.124", 9001)
val handler: TestProbe = TestProbe()
val cp: ConnectedPeer = ConnectedPeer(address, handler.ref, Incoming,
Handshake(protocolToBytes(testNetSettings.network.appVersion),
"123.123.123.124", Some(address), System.currentTimeMillis()))

val initHistory = generateDummyHistory(testNetSettings)
val otherHistory = generateDummyHistory(testNetSettings)

val (_, blocks) = generateBlocks(100, generateDummyHistory(testNetSettings))

val ourHistory = blocks.foldLeft(initHistory) { case (hst, block) =>
hst.append(block.header)
hst.append(block.payload)
hst.reportModifierIsValid(block)
}

val commonHistory = blocks.take(60).foldLeft(otherHistory) { case (hst, block) =>
hst.append(block.header)
hst.append(block.payload)
hst.reportModifierIsValid(block)
}

val (forkedHistory, forkedBlocks) = generateBlocks(40, commonHistory)

val syncInfo = SyncInfo((blocks.slice(50, 60) ++ forkedBlocks).map(_.id))

nvs ! ChangedHistory(ourHistory)

nvs ! FullBlockChainIsSynced

nvs ! UpdatedPeersCollection(Map(address -> (cp, Fork, InitialPriority)))

nvs ! DataFromPeer(SyncInfoNetworkMessage(syncInfo), cp)

handler.expectMsg(InvNetworkMessage(Header.modifierTypeId -> blocks.drop(60).map(_.id)))

val idsForLocalRequest = forkedBlocks.map(_.id).filterNot(ourHistory.isModifierDefined)

nvs ! RequestFromLocal(cp, Header.modifierTypeId, idsForLocalRequest)

handler.expectMsg(RequestModifiersNetworkMessage(Header.modifierTypeId -> idsForLocalRequest))
nvs.stop()
}

"not process transactions if number of transactions in mempool exceeds limit" in {

val downloadedModifiersValidator = TestProbe()
val address = new InetSocketAddress("123.123.123.124", 9001)
val handler: TestProbe = TestProbe()
val cp: ConnectedPeer = ConnectedPeer(address, handler.ref, Incoming,
Handshake(protocolToBytes(testNetSettings.network.appVersion),
"123.123.123.124", Some(address), System.currentTimeMillis()))

val deliveryManager: TestActorRef[DeliveryManager] =
TestActorRef[DeliveryManager](DeliveryManager
.props(None, TestProbe().ref, TestProbe().ref, TestProbe().ref, TestProbe().ref, downloadedModifiersValidator.ref, settings))

val history: History = generateDummyHistory(settings)

deliveryManager ! UpdatedHistory(history)
deliveryManager ! StopTransactionsValidation

val txs: Map[ModifierId, Array[Byte]] = genValidPaymentTxs(100).groupBy(_.id).mapValues(_.head.bytes)
val msg = ModifiersNetworkMessage(Transaction.modifierTypeId, txs)

deliveryManager ! DataFromPeer(msg, cp)
downloadedModifiersValidator.expectNoMsg()
deliveryManager.stop()
}
}

}
73 changes: 73 additions & 0 deletions src/test/scala/encry/view/history/ContinuationIdsTest.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
package encry.view.history

import encry.modifiers.InstanceFactory
import encry.network.DeliveryManagerTests.DMUtils.generateBlocks
import encry.settings.TestNetSettings
import org.encryfoundation.common.modifiers.history.Block
import org.encryfoundation.common.network.SyncInfo
import org.scalatest.{Matchers, OneInstancePerTest, WordSpecLike}

class ContinuationIdsTest extends WordSpecLike
with Matchers
with InstanceFactory
with OneInstancePerTest
with TestNetSettings {

"History Reader" should {

"correctly compute continuation ids with empty history" in {
val history: History = generateDummyHistory(testNetSettings)
val blocks: List[Block] = generateBlocks(100, generateDummyHistory(testNetSettings))._2
val syncInfo: SyncInfo = SyncInfo(blocks.map(_.header.id))

val ids = history.continuationIds(syncInfo, 100)
ids shouldBe Seq.empty
}

"correctly compute continuation ids for empty SyncInfo" in {
val history: History = generateDummyHistory(testNetSettings)
val blocks: List[Block] = generateBlocks(100, generateDummyHistory(testNetSettings))._2
val syncInfo: SyncInfo = SyncInfo(Seq.empty)

val updatedHistory: History = blocks.foldLeft(history) { case (hst, block) =>
hst.append(block.header)
hst.append(block.payload)
hst.reportModifierIsValid(block)
}

val ids = updatedHistory.continuationIds(syncInfo, 100)
ids shouldBe blocks.map(_.header.id)
}

"correctly compute continuation ids if our best height is higher than others best height" in {
val history: History = generateDummyHistory(testNetSettings)
val blocks: List[Block] = generateBlocks(100, generateDummyHistory(testNetSettings))._2
val syncInfo: SyncInfo = SyncInfo(blocks.take(30).map(_.header.id))

val updatedHistory: History = blocks.foldLeft(history) { case (hst, block) =>
hst.append(block.header)
hst.append(block.payload)
hst.reportModifierIsValid(block)
}

val ids = updatedHistory.continuationIds(syncInfo, 100)
ids shouldBe blocks.map(_.header.id).drop(30)
}

"correctly compute continuation ids if others best height is higher than our best height" in {
val history: History = generateDummyHistory(testNetSettings)
val blocks: List[Block] = generateBlocks(100, generateDummyHistory(testNetSettings))._2
val syncInfo: SyncInfo = SyncInfo(blocks.map(_.header.id))

val updatedHistory: History = blocks.take(30).foldLeft(history) { case (hst, block) =>
hst.append(block.header)
hst.append(block.payload)
hst.reportModifierIsValid(block)
}

val ids = updatedHistory.continuationIds(syncInfo, 100)
ids shouldBe blocks.map(_.header.id).take(30)
}

}
}