Skip to content
Open
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions src/main/scala/encry/network/BlackList.scala
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,8 @@ object BlackList {
final case class InvalidResponseManifestMessage(error: String) extends BanReason
final case class InvalidChunkMessage(error: String) extends BanReason
final case class InvalidManifestHasChangedMessage(error: String) extends BanReason
case object NotAllChunksSentMessage extends BanReason
case object UnrequestedChunksSentMessage extends BanReason
case object ExpiredNumberOfReRequestAttempts extends BanReason
case object ExpiredNumberOfRequests extends BanReason
final case class InvalidStateAfterFastSync(error: String) extends BanReason
Expand Down
49 changes: 25 additions & 24 deletions src/main/scala/encry/view/fast/sync/SnapshotHolder.scala
Original file line number Diff line number Diff line change
Expand Up @@ -2,32 +2,27 @@ package encry.view.fast.sync

import SnapshotChunkProto.SnapshotChunkMessage
import SnapshotManifestProto.SnapshotManifestProtoMessage
import akka.actor.{ Actor, ActorRef, Cancellable, Props }
import akka.actor.{Actor, ActorRef, Cancellable, Props}
import com.google.protobuf.ByteString
import com.typesafe.scalalogging.StrictLogging
import encry.network.Broadcast
import encry.network.NetworkController.ReceivableMessages.{ DataFromPeer, RegisterMessagesHandler }
import encry.network.PeersKeeper.{ BanPeer, SendToNetwork }
import encry.network.NetworkController.ReceivableMessages.{DataFromPeer, RegisterMessagesHandler}
import encry.network.PeersKeeper.{BanPeer, SendToNetwork}
import encry.settings.EncryAppSettings
import SnapshotHolder._
import encry.view.state.UtxoState
import org.encryfoundation.common.modifiers.history.Block
import org.encryfoundation.common.network.BasicMessagesRepo._
import org.encryfoundation.common.utils.Algos
import cats.syntax.option._
import encry.network.BlackList.BanReason.{
ExpiredNumberOfReRequestAttempts,
ExpiredNumberOfRequests,
InvalidChunkMessage,
InvalidResponseManifestMessage,
InvalidStateAfterFastSync
}
import encry.network.NodeViewSynchronizer.ReceivableMessages.{ ChangedHistory, SemanticallySuccessfulModifier }
import encry.storage.VersionalStorage.{ StorageKey, StorageValue }
import encry.view.fast.sync.FastSyncExceptions.{ ApplicableChunkIsAbsent, FastSyncException }
import encry.network.BlackList.BanReason.{ExpiredNumberOfReRequestAttempts, ExpiredNumberOfRequests, InvalidChunkMessage, InvalidResponseManifestMessage, InvalidStateAfterFastSync, NotAllChunksSentMessage, UnrequestedChunksSentMessage}
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

make imports formatting

import encry.network.NodeViewSynchronizer.ReceivableMessages.{ChangedHistory, SemanticallySuccessfulModifier}
import encry.storage.VersionalStorage.{StorageKey, StorageValue}
import encry.view.fast.sync.FastSyncExceptions.{ApplicableChunkIsAbsent, FastSyncException}
import encry.view.history.History
import encry.view.state.avlTree.{ Node, NodeSerilalizer }
import encry.view.state.avlTree.{Node, NodeSerilalizer}
import cats.syntax.either._

import scala.util.Try
import encry.view.state.avlTree.utils.implicits.Instances._

Expand Down Expand Up @@ -124,17 +119,22 @@ class SnapshotHolder(settings: EncryAppSettings,
(controller, chunk) = controllerAndChunk
validChunk <- snapshotProcessor.validateChunkId(chunk)
processor = snapshotProcessor.updateCache(validChunk)
newProcessor <- processor.processNextApplicableChunk(processor).leftFlatMap {
case e: ApplicableChunkIsAbsent => e.processor.asRight[FastSyncException]
case t => t.asLeft[SnapshotProcessor]
}
newProcessor <- processor.processNextApplicableChunk(processor).leftFlatMap {
case e: ApplicableChunkIsAbsent => e.processor.asRight[FastSyncException]
case t => t.asLeft[SnapshotProcessor]
}
} yield (newProcessor, controller)) match {
case Left(error) =>
nodeViewSynchronizer ! BanPeer(remote, InvalidChunkMessage(error.error))
restartFastSync(history)
case Right((processor, controller))
if controller.requestedChunks.isEmpty && controller.notYetRequested.isEmpty && processor.chunksCache.nonEmpty =>
nodeViewSynchronizer ! BanPeer(remote, InvalidChunkMessage("For request is empty, buffer is nonEmpty"))
if controller.requestedChunks.isEmpty &&
controller.notYetRequested.isEmpty &&
(processor.chunksCache.nonEmpty || processor.applicableChunks.nonEmpty) =>
if (processor.chunksCache.nonEmpty)
nodeViewSynchronizer ! BanPeer(remote, UnrequestedChunksSentMessage)
else
nodeViewSynchronizer ! BanPeer(remote, NotAllChunksSentMessage)
restartFastSync(history)
case Right((processor, controller))
if controller.requestedChunks.isEmpty && controller.notYetRequested.isEmpty =>
Expand Down Expand Up @@ -233,10 +233,11 @@ class SnapshotHolder(settings: EncryAppSettings,
def workMod(history: History): Receive = {
case TreeChunks(chunks, id) =>
//todo add collection with potentialManifestsIds to NVH
val manifestIds: Seq[Array[Byte]] = snapshotProcessor.potentialManifestsIds
if (!manifestIds.exists(_.sameElements(id))) {
snapshotProcessor.createNewSnapshot(id, manifestIds, chunks)
} else logger.info(s"Doesn't need to create snapshot")

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

remove empty line

snapshotProcessor.createNewSnapshot(id, chunks).fold( err =>
logger.warn(s"Failed to create new snapshot due to ${err.error}"),
newProc => snapshotProcessor = newProc
)

case SemanticallySuccessfulModifier(block: Block) if history.isFullChainSynced =>
logger.info(s"Snapshot holder got semantically successful modifier message. Started processing it.")
Expand Down
42 changes: 22 additions & 20 deletions src/main/scala/encry/view/fast/sync/SnapshotProcessor.scala
Original file line number Diff line number Diff line change
Expand Up @@ -193,28 +193,30 @@ final case class SnapshotProcessor(settings: EncryAppSettings,

def createNewSnapshot(
id: Array[Byte],
manifestIds: Seq[Array[Byte]],
newChunks: List[SnapshotChunk]
): Either[ProcessNewSnapshotError, SnapshotProcessor] = {
//todo add only exists chunks
val manifest: SnapshotManifest = SnapshotManifest(id, newChunks.map(_.id))
val snapshotToDB: List[(StorageKey, StorageValue)] = newChunks.map { elem =>
val bytes: Array[Byte] = SnapshotChunkSerializer.toProto(elem).toByteArray
StorageKey @@ elem.id -> StorageValue @@ bytes
}
val manifestToDB: (StorageKey, StorageValue) =
StorageKey @@ manifest.manifestId -> StorageValue @@ SnapshotManifestSerializer
.toProto(manifest)
.toByteArray
val updateList: (StorageKey, StorageValue) =
PotentialManifestsIdsKey -> StorageValue @@ (manifest.manifestId :: manifestIds.toList).flatten.toArray
val toApply: List[(StorageKey, StorageValue)] = manifestToDB :: updateList :: snapshotToDB
logger.info(s"A new snapshot created successfully. Insertion started.")
Either.catchNonFatal(storage.insert(StorageVersion @@ Random.randomBytes(), toApply, List.empty)) match {
case Left(value) => ProcessNewSnapshotError(value.getMessage).asLeft[SnapshotProcessor]
case Right(_) => this.asRight[ProcessNewSnapshotError]
): Either[ProcessNewSnapshotError, SnapshotProcessor] =
if (potentialManifestsIds.exists(_.sameElements(id)))
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Don't take this value from db 2 times (here and at 212 line) potentialManifestsIds. Take it only 1 time

ProcessNewSnapshotError(s"Potential manifest with id ${Algos.encode(id)} already exists").asLeft[SnapshotProcessor]
else {
//todo add only exists chunks
val manifest: SnapshotManifest = SnapshotManifest(id, newChunks.map(_.id))
val snapshotToDB: List[(StorageKey, StorageValue)] = newChunks.map { elem =>
val bytes: Array[Byte] = SnapshotChunkSerializer.toProto(elem).toByteArray
StorageKey @@ elem.id -> StorageValue @@ bytes
}
val manifestToDB: (StorageKey, StorageValue) =
StorageKey @@ manifest.manifestId -> StorageValue @@ SnapshotManifestSerializer
.toProto(manifest)
.toByteArray
val updateList: (StorageKey, StorageValue) =
PotentialManifestsIdsKey -> StorageValue @@ (manifest.manifestId :: potentialManifestsIds.toList).flatten.toArray
val toApply: List[(StorageKey, StorageValue)] = manifestToDB :: updateList :: snapshotToDB
logger.info(s"A new snapshot created successfully. Insertion started.")
Either.catchNonFatal(storage.insert(StorageVersion @@ Random.randomBytes(), toApply, List.empty)) match {
case Left(value) => ProcessNewSnapshotError(value.getMessage).asLeft[SnapshotProcessor]
case Right(_) => this.asRight[ProcessNewSnapshotError]
}
}
}

private def updateActualSnapshot(history: History, height: Int): Either[ProcessNewBlockError, SnapshotProcessor] =
for {
Expand Down
5 changes: 5 additions & 0 deletions src/main/scala/encry/view/state/avlTree/AvlTree.scala
Original file line number Diff line number Diff line change
Expand Up @@ -182,6 +182,7 @@ final case class AvlTree[K : Hashable : Order, V] (rootNode: Node[K, V], storage
kSer: Serializer[K],
vSer: Serializer[V]
): (Option[Node[K, V]]) = node match {
case _: EmptyNode[K, V] => None
case shadowNode: ShadowNode[K, V] =>
val restoredNode = shadowNode.restoreFullNode(storage)
delete(restoredNode, key)
Expand Down Expand Up @@ -498,6 +499,7 @@ final case class AvlTree[K : Hashable : Order, V] (rootNode: Node[K, V], storage
}

def selfInspectionAfterFastSync(implicit kSer: Serializer[K]): Boolean = {

@scala.annotation.tailrec
def loop(nodesToProcess: List[Node[K, V]], keysToInspect: List[Array[Byte]]): List[Array[Byte]] =
if (nodesToProcess.nonEmpty) nodesToProcess.head match {
Expand All @@ -520,10 +522,13 @@ final case class AvlTree[K : Hashable : Order, V] (rootNode: Node[K, V], storage
loop(updatedNodeToProcess ::: next, Algos.hash(kSer.toBytes(i.key).reverse) :: i.hash :: current ::: keysToInspect)
case l: LeafNode[K, V] => loop(nodesToProcess.drop(1), Algos.hash(kSer.toBytes(l.key).reverse) :: l.hash :: keysToInspect)
} else keysToInspect

val keys: Set[ByteArrayWrapper] = loop(List(rootNode), List.empty).map(ByteArrayWrapper(_)).toSet

val allKeysFromDB: Set[ByteArrayWrapper] = storage.getAllKeys(-1)
.map(ByteArrayWrapper(_)).toSet - ByteArrayWrapper(UtxoState.bestHeightKey) - ByteArrayWrapper(AvlTree.rootNodeKey)
logger.debug(s"${keys.map(l => Algos.encode(l.data))}")

(allKeysFromDB -- keys).isEmpty
}

Expand Down
57 changes: 57 additions & 0 deletions src/test/scala/encry/modifiers/InstanceFactory.scala
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,17 @@ import encry.consensus.EncrySupplyController
import encry.modifiers.mempool._
import encry.modifiers.state.Keys
import encry.settings.{EncryAppSettings, NodeSettings}
import encry.storage.VersionalStorage.{StorageKey, StorageValue, StorageVersion}
import encry.storage.levelDb.versionalLevelDB.{LevelDbFactory, VLDBWrapper, VersionalLevelDBCompanion}
import encry.utils.implicits.UTXO.combineAll
import encry.utils.{EncryGenerator, FileHelper, NetworkTimeProvider, TestHelper}
import encry.view.history.History
import encry.view.history.storage.HistoryStorage
import encry.view.state.UtxoState
import encry.view.state.avlTree.AvlTree
import io.iohk.iodb.LSMStore
import org.encryfoundation.common.modifiers.history.{Block, Header, Payload}
import org.encryfoundation.common.modifiers.mempool.transaction.EncryAddress.Address
import org.encryfoundation.common.modifiers.mempool.transaction.{Input, Transaction}
import org.encryfoundation.common.modifiers.state.box.{AssetBox, EncryProposition}
import org.encryfoundation.common.modifiers.state.box.Box.Amount
Expand Down Expand Up @@ -170,6 +175,58 @@ trait InstanceFactory extends Keys with EncryGenerator {
Block(header, Payload(header.id, txs))
}

def generateNextBlockAndInsert(history: History,
tree: AvlTree[StorageKey, StorageValue],
insert: Boolean,
difficultyDiff: BigInt = 0,
prevId: Option[ModifierId] = None,
txsQty: Int = 100,
additionalDifficulty: BigInt = 0,
addressOpt: Option[Address] = None): (Block, AvlTree[StorageKey, StorageValue]) = {

import encry.utils.implicits.UTXO._
import encry.view.state.avlTree.utils.implicits.Instances._

val previousHeaderId: ModifierId =
prevId.getOrElse(history.getBestHeader.map(_.id).getOrElse(Header.GenesisParentId))
val requiredDifficulty: Difficulty = history.getBestHeader.map(parent =>
history.requiredDifficultyAfter(parent).getOrElse(Difficulty @@ BigInt(0)))
.getOrElse(history.settings.constants.InitialDifficulty)

val txs = { addressOpt match {
case Some(address) => if (txsQty != 0) genValidPaymentTxsToAddr(Scarand.nextInt(txsQty), address) else Seq.empty
case None => if (txsQty != 0) genValidPaymentTxs(Scarand.nextInt(txsQty)) else Seq.empty
}

} ++ Seq(coinbaseAt(history.getBestHeaderHeight + 1))

val combinedStateChange: UtxoState.StateChange = combineAll(txs.map(UtxoState.tx2StateChange).toList)
val newStateRoot = tree.getOperationsRootHash(
combinedStateChange.outputsToDb.toList, combinedStateChange.inputsToDb.toList
).get

val header = genHeader.copy(
parentId = previousHeaderId,
height = history.getBestHeaderHeight + 1,
difficulty = Difficulty @@ (requiredDifficulty + difficultyDiff + additionalDifficulty),
transactionsRoot = Payload.rootHash(txs.map(_.id)),
stateRoot = newStateRoot
)

val block = Block(header, Payload(header.id, txs))

val newTree: AvlTree[StorageKey, StorageValue] = if (insert)
tree.insertAndDeleteMany(
StorageVersion !@@ block.id,
combinedStateChange.outputsToDb.toList,
combinedStateChange.inputsToDb.toList,
Height @@ block.header.height
) else tree

(block, newTree)

}

def genForkOn(qty: Int,
addDifficulty: BigInt = 0,
from: Int,
Expand Down
40 changes: 40 additions & 0 deletions src/test/scala/encry/network/DeliveryManagerTests/DMUtils.scala
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
package encry.network.DeliveryManagerTests

import java.io.File
import java.net.InetSocketAddress

import akka.actor.ActorSystem
import akka.testkit.{TestActorRef, TestProbe}
import encry.local.miner.Miner.{DisableMining, StartMining}
Expand All @@ -10,10 +12,18 @@ import encry.network.DeliveryManager.FullBlockChainIsSynced
import encry.network.NodeViewSynchronizer.ReceivableMessages.UpdatedHistory
import encry.network.PeerConnectionHandler.{ConnectedPeer, Incoming}
import encry.settings.EncryAppSettings
import encry.storage.VersionalStorage.{StorageKey, StorageValue}
import encry.storage.levelDb.versionalLevelDB.{LevelDbFactory, VLDBWrapper, VersionalLevelDBCompanion}
import encry.utils.FileHelper
import encry.view.fast.sync.FastSyncTestsUtils.settings
import encry.view.history.History
import encry.view.state.avlTree.AvlTree
import org.encryfoundation.common.modifiers.history.Block
import org.encryfoundation.common.modifiers.mempool.transaction.EncryAddress.Address
import org.encryfoundation.common.network.BasicMessagesRepo.Handshake
import org.encryfoundation.common.utils.TaggedTypes.ModifierId
import org.iq80.leveldb.Options

import scala.collection.mutable
import scala.collection.mutable.WrappedArray

Expand Down Expand Up @@ -44,6 +54,36 @@ object DMUtils extends InstanceFactory {
(a, blocks :+ block)
}

def generateBlocksWithTree(qty: Int,
history: History,
prevTreeOpt: Option[AvlTree[StorageKey, StorageValue]] = None,
addressOpt: Option[Address] = None): (History, List[Block], AvlTree[StorageKey, StorageValue]) = {

import encry.view.state.avlTree.utils.implicits.Instances._

val avl: AvlTree[StorageKey, StorageValue] = prevTreeOpt match {
case Some(t) => t
case None =>
val dir: File = FileHelper.getRandomTempDir
val storage: VLDBWrapper = {
val levelDBInit = LevelDbFactory.factory.open(dir, new Options)
VLDBWrapper(VersionalLevelDBCompanion(levelDBInit, settings.levelDB, keySize = 32))
}

AvlTree[StorageKey, StorageValue](storage)
}

(0 until qty).foldLeft(history, List.empty[Block], avl) {
case ((prevHistory, blocks, tree), _) =>
val (block: Block, newTree: AvlTree[StorageKey, StorageValue]) =
generateNextBlockAndInsert(prevHistory, tree, prevTreeOpt.isEmpty, addressOpt = addressOpt)
prevHistory.append(block.header)
prevHistory.append(block.payload)
val newHist = prevHistory.reportModifierIsValid(block)
(newHist, blocks :+ block, newTree)
}
}

def toKey(id: ModifierId): WrappedArray.ofByte = new mutable.WrappedArray.ofByte(id)

def createPeer(port: Int,
Expand Down
Loading