From b6e12305c50889fdb2b2ed2dc5a1edea5727b827 Mon Sep 17 00:00:00 2001 From: GRYE Date: Thu, 14 Nov 2019 17:53:11 +0300 Subject: [PATCH 1/5] Fixed fast sync tests --- .../encry/view/state/avlTree/AvlTree.scala | 1 + .../encry/modifiers/InstanceFactory.scala | 57 +++ .../DeliveryManagerTests/DMUtils.scala | 40 ++ .../view/fast/sync/FastSyncTestsUtils.scala | 97 ++--- .../sync/SnapshotChunkSerializerTest.scala | 81 ++-- .../sync/SnapshotDownloadControllerTest.scala | 347 ++++++----------- .../fast/sync/SnapshotProcessorTest.scala | 352 +++++++----------- 7 files changed, 430 insertions(+), 545 deletions(-) diff --git a/src/main/scala/encry/view/state/avlTree/AvlTree.scala b/src/main/scala/encry/view/state/avlTree/AvlTree.scala index 88d383b485..2162cf4236 100644 --- a/src/main/scala/encry/view/state/avlTree/AvlTree.scala +++ b/src/main/scala/encry/view/state/avlTree/AvlTree.scala @@ -182,6 +182,7 @@ final case class AvlTree[K : Hashable : Order, V] (rootNode: Node[K, V], storage kSer: Serializer[K], vSer: Serializer[V] ): (Option[Node[K, V]]) = node match { + case _: EmptyNode[K, V] => None case shadowNode: ShadowNode[K, V] => val restoredNode = shadowNode.restoreFullNode(storage) delete(restoredNode, key) diff --git a/src/test/scala/encry/modifiers/InstanceFactory.scala b/src/test/scala/encry/modifiers/InstanceFactory.scala index cf88e5b04f..39a3897cba 100755 --- a/src/test/scala/encry/modifiers/InstanceFactory.scala +++ b/src/test/scala/encry/modifiers/InstanceFactory.scala @@ -4,12 +4,17 @@ import encry.consensus.EncrySupplyController import encry.modifiers.mempool._ import encry.modifiers.state.Keys import encry.settings.{EncryAppSettings, NodeSettings} +import encry.storage.VersionalStorage.{StorageKey, StorageValue, StorageVersion} import encry.storage.levelDb.versionalLevelDB.{LevelDbFactory, VLDBWrapper, VersionalLevelDBCompanion} +import encry.utils.implicits.UTXO.combineAll import encry.utils.{EncryGenerator, FileHelper, NetworkTimeProvider, TestHelper} import encry.view.history.History import encry.view.history.storage.HistoryStorage +import encry.view.state.UtxoState +import encry.view.state.avlTree.AvlTree import io.iohk.iodb.LSMStore import org.encryfoundation.common.modifiers.history.{Block, Header, Payload} +import org.encryfoundation.common.modifiers.mempool.transaction.EncryAddress.Address import org.encryfoundation.common.modifiers.mempool.transaction.{Input, Transaction} import org.encryfoundation.common.modifiers.state.box.{AssetBox, EncryProposition} import org.encryfoundation.common.modifiers.state.box.Box.Amount @@ -170,6 +175,58 @@ trait InstanceFactory extends Keys with EncryGenerator { Block(header, Payload(header.id, txs)) } + def generateNextBlockAndInsert(history: History, + tree: AvlTree[StorageKey, StorageValue], + insert: Boolean, + difficultyDiff: BigInt = 0, + prevId: Option[ModifierId] = None, + txsQty: Int = 100, + additionalDifficulty: BigInt = 0, + addressOpt: Option[Address] = None): (Block, AvlTree[StorageKey, StorageValue]) = { + + import encry.utils.implicits.UTXO._ + import encry.view.state.avlTree.utils.implicits.Instances._ + + val previousHeaderId: ModifierId = + prevId.getOrElse(history.getBestHeader.map(_.id).getOrElse(Header.GenesisParentId)) + val requiredDifficulty: Difficulty = history.getBestHeader.map(parent => + history.requiredDifficultyAfter(parent).getOrElse(Difficulty @@ BigInt(0))) + .getOrElse(history.settings.constants.InitialDifficulty) + + val txs = { addressOpt match { + case Some(address) => if (txsQty != 0) genValidPaymentTxsToAddr(Scarand.nextInt(txsQty), address) else Seq.empty + case None => if (txsQty != 0) genValidPaymentTxs(Scarand.nextInt(txsQty)) else Seq.empty + } + + } ++ Seq(coinbaseAt(history.getBestHeaderHeight + 1)) + + val combinedStateChange: UtxoState.StateChange = combineAll(txs.map(UtxoState.tx2StateChange).toList) + val newStateRoot = tree.getOperationsRootHash( + combinedStateChange.outputsToDb.toList, combinedStateChange.inputsToDb.toList + ).get + + val header = genHeader.copy( + parentId = previousHeaderId, + height = history.getBestHeaderHeight + 1, + difficulty = Difficulty @@ (requiredDifficulty + difficultyDiff + additionalDifficulty), + transactionsRoot = Payload.rootHash(txs.map(_.id)), + stateRoot = newStateRoot + ) + + val block = Block(header, Payload(header.id, txs)) + + val newTree: AvlTree[StorageKey, StorageValue] = if (insert) + tree.insertAndDeleteMany( + StorageVersion !@@ block.id, + combinedStateChange.outputsToDb.toList, + combinedStateChange.inputsToDb.toList, + Height @@ block.header.height + ) else tree + + (block, newTree) + + } + def genForkOn(qty: Int, addDifficulty: BigInt = 0, from: Int, diff --git a/src/test/scala/encry/network/DeliveryManagerTests/DMUtils.scala b/src/test/scala/encry/network/DeliveryManagerTests/DMUtils.scala index 0bb95839cc..55fb3274a2 100644 --- a/src/test/scala/encry/network/DeliveryManagerTests/DMUtils.scala +++ b/src/test/scala/encry/network/DeliveryManagerTests/DMUtils.scala @@ -1,6 +1,8 @@ package encry.network.DeliveryManagerTests +import java.io.File import java.net.InetSocketAddress + import akka.actor.ActorSystem import akka.testkit.{TestActorRef, TestProbe} import encry.local.miner.Miner.{DisableMining, StartMining} @@ -10,10 +12,18 @@ import encry.network.DeliveryManager.FullBlockChainIsSynced import encry.network.NodeViewSynchronizer.ReceivableMessages.UpdatedHistory import encry.network.PeerConnectionHandler.{ConnectedPeer, Incoming} import encry.settings.EncryAppSettings +import encry.storage.VersionalStorage.{StorageKey, StorageValue} +import encry.storage.levelDb.versionalLevelDB.{LevelDbFactory, VLDBWrapper, VersionalLevelDBCompanion} +import encry.utils.FileHelper +import encry.view.fast.sync.FastSyncTestsUtils.settings import encry.view.history.History +import encry.view.state.avlTree.AvlTree import org.encryfoundation.common.modifiers.history.Block +import org.encryfoundation.common.modifiers.mempool.transaction.EncryAddress.Address import org.encryfoundation.common.network.BasicMessagesRepo.Handshake import org.encryfoundation.common.utils.TaggedTypes.ModifierId +import org.iq80.leveldb.Options + import scala.collection.mutable import scala.collection.mutable.WrappedArray @@ -44,6 +54,36 @@ object DMUtils extends InstanceFactory { (a, blocks :+ block) } + def generateBlocksWithTree(qty: Int, + history: History, + prevTreeOpt: Option[AvlTree[StorageKey, StorageValue]] = None, + addressOpt: Option[Address] = None): (History, List[Block], AvlTree[StorageKey, StorageValue]) = { + + import encry.view.state.avlTree.utils.implicits.Instances._ + + val avl: AvlTree[StorageKey, StorageValue] = prevTreeOpt match { + case Some(t) => t + case None => + val dir: File = FileHelper.getRandomTempDir + val storage: VLDBWrapper = { + val levelDBInit = LevelDbFactory.factory.open(dir, new Options) + VLDBWrapper(VersionalLevelDBCompanion(levelDBInit, settings.levelDB, keySize = 32)) + } + + AvlTree[StorageKey, StorageValue](storage) + } + + (0 until qty).foldLeft(history, List.empty[Block], avl) { + case ((prevHistory, blocks, tree), _) => + val (block: Block, newTree: AvlTree[StorageKey, StorageValue]) = + generateNextBlockAndInsert(prevHistory, tree, prevTreeOpt.isEmpty, addressOpt = addressOpt) + prevHistory.append(block.header) + prevHistory.append(block.payload) + val newHist = prevHistory.reportModifierIsValid(block) + (newHist, blocks :+ block, newTree) + } + } + def toKey(id: ModifierId): WrappedArray.ofByte = new mutable.WrappedArray.ofByte(id) def createPeer(port: Int, diff --git a/src/test/scala/encry/view/fast/sync/FastSyncTestsUtils.scala b/src/test/scala/encry/view/fast/sync/FastSyncTestsUtils.scala index 56d144c004..6d4a0cafb8 100644 --- a/src/test/scala/encry/view/fast/sync/FastSyncTestsUtils.scala +++ b/src/test/scala/encry/view/fast/sync/FastSyncTestsUtils.scala @@ -8,72 +8,57 @@ import encry.settings.TestNetSettings import encry.storage.VersionalStorage.{StorageKey, StorageValue, StorageVersion} import encry.storage.levelDb.versionalLevelDB.{LevelDbFactory, VLDBWrapper, VersionalLevelDBCompanion} import encry.utils.FileHelper -import encry.view.fast.sync.SnapshotHolder.SnapshotManifest +import encry.view.fast.sync.SnapshotHolder.{SnapshotChunk, SnapshotManifest} import encry.view.history.History -import encry.view.state.UtxoState import encry.view.state.avlTree.AvlTree import org.iq80.leveldb.Options import scorex.utils.Random import encry.view.state.avlTree.utils.implicits.Instances._ import org.encryfoundation.common.modifiers.history.Block +import org.encryfoundation.common.modifiers.mempool.transaction.EncryAddress.Address import org.encryfoundation.common.utils.Algos -import org.encryfoundation.common.utils.TaggedTypes.Height object FastSyncTestsUtils extends InstanceFactory with TestNetSettings { -// def initializeTestState( -// from: Int = 0, -// to: Int = 100 -// ): (AvlTree[StorageKey, StorageValue], -// SnapshotProcessor, -// SnapshotDownloadController, -// List[Block], -// SnapshotManifest, -// History) = { -// val firstDir: File = FileHelper.getRandomTempDir -// val firstStorage: VLDBWrapper = { -// val levelDBInit = LevelDbFactory.factory.open(firstDir, new Options) -// VLDBWrapper(VersionalLevelDBCompanion(levelDBInit, settings.levelDB, keySize = 32)) -// } -// val history = generateDummyHistory(settings) -// val (_, blocks) = DMUtils.generateBlocks(5, history) -// -// val boxes: List[(StorageKey, StorageValue)] = blocks -// .flatMap(_.payload.txs.flatMap(_.newBoxes)) -// .map { bx => -// (StorageKey !@@ bx.id, StorageValue @@ bx.bytes) -// } -// -// val firstAvl: AvlTree[StorageKey, StorageValue] = AvlTree[StorageKey, StorageValue](firstStorage) -// val secondAvl = firstAvl -// .insertAndDeleteMany( -// StorageVersion @@ Random.randomBytes(), -// boxes, -// List.empty -// ) -// -// val newBlocks = blocks.map(l => Block(l.header.copy(stateRoot = secondAvl.rootHash), l.payload)) -// val history1 = generateDummyHistory(settings) -// val historyNew1 = newBlocks.foldLeft(history1) { -// case (history, block) => -// history.append(block.header) -// history.append(block.payload) -// history.reportModifierIsValid(block) -// } -// -// val snapshotProcessor: SnapshotProcessor = SnapshotProcessor -// .create(settings, tmpDir) -// .processNewSnapshot(UtxoState(secondAvl, Height @@ 0, settings.constants), newBlocks.last).right.get -// -// val snapshotDownloadController = SnapshotDownloadController.empty(settings) -// -// (secondAvl, -// snapshotProcessor, -// snapshotDownloadController, -// newBlocks, -// snapshotProcessor.manifestById(StorageKey @@ Algos.hash(secondAvl.rootHash ++ newBlocks.last.id)).get, -// historyNew1) -// } + def initializeTestState(initialBlocksQty: Int = 90, + additionalBlocksQty: Int = 10, + addressOpt: Option[Address] = None + ): (AvlTree[StorageKey, StorageValue], + SnapshotProcessor, + SnapshotDownloadController, + List[Block], + SnapshotManifest, + History) = { + + val (history, blocks, tree) = DMUtils.generateBlocksWithTree(initialBlocksQty, generateDummyHistory(settings), None, addressOpt) + val (_, additionalBlocks, _) = DMUtils.generateBlocksWithTree(additionalBlocksQty, history, Some(tree), addressOpt) + + val chunk = SnapshotChunk(tree.rootNode, tree.rootHash) + + val snapshotProcessor: SnapshotProcessor = SnapshotProcessor + .create(settings.copy(levelDB = settings.levelDB.copy(maxVersions = 10)), tmpDir) + .initializeApplicableChunksCache(history, history.getBestBlockHeight).right.get + .updateCache(chunk) + + val mans = snapshotProcessor.potentialManifestsIds + val sp = snapshotProcessor.createNewSnapshot( + Algos.hash(blocks.last.header.stateRoot ++ blocks.last.header.id), + mans, + List(chunk) + ).right.get + + val newProc = sp.processNewBlock(additionalBlocks.last, history).right.get + + val snapshotDownloadController = SnapshotDownloadController.empty(settings) + + + (tree, + newProc, + snapshotDownloadController, + blocks ++ additionalBlocks, + newProc.actualManifest.get, + history) + } def createAvl(address: String, from: Int, to: Int): AvlTree[StorageKey, StorageValue] = { val firstDir: File = FileHelper.getRandomTempDir diff --git a/src/test/scala/encry/view/fast/sync/SnapshotChunkSerializerTest.scala b/src/test/scala/encry/view/fast/sync/SnapshotChunkSerializerTest.scala index 3c4ee73c9a..2057a2b18f 100644 --- a/src/test/scala/encry/view/fast/sync/SnapshotChunkSerializerTest.scala +++ b/src/test/scala/encry/view/fast/sync/SnapshotChunkSerializerTest.scala @@ -1,47 +1,34 @@ -//package encry.view.fastSync -// -//import java.io.File -//import SnapshotChunkProto.SnapshotChunkMessage -//import encry.view.state.avlTree.utils.implicits.Instances._ -//import encry.modifiers.InstanceFactory -//import encry.settings.TestNetSettings -//import encry.storage.VersionalStorage.{ StorageKey, StorageValue, StorageVersion } -//import encry.storage.levelDb.versionalLevelDB.{ LevelDbFactory, VLDBWrapper, VersionalLevelDBCompanion } -//import encry.utils.FileHelper -//import encry.view.fastSync.SnapshotHolder.SnapshotChunkSerializer -//import encry.view.state.UtxoState -//import encry.view.state.avlTree.AvlTree -//import org.encryfoundation.common.utils.TaggedTypes.Height -//import org.iq80.leveldb.Options -//import org.scalatest.{ Matchers, OneInstancePerTest, WordSpecLike } -//import scorex.utils.Random -//import scala.util.Try -// -//class SnapshotChunkSerializerTest -// extends WordSpecLike -// with Matchers -// with InstanceFactory -// with OneInstancePerTest -// with TestNetSettings { -// -// "SnapshotChunkSerializer" should { -// "serialize|deserialize correctly" in { -// val avl1 = createAvl("9gKDVmfsA6J4b78jDBx6JmS86Zph98NnjnUqTJBkW7zitQMReia") -// val block1 = generateGenesisBlock(Height @@ 1) -// val snapshotProcessor: SnapshotProcessor = SnapshotProcessor.create(settings, tmpDir) -// val processor1: SnapshotProcessor = -// snapshotProcessor.processNewSnapshot(UtxoState(avl1, Height @@ 0, settings.constants), block1) -// val manifest1: SnapshotHolder.SnapshotManifest = processor1.bestPotentialManifest.get -// val chunk: SnapshotChunkMessage = processor1.getChunkById(manifest1.chunksKeys.head).get -// val chunkR: SnapshotHolder.SnapshotChunk = SnapshotChunkSerializer.fromProto(chunk).get -// -// val bytesChunk: Array[Byte] = SnapshotChunkSerializer.toProto(chunkR).toByteArray -// bytesChunk.isEmpty shouldBe false -// val ser: SnapshotChunkMessage = Try(SnapshotChunkMessage.parseFrom(bytesChunk)).get -// val deser: SnapshotHolder.SnapshotChunk = SnapshotChunkSerializer.fromProto(ser).get -// -// chunkR.id.sameElements(deser.id) shouldBe true -// } -// } -// -//} +package encry.view.fast.sync + +import SnapshotChunkProto.SnapshotChunkMessage +import encry.modifiers.InstanceFactory +import encry.settings.TestNetSettings +import encry.view.fast.sync.SnapshotHolder.SnapshotChunkSerializer +import org.scalatest.{Matchers, OneInstancePerTest, WordSpecLike} +import encry.view.fast.sync.FastSyncTestsUtils._ + +import scala.util.Try + +class SnapshotChunkSerializerTest extends WordSpecLike + with Matchers + with InstanceFactory + with OneInstancePerTest + with TestNetSettings { + + "SnapshotChunkSerializer" should { + "serialize|deserialize correctly" in { + val (_, processor, _, _, manifest, _) = initializeTestState() + + val chunk: SnapshotChunkMessage = processor.getChunkById(manifest.chunksKeys.head).get + val chunkR: SnapshotHolder.SnapshotChunk = SnapshotChunkSerializer.fromProto(chunk).get + + val bytesChunk: Array[Byte] = SnapshotChunkSerializer.toProto(chunkR).toByteArray + bytesChunk.isEmpty shouldBe false + val ser: SnapshotChunkMessage = Try(SnapshotChunkMessage.parseFrom(bytesChunk)).get + val deser: SnapshotHolder.SnapshotChunk = SnapshotChunkSerializer.fromProto(ser).get + + chunkR.id.sameElements(deser.id) shouldBe true + } + } + +} diff --git a/src/test/scala/encry/view/fast/sync/SnapshotDownloadControllerTest.scala b/src/test/scala/encry/view/fast/sync/SnapshotDownloadControllerTest.scala index b2fec15984..7da38a689b 100644 --- a/src/test/scala/encry/view/fast/sync/SnapshotDownloadControllerTest.scala +++ b/src/test/scala/encry/view/fast/sync/SnapshotDownloadControllerTest.scala @@ -3,256 +3,153 @@ package encry.view.fast.sync import java.net.InetSocketAddress import akka.actor.ActorSystem -import akka.testkit.TestProbe +import akka.testkit.{TestKit, TestProbe} import encry.modifiers.InstanceFactory import encry.network.PeerConnectionHandler.{ConnectedPeer, Incoming} import encry.settings.TestNetSettings -import encry.storage.VersionalStorage.{StorageKey, StorageValue} -import encry.view.state.avlTree.utils.implicits.Instances._ -import encry.view.state.avlTree.{AvlTree, NodeSerilalizer} import io.iohk.iodb.ByteArrayWrapper import org.encryfoundation.common.network.BasicMessagesRepo.Handshake import org.scalatest.{Matchers, OneInstancePerTest, WordSpecLike} import scorex.utils.Random import FastSyncTestsUtils._ -import SnapshotChunkProto.SnapshotChunkMessage import encry.view.fast.sync.SnapshotHolder.{SnapshotChunkSerializer, SnapshotManifestSerializer} -class SnapshotDownloadControllerTest - extends WordSpecLike +class SnapshotDownloadControllerTest extends TestKit(ActorSystem("SynchronousTestingSpec")) + with WordSpecLike with Matchers with InstanceFactory with OneInstancePerTest with TestNetSettings { - implicit val system: ActorSystem = ActorSystem("SynchronousTestingSpec") - "d" should { - "f" in { - val avl1: AvlTree[StorageKey, StorageValue] = - createAvl("9gKDVmfsA6J4b78jDBx6JmS86Zph98NnjnUqTJBkW7zitQMReia", 0, 20) + "SnapshotDownloadController" should { - val chunks = AvlTree.getChunks(avl1.rootNode, 1, avl1.storage) + "process manifest function" should { - val s = chunks.map { elem => - SnapshotChunkSerializer.toProto(elem).toByteArray + "correct height, correct manifest id, correct cp condition" in { + val (_, _, downloadController, blocks, manifest, history) = initializeTestState() + + val correctController: SnapshotDownloadController = downloadController.copy( + requiredManifestHeight = blocks.last.header.height + ) + + val result = + correctController.processManifest(SnapshotManifestSerializer.toProto(manifest), createRemote(), history) + + result.isRight shouldBe true + result.right.get.notYetRequested.forall { id => + manifest.chunksKeys.exists(_.sameElements(id)) + } shouldBe true } - println(chunks.head) + } + "process request chunk function" should { + + "validation success with correct peer and correct chunk" in { - println("\n----------//////--------\n") + val (_, processor, downloadController, _, manifest, _) = initializeTestState() - val d = s.map { el => - SnapshotChunkSerializer.fromProto(SnapshotChunkMessage.parseFrom(el)).get + val correctDownloadController = downloadController + .copy(cp = Some(createRemote()), + requestedChunks = + Set(ByteArrayWrapper(processor.getChunkById(manifest.chunksKeys.head).get.id.toByteArray))) + + val result = correctDownloadController.processRequestedChunk( + processor.getChunkById(manifest.chunksKeys.head).get, + createRemote() + ) + + result.isRight shouldBe true + result.right.get._1.requestedChunks.contains(ByteArrayWrapper(manifest.chunksKeys.head)) shouldBe false } - println(d.head) + "skip chunk from unknown peer" in { + + val (_, processor, downloadController, _, manifest, _) = initializeTestState() + + val result = downloadController.copy(requestedChunks = Set(ByteArrayWrapper(manifest.chunksKeys.head))).processRequestedChunk( + processor.getChunkById(manifest.chunksKeys.head).get, + createRemote(port = 3234) + ) + + result.isRight shouldBe true + } + + "validation error with incorrect chunk" in { + + val (_, processor, downloadController, _, manifest, _) = initializeTestState() + + val snapshotDownloadController = downloadController + .copy(cp = Some(createRemote())) + + val result = snapshotDownloadController.processRequestedChunk( + processor.getChunkById(manifest.chunksKeys.head).get.copy(id = null), + createRemote() + ) + + result.isLeft shouldBe true + } + + "validation error with un-waited chunk" in { + + val (_, processor, downloadController, _, manifest, _) = initializeTestState() + + val snapshotDownloadController = downloadController + .copy( + cp = Some(createRemote()), + requestedChunks = Set.empty + ) + + val result = snapshotDownloadController.processRequestedChunk( + processor.getChunkById(manifest.chunksKeys.head).get, + createRemote() + ) + + result.isLeft shouldBe true + } + + "correctly process correct chunk" in { + val (_, processor, downloadController, _, manifest, _) = initializeTestState() + + val snapshotDownloadController = downloadController + .copy(cp = Some(createRemote()), + requestedChunks = + Set(ByteArrayWrapper(processor.getChunkById(manifest.chunksKeys.last).get.id.toByteArray))) + + val result = snapshotDownloadController.processRequestedChunk( + processor.getChunkById(manifest.chunksKeys.last).get, + createRemote() + ) + + result.isRight shouldBe true + result.right.get._1.requestedChunks.isEmpty shouldBe true + result.right.get._2.id.sameElements(SnapshotChunkSerializer.fromProto(processor.getChunkById(manifest.chunksKeys.last).get).get.id) shouldBe true + } } - } -// "SnapshotDownloadController" should { -// "process manifest function" should { -// "correct height, correct manifest id, correct cp condition" in { -// val (_, _, downloadController, blocks, manifest, history) = initializeTestState() -// -// val correctController: SnapshotDownloadController = downloadController.copy( -// requiredManifestHeight = blocks.last.header.height -// ) -// -// val result = -// correctController.processManifest(SnapshotManifestSerializer.toProto(manifest), createRemote(), history) -// -// result.isRight shouldBe true -// result.right.get._2.nonEmpty shouldBe true -// result.right.get._1.notYetRequested.forall { id => -// manifest.chunksKeys.exists(_.sameElements(id)) -// } shouldBe true -// } -// "if some manifest is already in process - skip new one" in { -// val (_, _, downloadController, blocks, manifest, history) = initializeTestState() -// -// val correctDownloadController: SnapshotDownloadController = -// downloadController.copy( -// requiredManifestHeight = blocks.last.header.height, -// cp = Some(createRemote("999")) -// ) -// -// val res = correctDownloadController.processManifest( -// SnapshotManifestSerializer.toProto(manifest), -// createRemote(), -// history -// ) -// -// res.isRight shouldBe true -// res.right.get._2.isEmpty shouldBe true -// } -// "if manifest has wrong id - ban sender(when in process manifest exists)" in { -// val (_, _, downloadController, _, manifest, history) = initializeTestState() -// -// val correctController = downloadController.copy(cp = Some(createRemote("999"))) -// -// val res = correctController.processManifest( -// SnapshotManifestSerializer.toProto(manifest), -// createRemote(), -// history -// ) -// res.isLeft shouldBe true -// } -// "if manifest has wrong id - ban sender(when in process manifest doesn't exist)" in { -// val (_, _, downloadController, _, manifest, history) = initializeTestState() -// -// val res = downloadController.processManifest( -// SnapshotManifestSerializer.toProto(manifest), -// createRemote(), -// history -// ) -// res.isLeft shouldBe true -// } -// } -// "process request chunk function" should { -// "validation success with correct peer and correct chunk" in { -// val (_, processor, downloadController, _, manifest, _) = initializeTestState() -// -// val correctDownloadController = downloadController -// .copy(cp = Some(createRemote()), -// requestedChunks = -// Set(ByteArrayWrapper(processor.getChunkById(manifest.chunksKeys.head).get.id.toByteArray))) -// -// val result = correctDownloadController.processRequestedChunk( -// processor.getChunkById(manifest.chunksKeys.head).get, -// createRemote() -// ) -// -// result.isRight shouldBe true -// result.right.get._2.nonEmpty shouldBe true -// result.right.get._1.requestedChunks.contains(ByteArrayWrapper(manifest.chunksKeys.head)) shouldBe false -// } -// "skip chunk from unknown peer" in { -// val (_, processor, downloadController, _, manifest, _) = initializeTestState() -// -// val result = downloadController.processRequestedChunk( -// processor.getChunkById(manifest.chunksKeys.head).get, -// createRemote(port = 3234) -// ) -// -// result.isRight shouldBe true -// result.right.get._2.isEmpty shouldBe true -// } -// "validation error with incorrect chunk" in { -// val (_, processor, downloadController, _, manifest, _) = initializeTestState() -// -// val snapshotDownloadController = downloadController -// .copy(cp = Some(createRemote())) -// -// val result = snapshotDownloadController.processRequestedChunk( -// processor.getChunkById(manifest.chunksKeys.head).get.copy(id = null), -// createRemote() -// ) -// -// result.isLeft shouldBe true -// } -// "validation error with un-waited chunk" in { -// val (_, processor, downloadController, _, manifest, _) = initializeTestState() -// -// val snapshotDownloadController = downloadController -// .copy( -// cp = Some(createRemote()), -// requestedChunks = Set(ByteArrayWrapper(processor.getChunkById(manifest.chunksKeys.last).get.id.toByteArray)) -// ) -// -// val result = snapshotDownloadController.processRequestedChunk( -// processor.getChunkById(manifest.chunksKeys.head).get, -// createRemote() -// ) -// -// result.isLeft shouldBe true -// } -// "correctly process correct chunk" in { -// val (_, processor, downloadController, _, manifest, _) = initializeTestState() -// -// val snapshotDownloadController = downloadController -// .copy(cp = Some(createRemote()), -// requestedChunks = -// Set(ByteArrayWrapper(processor.getChunkById(manifest.chunksKeys.last).get.id.toByteArray))) -// -// val result = snapshotDownloadController.processRequestedChunk( -// processor.getChunkById(manifest.chunksKeys.last).get, -// createRemote() -// ) -// -// result.isRight shouldBe true -// result.right.get._1.requestedChunks.isEmpty shouldBe true -// result.right.get._2.nonEmpty shouldBe true -// result.right.get._2.forall { node => -// processor.getChunkById(manifest.chunksKeys.last).get.chunks.exists { n => -// NodeSerilalizer -// .fromProto[StorageKey, StorageValue](n) -// .hash -// .sameElements(NodeSerilalizer.fromProto[StorageKey, StorageValue](node).hash) -// } -// } shouldBe true -// } -// } -// "process manifest has changed function" should { -// "skip manifest if new is incorrect" in { -// val (_, _, downloadController, _, manifest, history) = initializeTestState() -// -// val res = downloadController.processManifestHasChangedMessage( -// Random.randomBytes(), -// SnapshotManifestSerializer.toProto(manifest), -// history, -// createRemote() -// ) -// -// res.isLeft shouldBe true -// } -// "process if all conditions are correct" in { -// val (_, _, downloadController, blocks, manifest, history) = initializeTestState() -// -// val prevManifestId = Random.randomBytes() -// val correctDownloadProcessor = downloadController.copy( -// requiredManifestId = prevManifestId, -// requiredManifestHeight = blocks.last.header.height -// ) -// -// val res = correctDownloadProcessor.processManifestHasChangedMessage( -// prevManifestId, -// SnapshotManifestSerializer.toProto(manifest), -// history, -// createRemote() -// ) -// -// res.isRight shouldBe true -// res.right.get._2.nonEmpty shouldBe true -// res.right.get._1.notYetRequested.forall { id => -// manifest.chunksKeys.exists(_.sameElements(id)) -// } shouldBe true -// } -// } -// "process next request chunks message function" should { -// "Request new chunks if it's needed" in { -// val ids = (0 to 10).map(_ => Random.randomBytes()).toList -// val snapshotDownloadController = SnapshotDownloadController -// .empty(settings) -// .copy(notYetRequested = ids) -// val result = snapshotDownloadController.processRequestChunksMessage -// result.isRight shouldBe true -// result.right.get._2.nonEmpty shouldBe true -// result.right.get._2.size == ids.size shouldBe true -// } -// "Set fast sync done correctly" in { -// val (_, _, downloadController, _, manifest, _) = initializeTestState() -// -// val snapshotDownloadController = downloadController -// .copy(requiredManifestId = manifest.manifestId, cp = Some(createRemote())) -// -// val result = snapshotDownloadController.processRequestChunksMessage -// result.isLeft shouldBe true -// result.left.get shouldBe true -// } -// "chunksIdsToDownload update needToBeRequested correctly" in {} -// "chunksIdsToDownload set awaitingResponse correctly" in {} -// } -// } + "process manifest has changed function" should { + + "process if all conditions are correct" in { + val (_, _, downloadController, blocks, manifest, history) = initializeTestState() + + val prevManifestId = Random.randomBytes() + val correctDownloadProcessor = downloadController.copy( + requiredManifestId = prevManifestId, + requiredManifestHeight = blocks.last.header.height + ) + + val res = correctDownloadProcessor.processManifestHasChangedMessage( + SnapshotManifestSerializer.toProto(manifest.copy(manifestId = prevManifestId)), + createRemote() + ) + + res.isRight shouldBe true + res.right.get.notYetRequested.forall { id => + manifest.chunksKeys.exists(_.sameElements(id)) + } shouldBe true + } + } + + } def createRemote(host: String = "111", port: Int = 999): ConnectedPeer = { val address = new InetSocketAddress(host, port) diff --git a/src/test/scala/encry/view/fast/sync/SnapshotProcessorTest.scala b/src/test/scala/encry/view/fast/sync/SnapshotProcessorTest.scala index 46c24f4be3..3520e3320c 100644 --- a/src/test/scala/encry/view/fast/sync/SnapshotProcessorTest.scala +++ b/src/test/scala/encry/view/fast/sync/SnapshotProcessorTest.scala @@ -1,22 +1,17 @@ package encry.view.fast.sync -import java.io.File import com.typesafe.scalalogging.StrictLogging -import encry.view.state.avlTree.utils.implicits.Instances._ import encry.modifiers.InstanceFactory import encry.settings.TestNetSettings -import encry.storage.VersionalStorage.{ StorageKey, StorageValue, StorageVersion } -import encry.storage.levelDb.versionalLevelDB.{ LevelDbFactory, VLDBWrapper, VersionalLevelDBCompanion } -import encry.utils.FileHelper +import encry.storage.VersionalStorage.{StorageKey, StorageValue} import encry.network.DeliveryManagerTests.DMUtils -import encry.view.state.UtxoState +import encry.view.fast.sync.FastSyncTestsUtils._ +import encry.view.fast.sync.SnapshotHolder.SnapshotChunk import encry.view.state.avlTree.AvlTree import org.encryfoundation.common.modifiers.history.Block import org.encryfoundation.common.utils.Algos import org.encryfoundation.common.utils.TaggedTypes.Height -import org.iq80.leveldb.Options -import org.scalatest.{ Matchers, OneInstancePerTest, WordSpecLike } -import scorex.utils.Random +import org.scalatest.{Matchers, OneInstancePerTest, WordSpecLike} class SnapshotProcessorTest extends WordSpecLike @@ -26,215 +21,138 @@ class SnapshotProcessorTest with TestNetSettings with StrictLogging { -// "Snapshot processor" should { -// "process new snapshot function" should { -// "correctly create new snapshot if such doesn't exist" in { -// val snapshotProcessor: SnapshotProcessor = SnapshotProcessor.create(settings, tmpDir) -// val avl1: AvlTree[StorageKey, StorageValue] = -// createAvl("9gKDVmfsA6J4b78jDBx6JmS86Zph98NnjnUqTJBkW7zitQMReia", 0, 20) -// val avl2: AvlTree[StorageKey, StorageValue] = -// createAvl("9gKDVmfsA6J4b78jDBx6JmS86Zph98NnjnUqTJBkW7zitQMReia", 5, 25) -// val avl3: AvlTree[StorageKey, StorageValue] = -// createAvl("9gKDVmfsA6J4b78jDBx6JmS86Zph98NnjnUqTJBkW7zitQMReia", 10, 30) -// -// val block1: Block = generateGenesisBlock(Height @@ 1) -// val block2: Block = generateGenesisBlock(Height @@ 1) -// val block3: Block = generateGenesisBlock(Height @@ 1) -// -// val processor1 = -// snapshotProcessor.processNewSnapshot(UtxoState(avl1, Height @@ 0, settings.constants), block1).right.get -// -// val processor2 = -// processor1.processNewSnapshot(UtxoState(avl2, Height @@ 0, settings.constants), block2).right.get -// -// val processor3 = -// processor2.processNewSnapshot(UtxoState(avl3, Height @@ 0, settings.constants), block3).right.get -// -// val id1 = Algos.hash(avl1.rootHash ++ block1.id) -// val id2 = Algos.hash(avl2.rootHash ++ block2.id) -// val id3 = Algos.hash(avl3.rootHash ++ block3.id) -// -// val listIds = id1 :: id2 :: id3 :: Nil -// -// processor3.potentialManifestsIds.forall { id => -// listIds.exists(_.sameElements(id)) -// } shouldBe true -// -// listIds.forall { id => -// processor3.potentialManifestsIds.exists(_.sameElements(id)) -// } shouldBe true -// -// val ids1 = processor3.manifestById(StorageKey @@ id1).get.chunksKeys -// val ids2 = processor3.manifestById(StorageKey @@ id2).get.chunksKeys -// val ids3 = processor3.manifestById(StorageKey @@ id3).get.chunksKeys -// -// ids1.forall { id => -// processor3.getChunkById(id).nonEmpty -// } shouldBe true -// -// ids2.forall { id => -// processor3.getChunkById(id).nonEmpty -// } shouldBe true -// -// ids3.forall { id => -// processor3.getChunkById(id).nonEmpty -// } shouldBe true -// -// } -// "skip snapshot creation if such already exists" in { -// val snapshotProcessor: SnapshotProcessor = SnapshotProcessor.create(settings, tmpDir) -// val avl1: AvlTree[StorageKey, StorageValue] = -// createAvl("9gKDVmfsA6J4b78jDBx6JmS86Zph98NnjnUqTJBkW7zitQMReia", 0, 20) -// val avl2: AvlTree[StorageKey, StorageValue] = -// createAvl("9gKDVmfsA6J4b78jDBx6JmS86Zph98NnjnUqTJBkW7zitQMReia", 5, 25) -// -// val block1: Block = generateGenesisBlock(Height @@ 1) -// val block2: Block = generateGenesisBlock(Height @@ 1) -// -// val processor1 = -// snapshotProcessor.processNewSnapshot(UtxoState(avl1, Height @@ 0, settings.constants), block1).right.get -// -// val processor2 = -// processor1.processNewSnapshot(UtxoState(avl2, Height @@ 0, settings.constants), block2).right.get -// -// val processor3 = -// processor2.processNewSnapshot(UtxoState(avl1, Height @@ 0, settings.constants), block1).right.get -// -// val id1 = Algos.hash(avl1.rootHash ++ block1.id) -// val id2 = Algos.hash(avl2.rootHash ++ block2.id) -// -// processor3.potentialManifestsIds.size == 2 shouldBe true -// -// processor3.potentialManifestsIds.forall { id => -// id.sameElements(id1) || id.sameElements(id2) -// } shouldBe true -// } -// } -// "process new block function" should { -// "process block correctly if condition == 0" in { -// val sn = settings.copy(snapshotSettings = settings.snapshotSettings.copy(newSnapshotCreationHeight = 4), -// levelDB = settings.levelDB.copy(maxVersions = 1)) -// val snapshotProcessor: SnapshotProcessor = SnapshotProcessor.create(sn, tmpDir) -// val avl1: AvlTree[StorageKey, StorageValue] = -// createAvl("9gKDVmfsA6J4b78jDBx6JmS86Zph98NnjnUqTJBkW7zitQMReia", 0, 20) -// val avl2: AvlTree[StorageKey, StorageValue] = -// createAvl("9gKDVmfsA6J4b78jDBx6JmS86Zph98NnjnUqTJBkW7zitQMReia", 5, 25) -// -// val block1: Block = generateGenesisBlock(Height @@ 1) -// val block2: Block = generateGenesisBlock(Height @@ (4 + sn.levelDB.maxVersions)) -// -// val history = generateDummyHistory(settings) -// val (_, blocks) = DMUtils.generateBlocks(5, history) -// val newBlocks = blocks.map(b => Block(b.header.copy(stateRoot = avl2.rootHash), b.payload)) -// -// logger.info(s"${newBlocks.map(_.encodedId)}") -// -// val history1 = generateDummyHistory(settings) -// val historyNew1 = newBlocks.foldLeft(history1) { -// case (history, block) => -// history.append(block.header) -// history.append(block.payload) -// history.reportModifierIsValid(block) -// } -// -// val processor1 = -// snapshotProcessor.processNewSnapshot(UtxoState(avl1, Height @@ 0, settings.constants), block1).right.get -// -// val processor2 = -// processor1.processNewSnapshot(UtxoState(avl2, Height @@ 0, settings.constants), newBlocks.last).right.get -// -// val id1 = Algos.hash(avl1.rootHash ++ block1.id) -// val ids1 = processor2.manifestById(StorageKey @@ id1).get.chunksKeys -// -// val processor3 = -// processor2.processNewBlock(block2, historyNew1).right.get -// -// val id2 = Algos.hash(avl2.rootHash ++ newBlocks.last.id) -// -// processor3.potentialManifestsIds.isEmpty shouldBe true -// -// logger.info(s"${Algos.encode(id2)}") -// -// processor3.manifestById(StorageKey @@ id1).isEmpty shouldBe true -// processor3.manifestById(StorageKey @@ id2).nonEmpty shouldBe true -// -// val ids2 = processor3.manifestById(StorageKey @@ id2).get.chunksKeys -// -// ids2.forall { id => -// processor3.getChunkById(id).nonEmpty -// } shouldBe true -// -// val ids11 = ids1.filterNot(l => ids2.exists(_.sameElements(l))) -// -// ids11.forall { id => -// processor3.getChunkById(id).isEmpty -// } shouldBe true -// } -// "skip block if condition != 0" in { -// val sn = settings.copy(snapshotSettings = settings.snapshotSettings.copy(newSnapshotCreationHeight = 50), -// levelDB = settings.levelDB.copy(maxVersions = 1)) -// val snapshotProcessor: SnapshotProcessor = SnapshotProcessor.create(sn, tmpDir) -// val avl1: AvlTree[StorageKey, StorageValue] = -// createAvl("9gKDVmfsA6J4b78jDBx6JmS86Zph98NnjnUqTJBkW7zitQMReia", 0, 20) -// val avl2: AvlTree[StorageKey, StorageValue] = -// createAvl("9gKDVmfsA6J4b78jDBx6JmS86Zph98NnjnUqTJBkW7zitQMReia", 5, 25) -// -// val block1: Block = generateGenesisBlock(Height @@ 1) -// val block2: Block = generateGenesisBlock(Height @@ (4 + sn.levelDB.maxVersions)) -// -// val history = generateDummyHistory(settings) -// val (_, blocks) = DMUtils.generateBlocks(5, history) -// val newBlocks = blocks.map(b => Block(b.header.copy(stateRoot = avl2.rootHash), b.payload)) -// -// logger.info(s"${newBlocks.map(_.encodedId)}") -// -// val history1 = generateDummyHistory(settings) -// val historyNew1 = newBlocks.foldLeft(history1) { -// case (history, block) => -// history.append(block.header) -// history.append(block.payload) -// history.reportModifierIsValid(block) -// } -// -// val processor1 = -// snapshotProcessor.processNewSnapshot(UtxoState(avl1, Height @@ 0, settings.constants), block1).right.get -// -// val processor2 = -// processor1.processNewSnapshot(UtxoState(avl2, Height @@ 0, settings.constants), newBlocks.last).right.get -// -// val processor3 = -// processor2.processNewBlock(block2, historyNew1).right.get -// -// val id1 = Algos.hash(avl1.rootHash ++ block1.id) -// val id2 = Algos.hash(avl2.rootHash ++ newBlocks.last.id) -// -// processor3.potentialManifestsIds.size == 2 shouldBe true -// -// processor3.manifestById(StorageKey @@ id1).nonEmpty shouldBe true -// processor3.manifestById(StorageKey @@ id2).nonEmpty shouldBe true -// -// } -// } -// } - - def createAvl(address: String, from: Int, to: Int): AvlTree[StorageKey, StorageValue] = { - val firstDir: File = FileHelper.getRandomTempDir - val firstStorage: VLDBWrapper = { - val levelDBInit = LevelDbFactory.factory.open(firstDir, new Options) - VLDBWrapper(VersionalLevelDBCompanion(levelDBInit, settings.levelDB, keySize = 32)) + "Snapshot processor" should { + + "process new snapshot function" should { + + "correctly create new snapshot if such doesn't exist" in { + + val (avl1, processor1, _, blocks1, _, _) = initializeTestState() + val (avl2, processor2, _, blocks2, _, _) = initializeTestState() + val (avl3, processor3, _, blocks3, _, _) = initializeTestState() + + val id1 = Algos.hash(avl1.rootHash ++ blocks1(89).id) + val id2 = Algos.hash(avl2.rootHash ++ blocks2(89).id) + val id3 = Algos.hash(avl3.rootHash ++ blocks3(89).id) + + val listIds = id1 :: id2 :: id3 :: Nil + + processor3.potentialManifestsIds.forall { id => + listIds.exists(_.sameElements(id)) + } shouldBe true + + val ids1 = processor1.manifestById(StorageKey @@ id1).get.chunksKeys + val ids2 = processor2.manifestById(StorageKey @@ id2).get.chunksKeys + val ids3 = processor3.manifestById(StorageKey @@ id3).get.chunksKeys + + ids1.forall { id => + processor1.getChunkById(id).nonEmpty + } shouldBe true + + ids2.forall { id => + processor2.getChunkById(id).nonEmpty + } shouldBe true + + ids3.forall { id => + processor3.getChunkById(id).nonEmpty + } shouldBe true + + } + + "skip snapshot creation if such already exists" in { + val snapshotProcessor: SnapshotProcessor = SnapshotProcessor.create(settings, tmpDir) + val avl1: AvlTree[StorageKey, StorageValue] = + createAvl("9gKDVmfsA6J4b78jDBx6JmS86Zph98NnjnUqTJBkW7zitQMReia", 0, 20) + val avl2: AvlTree[StorageKey, StorageValue] = + createAvl("9gKDVmfsA6J4b78jDBx6JmS86Zph98NnjnUqTJBkW7zitQMReia", 5, 25) + + val block1: Block = generateGenesisBlock(Height @@ 1) + val block2: Block = generateGenesisBlock(Height @@ 1) + + val processor1 = snapshotProcessor + .createNewSnapshot(Algos.hash(avl1.rootHash ++ block1.header.id), snapshotProcessor.potentialManifestsIds, List(SnapshotChunk(avl1.rootNode, avl1.rootHash))).right.get + + val processor2 = processor1 + .createNewSnapshot(Algos.hash(avl2.rootHash ++ block2.header.id), processor1.potentialManifestsIds, List(SnapshotChunk(avl2.rootNode, avl2.rootHash))).right.get + + val processor3 = processor2 + .createNewSnapshot(Algos.hash(avl1.rootHash ++ block1.header.id), processor2.potentialManifestsIds, List(SnapshotChunk(avl1.rootNode, avl1.rootHash))).right.get + + val id1 = Algos.hash(avl1.rootHash ++ block1.id) + val id2 = Algos.hash(avl2.rootHash ++ block2.id) + + processor3.potentialManifestsIds.toSet.map(Algos.encode).size == 2 shouldBe true + + processor3.potentialManifestsIds.forall { id => + id.sameElements(id1) || id.sameElements(id2) + } shouldBe true + } + } + + "process new block function" should { + + "process block correctly" in { + + val sn = settings.copy(snapshotSettings = settings.snapshotSettings.copy(newSnapshotCreationHeight = 4), + levelDB = settings.levelDB.copy(maxVersions = 1)) + val snapshotProcessor: SnapshotProcessor = SnapshotProcessor.create(sn, tmpDir) + val avl1: AvlTree[StorageKey, StorageValue] = + createAvl("9gKDVmfsA6J4b78jDBx6JmS86Zph98NnjnUqTJBkW7zitQMReia", 0, 20) + val avl2: AvlTree[StorageKey, StorageValue] = + createAvl("9gKDVmfsA6J4b78jDBx6JmS86Zph98NnjnUqTJBkW7zitQMReia", 5, 25) + + val block1: Block = generateGenesisBlock(Height @@ 1) + val block2: Block = generateGenesisBlock(Height @@ (4 + sn.levelDB.maxVersions)) + + val history = generateDummyHistory(settings) + val (_, blocks) = DMUtils.generateBlocks(5, history) + val newBlocks = blocks.map(b => Block(b.header.copy(stateRoot = avl2.rootHash), b.payload)) + + logger.info(s"${newBlocks.map(_.encodedId)}") + + val history1 = generateDummyHistory(settings) + val historyNew1 = newBlocks.foldLeft(history1) { + case (history, block) => + history.append(block.header) + history.append(block.payload) + history.reportModifierIsValid(block) + } + + val processor1 = snapshotProcessor + .createNewSnapshot(Algos.hash(avl1.rootHash ++ block1.header.id), snapshotProcessor.potentialManifestsIds, List(SnapshotChunk(avl1.rootNode, avl1.rootHash))).right.get + + val processor2 = processor1 + .createNewSnapshot(Algos.hash(avl2.rootHash ++ newBlocks.last.id), snapshotProcessor.potentialManifestsIds, List(SnapshotChunk(avl2.rootNode, avl2.rootHash))).right.get + + val id1 = Algos.hash(avl1.rootHash ++ block1.id) + val ids1 = processor2.manifestById(StorageKey @@ id1).get.chunksKeys + + val processor3 = + processor2.processNewBlock(block2, historyNew1).right.get + + val id2 = Algos.hash(avl2.rootHash ++ newBlocks.last.id) + + processor3.potentialManifestsIds.isEmpty shouldBe true + + logger.info(s"${Algos.encode(id2)}") + + processor3.manifestById(StorageKey @@ id1).isEmpty shouldBe true + processor3.manifestById(StorageKey @@ id2).nonEmpty shouldBe true + + val ids2 = processor3.manifestById(StorageKey @@ id2).get.chunksKeys + + ids2.forall { id => + processor3.getChunkById(id).nonEmpty + } shouldBe true + + val ids11 = ids1.filterNot(l => ids2.exists(_.sameElements(l))) + + ids11.forall { id => + processor3.getChunkById(id).isEmpty + } shouldBe true + } + } - val boxes: IndexedSeq[(StorageKey, StorageValue)] = (from to to) - .map(i => genAssetBox(address, i, nonce = i)) - .map(bx => (StorageKey !@@ bx.id, StorageValue @@ bx.bytes)) - - val firstAvl: AvlTree[StorageKey, StorageValue] = AvlTree[StorageKey, StorageValue](firstStorage) - firstAvl - .insertAndDeleteMany( - StorageVersion @@ Random.randomBytes(), - boxes.toList, - List.empty - ) } - def tmpDir: File = FileHelper.getRandomTempDir } From 3348c08538e57e1fc67f181e19a552dfe9d5f49e Mon Sep 17 00:00:00 2001 From: GRYE Date: Fri, 15 Nov 2019 14:32:53 +0300 Subject: [PATCH 2/5] Additional fast sync tests --- .../encry/view/fast/sync/SnapshotHolder.scala | 21 ++- .../view/fast/sync/SnapshotProcessor.scala | 42 +++-- .../encry/view/state/avlTree/AvlTree.scala | 4 + .../view/fast/sync/FastSyncTestsUtils.scala | 44 +++-- .../fast/sync/SnapshotProcessorTest.scala | 37 ++-- .../fast/sync/SubtreesAssemblerTest.scala | 174 +++++++----------- 6 files changed, 153 insertions(+), 169 deletions(-) diff --git a/src/main/scala/encry/view/fast/sync/SnapshotHolder.scala b/src/main/scala/encry/view/fast/sync/SnapshotHolder.scala index 643c2b7c1a..364eb05cee 100644 --- a/src/main/scala/encry/view/fast/sync/SnapshotHolder.scala +++ b/src/main/scala/encry/view/fast/sync/SnapshotHolder.scala @@ -124,16 +124,18 @@ class SnapshotHolder(settings: EncryAppSettings, (controller, chunk) = controllerAndChunk validChunk <- snapshotProcessor.validateChunkId(chunk) processor = snapshotProcessor.updateCache(validChunk) - newProcessor <- processor.processNextApplicableChunk(processor).leftFlatMap { - case e: ApplicableChunkIsAbsent => e.processor.asRight[FastSyncException] - case t => t.asLeft[SnapshotProcessor] - } + newProcessor <- processor.processNextApplicableChunk(processor).leftFlatMap { + case e: ApplicableChunkIsAbsent => e.processor.asRight[FastSyncException] + case t => t.asLeft[SnapshotProcessor] + } } yield (newProcessor, controller)) match { case Left(error) => nodeViewSynchronizer ! BanPeer(remote, InvalidChunkMessage(error.error)) restartFastSync(history) case Right((processor, controller)) - if controller.requestedChunks.isEmpty && controller.notYetRequested.isEmpty && processor.chunksCache.nonEmpty => + if controller.requestedChunks.isEmpty && + controller.notYetRequested.isEmpty && + (processor.chunksCache.nonEmpty || processor.applicableChunks.nonEmpty) => nodeViewSynchronizer ! BanPeer(remote, InvalidChunkMessage("For request is empty, buffer is nonEmpty")) restartFastSync(history) case Right((processor, controller)) @@ -233,10 +235,11 @@ class SnapshotHolder(settings: EncryAppSettings, def workMod(history: History): Receive = { case TreeChunks(chunks, id) => //todo add collection with potentialManifestsIds to NVH - val manifestIds: Seq[Array[Byte]] = snapshotProcessor.potentialManifestsIds - if (!manifestIds.exists(_.sameElements(id))) { - snapshotProcessor.createNewSnapshot(id, manifestIds, chunks) - } else logger.info(s"Doesn't need to create snapshot") + + snapshotProcessor.createNewSnapshot(id, chunks).fold( err => + logger.warn(s"Failed to create new snapshot due to ${err.error}"), + newProc => snapshotProcessor = newProc + ) case SemanticallySuccessfulModifier(block: Block) if history.isFullChainSynced => logger.info(s"Snapshot holder got semantically successful modifier message. Started processing it.") diff --git a/src/main/scala/encry/view/fast/sync/SnapshotProcessor.scala b/src/main/scala/encry/view/fast/sync/SnapshotProcessor.scala index 07faac76c5..8e6fbb6c04 100644 --- a/src/main/scala/encry/view/fast/sync/SnapshotProcessor.scala +++ b/src/main/scala/encry/view/fast/sync/SnapshotProcessor.scala @@ -193,28 +193,30 @@ final case class SnapshotProcessor(settings: EncryAppSettings, def createNewSnapshot( id: Array[Byte], - manifestIds: Seq[Array[Byte]], newChunks: List[SnapshotChunk] - ): Either[ProcessNewSnapshotError, SnapshotProcessor] = { - //todo add only exists chunks - val manifest: SnapshotManifest = SnapshotManifest(id, newChunks.map(_.id)) - val snapshotToDB: List[(StorageKey, StorageValue)] = newChunks.map { elem => - val bytes: Array[Byte] = SnapshotChunkSerializer.toProto(elem).toByteArray - StorageKey @@ elem.id -> StorageValue @@ bytes - } - val manifestToDB: (StorageKey, StorageValue) = - StorageKey @@ manifest.manifestId -> StorageValue @@ SnapshotManifestSerializer - .toProto(manifest) - .toByteArray - val updateList: (StorageKey, StorageValue) = - PotentialManifestsIdsKey -> StorageValue @@ (manifest.manifestId :: manifestIds.toList).flatten.toArray - val toApply: List[(StorageKey, StorageValue)] = manifestToDB :: updateList :: snapshotToDB - logger.info(s"A new snapshot created successfully. Insertion started.") - Either.catchNonFatal(storage.insert(StorageVersion @@ Random.randomBytes(), toApply, List.empty)) match { - case Left(value) => ProcessNewSnapshotError(value.getMessage).asLeft[SnapshotProcessor] - case Right(_) => this.asRight[ProcessNewSnapshotError] + ): Either[ProcessNewSnapshotError, SnapshotProcessor] = + if (potentialManifestsIds.exists(_.sameElements(id))) + ProcessNewSnapshotError(s"Potential manifest with id ${Algos.encode(id)} already exists").asLeft[SnapshotProcessor] + else { + //todo add only exists chunks + val manifest: SnapshotManifest = SnapshotManifest(id, newChunks.map(_.id)) + val snapshotToDB: List[(StorageKey, StorageValue)] = newChunks.map { elem => + val bytes: Array[Byte] = SnapshotChunkSerializer.toProto(elem).toByteArray + StorageKey @@ elem.id -> StorageValue @@ bytes + } + val manifestToDB: (StorageKey, StorageValue) = + StorageKey @@ manifest.manifestId -> StorageValue @@ SnapshotManifestSerializer + .toProto(manifest) + .toByteArray + val updateList: (StorageKey, StorageValue) = + PotentialManifestsIdsKey -> StorageValue @@ (manifest.manifestId :: potentialManifestsIds.toList).flatten.toArray + val toApply: List[(StorageKey, StorageValue)] = manifestToDB :: updateList :: snapshotToDB + logger.info(s"A new snapshot created successfully. Insertion started.") + Either.catchNonFatal(storage.insert(StorageVersion @@ Random.randomBytes(), toApply, List.empty)) match { + case Left(value) => ProcessNewSnapshotError(value.getMessage).asLeft[SnapshotProcessor] + case Right(_) => this.asRight[ProcessNewSnapshotError] + } } - } private def updateActualSnapshot(history: History, height: Int): Either[ProcessNewBlockError, SnapshotProcessor] = for { diff --git a/src/main/scala/encry/view/state/avlTree/AvlTree.scala b/src/main/scala/encry/view/state/avlTree/AvlTree.scala index 2162cf4236..73ef2f7b98 100644 --- a/src/main/scala/encry/view/state/avlTree/AvlTree.scala +++ b/src/main/scala/encry/view/state/avlTree/AvlTree.scala @@ -499,6 +499,7 @@ final case class AvlTree[K : Hashable : Order, V] (rootNode: Node[K, V], storage } def selfInspectionAfterFastSync(implicit kSer: Serializer[K]): Boolean = { + @scala.annotation.tailrec def loop(nodesToProcess: List[Node[K, V]], keysToInspect: List[Array[Byte]]): List[Array[Byte]] = if (nodesToProcess.nonEmpty) nodesToProcess.head match { @@ -521,10 +522,13 @@ final case class AvlTree[K : Hashable : Order, V] (rootNode: Node[K, V], storage loop(updatedNodeToProcess ::: next, Algos.hash(kSer.toBytes(i.key).reverse) :: i.hash :: current ::: keysToInspect) case l: LeafNode[K, V] => loop(nodesToProcess.drop(1), Algos.hash(kSer.toBytes(l.key).reverse) :: l.hash :: keysToInspect) } else keysToInspect + val keys: Set[ByteArrayWrapper] = loop(List(rootNode), List.empty).map(ByteArrayWrapper(_)).toSet + val allKeysFromDB: Set[ByteArrayWrapper] = storage.getAllKeys(-1) .map(ByteArrayWrapper(_)).toSet - ByteArrayWrapper(UtxoState.bestHeightKey) - ByteArrayWrapper(AvlTree.rootNodeKey) logger.debug(s"${keys.map(l => Algos.encode(l.data))}") + (allKeysFromDB -- keys).isEmpty } diff --git a/src/test/scala/encry/view/fast/sync/FastSyncTestsUtils.scala b/src/test/scala/encry/view/fast/sync/FastSyncTestsUtils.scala index 6d4a0cafb8..ca280e7562 100644 --- a/src/test/scala/encry/view/fast/sync/FastSyncTestsUtils.scala +++ b/src/test/scala/encry/view/fast/sync/FastSyncTestsUtils.scala @@ -10,6 +10,7 @@ import encry.storage.levelDb.versionalLevelDB.{LevelDbFactory, VLDBWrapper, Vers import encry.utils.FileHelper import encry.view.fast.sync.SnapshotHolder.{SnapshotChunk, SnapshotManifest} import encry.view.history.History +import encry.view.state.UtxoState import encry.view.state.avlTree.AvlTree import org.iq80.leveldb.Options import scorex.utils.Random @@ -17,6 +18,7 @@ import encry.view.state.avlTree.utils.implicits.Instances._ import org.encryfoundation.common.modifiers.history.Block import org.encryfoundation.common.modifiers.mempool.transaction.EncryAddress.Address import org.encryfoundation.common.utils.Algos +import org.encryfoundation.common.utils.TaggedTypes.Height object FastSyncTestsUtils extends InstanceFactory with TestNetSettings { @@ -33,26 +35,46 @@ object FastSyncTestsUtils extends InstanceFactory with TestNetSettings { val (history, blocks, tree) = DMUtils.generateBlocksWithTree(initialBlocksQty, generateDummyHistory(settings), None, addressOpt) val (_, additionalBlocks, _) = DMUtils.generateBlocksWithTree(additionalBlocksQty, history, Some(tree), addressOpt) - val chunk = SnapshotChunk(tree.rootNode, tree.rootHash) + val sn = settings + .copy( + levelDB = settings.levelDB.copy(maxVersions = 5), + snapshotSettings = settings.snapshotSettings.copy(newSnapshotCreationHeight = 10) + ) val snapshotProcessor: SnapshotProcessor = SnapshotProcessor - .create(settings.copy(levelDB = settings.levelDB.copy(maxVersions = 10)), tmpDir) + .create(sn, tmpDir) .initializeApplicableChunksCache(history, history.getBestBlockHeight).right.get - .updateCache(chunk) - val mans = snapshotProcessor.potentialManifestsIds - val sp = snapshotProcessor.createNewSnapshot( - Algos.hash(blocks.last.header.stateRoot ++ blocks.last.header.id), - mans, - List(chunk) - ).right.get + val emptyTree = AvlTree[StorageKey, StorageValue] { + VLDBWrapper(VersionalLevelDBCompanion(LevelDbFactory.factory.open(tmpDir, new Options), settings.levelDB, keySize = 32)) + } + + val (newProc, newTree) = blocks.foldLeft((snapshotProcessor, emptyTree)) { case ((proc, currentTree), block) => + import encry.utils.implicits.UTXO._ + + val combinedStateChange: UtxoState.StateChange = combineAll(block.payload.txs.map(UtxoState.tx2StateChange).toList) - val newProc = sp.processNewBlock(additionalBlocks.last, history).right.get + val resultingTree = currentTree.insertAndDeleteMany( + StorageVersion !@@ block.id, + combinedStateChange.outputsToDb.toList, + combinedStateChange.inputsToDb.toList, + Height @@ block.header.height + ) + + if ((block.header.height - sn.levelDB.maxVersions) % sn.snapshotSettings.newSnapshotCreationHeight == 0) { + (proc.processNewBlock(block, history).right.get, resultingTree) + } else if (block.header.height % sn.snapshotSettings.newSnapshotCreationHeight == 0) { + val chunks: List[SnapshotChunk] = + AvlTree.getChunks(resultingTree.rootNode, currentChunkHeight = 1, resultingTree.storage) + val potentialManifestId: Array[Byte] = Algos.hash(resultingTree.rootHash ++ block.id) + (snapshotProcessor.createNewSnapshot(potentialManifestId, chunks).right.get, resultingTree) + } else (proc, resultingTree) + } val snapshotDownloadController = SnapshotDownloadController.empty(settings) - (tree, + (newTree, newProc, snapshotDownloadController, blocks ++ additionalBlocks, diff --git a/src/test/scala/encry/view/fast/sync/SnapshotProcessorTest.scala b/src/test/scala/encry/view/fast/sync/SnapshotProcessorTest.scala index 3520e3320c..c5fd4ea772 100644 --- a/src/test/scala/encry/view/fast/sync/SnapshotProcessorTest.scala +++ b/src/test/scala/encry/view/fast/sync/SnapshotProcessorTest.scala @@ -27,33 +27,25 @@ class SnapshotProcessorTest "correctly create new snapshot if such doesn't exist" in { - val (avl1, processor1, _, blocks1, _, _) = initializeTestState() - val (avl2, processor2, _, blocks2, _, _) = initializeTestState() - val (avl3, processor3, _, blocks3, _, _) = initializeTestState() + val (_, processor1, _, _, _, _) = initializeTestState() + val (_, processor2, _, _, _, _) = initializeTestState() + val (_, processor3, _, _, _, _) = initializeTestState() - val id1 = Algos.hash(avl1.rootHash ++ blocks1(89).id) - val id2 = Algos.hash(avl2.rootHash ++ blocks2(89).id) - val id3 = Algos.hash(avl3.rootHash ++ blocks3(89).id) - - val listIds = id1 :: id2 :: id3 :: Nil + val listIds = processor1.actualManifestId.get :: processor2.actualManifestId.get :: processor3.actualManifestId.get :: Nil processor3.potentialManifestsIds.forall { id => listIds.exists(_.sameElements(id)) } shouldBe true - val ids1 = processor1.manifestById(StorageKey @@ id1).get.chunksKeys - val ids2 = processor2.manifestById(StorageKey @@ id2).get.chunksKeys - val ids3 = processor3.manifestById(StorageKey @@ id3).get.chunksKeys - - ids1.forall { id => + processor1.actualManifest.get.chunksKeys.forall { id => processor1.getChunkById(id).nonEmpty } shouldBe true - ids2.forall { id => + processor2.actualManifest.get.chunksKeys.forall { id => processor2.getChunkById(id).nonEmpty } shouldBe true - ids3.forall { id => + processor3.actualManifest.get.chunksKeys.forall { id => processor3.getChunkById(id).nonEmpty } shouldBe true @@ -70,20 +62,19 @@ class SnapshotProcessorTest val block2: Block = generateGenesisBlock(Height @@ 1) val processor1 = snapshotProcessor - .createNewSnapshot(Algos.hash(avl1.rootHash ++ block1.header.id), snapshotProcessor.potentialManifestsIds, List(SnapshotChunk(avl1.rootNode, avl1.rootHash))).right.get + .createNewSnapshot(Algos.hash(avl1.rootHash ++ block1.header.id), List(SnapshotChunk(avl1.rootNode, avl1.rootHash))).right.get val processor2 = processor1 - .createNewSnapshot(Algos.hash(avl2.rootHash ++ block2.header.id), processor1.potentialManifestsIds, List(SnapshotChunk(avl2.rootNode, avl2.rootHash))).right.get + .createNewSnapshot(Algos.hash(avl2.rootHash ++ block2.header.id), List(SnapshotChunk(avl2.rootNode, avl2.rootHash))).right.get - val processor3 = processor2 - .createNewSnapshot(Algos.hash(avl1.rootHash ++ block1.header.id), processor2.potentialManifestsIds, List(SnapshotChunk(avl1.rootNode, avl1.rootHash))).right.get + processor2.createNewSnapshot(Algos.hash(avl1.rootHash ++ block1.header.id), List(SnapshotChunk(avl1.rootNode, avl1.rootHash))).isLeft shouldBe true val id1 = Algos.hash(avl1.rootHash ++ block1.id) val id2 = Algos.hash(avl2.rootHash ++ block2.id) - processor3.potentialManifestsIds.toSet.map(Algos.encode).size == 2 shouldBe true + processor2.potentialManifestsIds.size == 2 shouldBe true - processor3.potentialManifestsIds.forall { id => + processor2.potentialManifestsIds.forall { id => id.sameElements(id1) || id.sameElements(id2) } shouldBe true } @@ -119,10 +110,10 @@ class SnapshotProcessorTest } val processor1 = snapshotProcessor - .createNewSnapshot(Algos.hash(avl1.rootHash ++ block1.header.id), snapshotProcessor.potentialManifestsIds, List(SnapshotChunk(avl1.rootNode, avl1.rootHash))).right.get + .createNewSnapshot(Algos.hash(avl1.rootHash ++ block1.header.id), List(SnapshotChunk(avl1.rootNode, avl1.rootHash))).right.get val processor2 = processor1 - .createNewSnapshot(Algos.hash(avl2.rootHash ++ newBlocks.last.id), snapshotProcessor.potentialManifestsIds, List(SnapshotChunk(avl2.rootNode, avl2.rootHash))).right.get + .createNewSnapshot(Algos.hash(avl2.rootHash ++ newBlocks.last.id), List(SnapshotChunk(avl2.rootNode, avl2.rootHash))).right.get val id1 = Algos.hash(avl1.rootHash ++ block1.id) val ids1 = processor2.manifestById(StorageKey @@ id1).get.chunksKeys diff --git a/src/test/scala/encry/view/fast/sync/SubtreesAssemblerTest.scala b/src/test/scala/encry/view/fast/sync/SubtreesAssemblerTest.scala index 1731e872bc..486f99085b 100644 --- a/src/test/scala/encry/view/fast/sync/SubtreesAssemblerTest.scala +++ b/src/test/scala/encry/view/fast/sync/SubtreesAssemblerTest.scala @@ -1,24 +1,17 @@ package encry.view.fast.sync -import java.io.File -import SnapshotChunkProto.SnapshotChunkMessage import com.typesafe.scalalogging.StrictLogging import encry.modifiers.InstanceFactory import encry.settings.TestNetSettings -import encry.storage.VersionalStorage.{StorageKey, StorageValue, StorageVersion} -import encry.storage.levelDb.versionalLevelDB.{LevelDbFactory, VLDBWrapper, VersionalLevelDBCompanion} -import encry.utils.FileHelper import SnapshotHolder.SnapshotChunkSerializer -import encry.view.state.UtxoState -import encry.view.state.avlTree.{AvlTree, InternalNode, NodeSerilalizer } -import org.iq80.leveldb.Options +import encry.view.state.avlTree.InternalNode import org.scalatest.{Matchers, OneInstancePerTest, WordSpecLike} import encry.view.state.avlTree.utils.implicits.Instances._ -import org.encryfoundation.common.modifiers.history.Block -import org.encryfoundation.common.utils.TaggedTypes.Height -import scorex.utils.Random -import scala.util.{ Random => random } + import cats.syntax.either._ +import scala.util.{ Random => random } +import encry.view.fast.sync.FastSyncExceptions.{ApplicableChunkIsAbsent, FastSyncException} +import encry.view.fast.sync.FastSyncTestsUtils._ class SubtreesAssemblerTest extends WordSpecLike @@ -28,101 +21,70 @@ class SubtreesAssemblerTest with TestNetSettings with StrictLogging { -// "Trees validator" should { -// "check tree assembly(with correct nodes)" in { -// val snapshotProcessor: SnapshotProcessor = SnapshotProcessor.create(settings, tmpDir) -// val avl1: AvlTree[StorageKey, StorageValue] = -// createAvl("9gKDVmfsA6J4b78jDBx6JmS86Zph98NnjnUqTJBkW7zitQMReia", 0, 1000) -// val block1: Block = generateGenesisBlock(Height @@ 1) -// -// val processor1: SnapshotProcessor = -// snapshotProcessor.processNewSnapshot(UtxoState(avl1, Height @@ 0, settings.constants), block1) -// val manifest: SnapshotHolder.SnapshotManifest = processor1.bestPotentialManifest.get -// -// val newTree: AvlTree[StorageKey, StorageValue] = AvlTree[StorageKey, StorageValue]( -// manifest, -// VLDBWrapper( -// VersionalLevelDBCompanion(LevelDbFactory.factory.open(tmpDir, new Options), settings.levelDB, keySize = 32) -// ) -// ) -// -// val newTree1: AvlTree[StorageKey, StorageValue] = manifest.chunksKeys.foldLeft(newTree) { -// case (tree, key) => -// val chunk: SnapshotChunkMessage = processor1.getChunkById(key).get -// val nodes: SnapshotHolder.SnapshotChunk = SnapshotChunkSerializer.fromProto(chunk).get -// val treeNew: AvlTree[StorageKey, StorageValue] = -// tree.assembleTree(nodes.nodesList.map { n => -// val a = NodeSerilalizer.fromProto[StorageKey, StorageValue](n) -// a -// }) -// treeNew -// } -// -// val state = UtxoState(newTree1, Height @@ 0, settings.constants) -// -// state.validateTreeAfterFastSync() shouldBe true -// } -// "check tree assembly(with incorrect nodes)" in { -// val snapshotProcessor: SnapshotProcessor = SnapshotProcessor.create(settings, tmpDir) -// val avl1: AvlTree[StorageKey, StorageValue] = -// createAvl("9gKDVmfsA6J4b78jDBx6JmS86Zph98NnjnUqTJBkW7zitQMReia", 0, 1000) -// val block1: Block = generateGenesisBlock(Height @@ 1) -// -// val processor1: SnapshotProcessor = -// snapshotProcessor.processNewSnapshot(UtxoState(avl1, Height @@ 0, settings.constants), block1) -// val manifest: SnapshotHolder.SnapshotManifest = processor1.bestPotentialManifest.get -// -// val newTree: AvlTree[StorageKey, StorageValue] = AvlTree[StorageKey, StorageValue]( -// manifest, -// VLDBWrapper( -// VersionalLevelDBCompanion(LevelDbFactory.factory.open(tmpDir, new Options), settings.levelDB, keySize = 32) -// ) -// ) -// -// val newTree1: AvlTree[StorageKey, StorageValue] = manifest.chunksKeys.foldLeft(newTree) { -// case (tree, key) => -// val chunk: SnapshotChunkMessage = processor1.getChunkById(key).get -// val nodes: SnapshotHolder.SnapshotChunk = SnapshotChunkSerializer.fromProto(chunk).get -// val treeNew: AvlTree[StorageKey, StorageValue] = -// tree.assembleTree(nodes.nodesList.map { n => -// val a = NodeSerilalizer.fromProto[StorageKey, StorageValue](n) -// a match { -// case l@InternalNode(_, _, _, _, _, _) => -// l.copy(height = random.nextInt()) -// case l => l -// } -// }) -// treeNew -// } -// -// val state = UtxoState(newTree1, Height @@ 0, settings.constants) -// -// Either.catchNonFatal(state.validateTreeAfterFastSync()).isLeft shouldBe true -// } -// "check chunk validity" in {} -// } - - def createAvl(address: String, from: Int, to: Int): AvlTree[StorageKey, StorageValue] = { - val firstDir: File = FileHelper.getRandomTempDir - val firstStorage: VLDBWrapper = { - VLDBWrapper( - VersionalLevelDBCompanion(LevelDbFactory.factory.open(firstDir, new Options), settings.levelDB, keySize = 32) - ) + "Trees validator" should { + + "check tree assembly(with correct nodes)" in { + + val (_, processor, _, _, manifest, history) = initializeTestState() + + val sn = settings + .copy( + levelDB = settings.levelDB.copy(maxVersions = 5), + snapshotSettings = settings.snapshotSettings.copy(newSnapshotCreationHeight = 10) + ) + + val chunks = manifest + .chunksKeys + .flatMap(processor.getChunkById) + .flatMap(SnapshotChunkSerializer.fromProto(_).toOption) + + val secondProcessor: SnapshotProcessor = + SnapshotProcessor + .create(sn, tmpDir) + .initializeApplicableChunksCache(history, 80).right.get + + chunks.foldLeft(secondProcessor) { case (proc, nextChunk) => + val wUpdatedCache = proc.updateCache(nextChunk) + wUpdatedCache.processNextApplicableChunk(wUpdatedCache).leftFlatMap { + case e: ApplicableChunkIsAbsent => e.processor.asRight[FastSyncException] + case t => t.asLeft[SnapshotProcessor] + }.right.get + }.assembleUTXOState.right.get.tree.selfInspectionAfterFastSync shouldBe true + } + + "check tree assembly(with incorrect nodes)" in { + val (_, processor, _, _, manifest, history) = initializeTestState() + + val sn = settings + .copy( + levelDB = settings.levelDB.copy(maxVersions = 5), + snapshotSettings = settings.snapshotSettings.copy(newSnapshotCreationHeight = 10) + ) + + val chunks = manifest + .chunksKeys + .flatMap(processor.getChunkById) + .flatMap(SnapshotChunkSerializer.fromProto(_).toOption) + + val secondProcessor: SnapshotProcessor = + SnapshotProcessor + .create(sn, tmpDir) + .initializeApplicableChunksCache(history, 80).right.get + + chunks.foldLeft(secondProcessor) { case (proc, nextChunk) => + val node = nextChunk.node match { + case l@InternalNode(_, _, _, _, _, _) => + l.copy(height = random.nextInt()) + case l => l + } + val wUpdatedCache = proc.updateCache(nextChunk.copy(node = node)) + wUpdatedCache.processNextApplicableChunk(wUpdatedCache).leftFlatMap { + case e: ApplicableChunkIsAbsent => e.processor.asRight[FastSyncException] + case t => t.asLeft[SnapshotProcessor] + }.right.get + }.assembleUTXOState.isLeft shouldBe true } - val boxes: IndexedSeq[(StorageKey, StorageValue)] = (from until to) - .map(i => genAssetBox(address, i, nonce = i)) - .map { bx => - (StorageKey !@@ bx.id, StorageValue @@ bx.bytes) - } - - val firstAvl: AvlTree[StorageKey, StorageValue] = AvlTree[StorageKey, StorageValue](firstStorage) - firstAvl - .insertAndDeleteMany( - StorageVersion @@ Random.randomBytes(), - boxes.toList, - List.empty - ) + } - def tmpDir: File = FileHelper.getRandomTempDir } From 190fb36c31044be89f38dc0b9740bc6ae842a333 Mon Sep 17 00:00:00 2001 From: GRYE Date: Fri, 15 Nov 2019 14:42:55 +0300 Subject: [PATCH 3/5] Small refactoring --- src/main/scala/encry/view/fast/sync/SnapshotHolder.scala | 5 ++++- src/test/scala/encry/view/fast/sync/FastSyncTestsUtils.scala | 5 +++-- .../scala/encry/view/fast/sync/SubtreesAssemblerTest.scala | 4 ++-- 3 files changed, 9 insertions(+), 5 deletions(-) diff --git a/src/main/scala/encry/view/fast/sync/SnapshotHolder.scala b/src/main/scala/encry/view/fast/sync/SnapshotHolder.scala index 364eb05cee..a4142a772c 100644 --- a/src/main/scala/encry/view/fast/sync/SnapshotHolder.scala +++ b/src/main/scala/encry/view/fast/sync/SnapshotHolder.scala @@ -136,7 +136,10 @@ class SnapshotHolder(settings: EncryAppSettings, if controller.requestedChunks.isEmpty && controller.notYetRequested.isEmpty && (processor.chunksCache.nonEmpty || processor.applicableChunks.nonEmpty) => - nodeViewSynchronizer ! BanPeer(remote, InvalidChunkMessage("For request is empty, buffer is nonEmpty")) + if (processor.chunksCache.nonEmpty) + nodeViewSynchronizer ! BanPeer(remote, InvalidChunkMessage("Requested and notYetRequested chunks are empty while chunks cache non empty")) + else if (processor.applicableChunks.nonEmpty) + nodeViewSynchronizer ! BanPeer(remote, InvalidChunkMessage("Requested and notYetRequested chunks are empty while applicable chunks non empty")) restartFastSync(history) case Right((processor, controller)) if controller.requestedChunks.isEmpty && controller.notYetRequested.isEmpty => diff --git a/src/test/scala/encry/view/fast/sync/FastSyncTestsUtils.scala b/src/test/scala/encry/view/fast/sync/FastSyncTestsUtils.scala index ca280e7562..289e8b3e72 100644 --- a/src/test/scala/encry/view/fast/sync/FastSyncTestsUtils.scala +++ b/src/test/scala/encry/view/fast/sync/FastSyncTestsUtils.scala @@ -24,7 +24,8 @@ object FastSyncTestsUtils extends InstanceFactory with TestNetSettings { def initializeTestState(initialBlocksQty: Int = 90, additionalBlocksQty: Int = 10, - addressOpt: Option[Address] = None + addressOpt: Option[Address] = None, + chunkHeight: Int = 1 ): (AvlTree[StorageKey, StorageValue], SnapshotProcessor, SnapshotDownloadController, @@ -65,7 +66,7 @@ object FastSyncTestsUtils extends InstanceFactory with TestNetSettings { (proc.processNewBlock(block, history).right.get, resultingTree) } else if (block.header.height % sn.snapshotSettings.newSnapshotCreationHeight == 0) { val chunks: List[SnapshotChunk] = - AvlTree.getChunks(resultingTree.rootNode, currentChunkHeight = 1, resultingTree.storage) + AvlTree.getChunks(resultingTree.rootNode, chunkHeight, resultingTree.storage) val potentialManifestId: Array[Byte] = Algos.hash(resultingTree.rootHash ++ block.id) (snapshotProcessor.createNewSnapshot(potentialManifestId, chunks).right.get, resultingTree) } else (proc, resultingTree) diff --git a/src/test/scala/encry/view/fast/sync/SubtreesAssemblerTest.scala b/src/test/scala/encry/view/fast/sync/SubtreesAssemblerTest.scala index 486f99085b..7cb10b7a19 100644 --- a/src/test/scala/encry/view/fast/sync/SubtreesAssemblerTest.scala +++ b/src/test/scala/encry/view/fast/sync/SubtreesAssemblerTest.scala @@ -25,7 +25,7 @@ class SubtreesAssemblerTest "check tree assembly(with correct nodes)" in { - val (_, processor, _, _, manifest, history) = initializeTestState() + val (_, processor, _, _, manifest, history) = initializeTestState(chunkHeight = 3) val sn = settings .copy( @@ -53,7 +53,7 @@ class SubtreesAssemblerTest } "check tree assembly(with incorrect nodes)" in { - val (_, processor, _, _, manifest, history) = initializeTestState() + val (_, processor, _, _, manifest, history) = initializeTestState(chunkHeight = 3) val sn = settings .copy( From a6539c7084991a3d33947dc5a5e85b350439b61c Mon Sep 17 00:00:00 2001 From: GRYE Date: Fri, 15 Nov 2019 15:18:39 +0300 Subject: [PATCH 4/5] Small refactoring --- src/main/scala/encry/network/BlackList.scala | 2 + .../encry/view/fast/sync/SnapshotHolder.scala | 29 ++++---- .../fast/sync/SubtreesAssemblerTest.scala | 66 ++++++++++++++++++- 3 files changed, 78 insertions(+), 19 deletions(-) diff --git a/src/main/scala/encry/network/BlackList.scala b/src/main/scala/encry/network/BlackList.scala index 9cbb5ea8ff..c6dba0117c 100644 --- a/src/main/scala/encry/network/BlackList.scala +++ b/src/main/scala/encry/network/BlackList.scala @@ -45,6 +45,8 @@ object BlackList { final case class InvalidResponseManifestMessage(error: String) extends BanReason final case class InvalidChunkMessage(error: String) extends BanReason final case class InvalidManifestHasChangedMessage(error: String) extends BanReason + case object NotAllChunksSentMessage extends BanReason + case object UnrequestedChunksSentMessage extends BanReason case object ExpiredNumberOfReRequestAttempts extends BanReason case object ExpiredNumberOfRequests extends BanReason final case class InvalidStateAfterFastSync(error: String) extends BanReason diff --git a/src/main/scala/encry/view/fast/sync/SnapshotHolder.scala b/src/main/scala/encry/view/fast/sync/SnapshotHolder.scala index a4142a772c..9e375cc599 100644 --- a/src/main/scala/encry/view/fast/sync/SnapshotHolder.scala +++ b/src/main/scala/encry/view/fast/sync/SnapshotHolder.scala @@ -2,12 +2,12 @@ package encry.view.fast.sync import SnapshotChunkProto.SnapshotChunkMessage import SnapshotManifestProto.SnapshotManifestProtoMessage -import akka.actor.{ Actor, ActorRef, Cancellable, Props } +import akka.actor.{Actor, ActorRef, Cancellable, Props} import com.google.protobuf.ByteString import com.typesafe.scalalogging.StrictLogging import encry.network.Broadcast -import encry.network.NetworkController.ReceivableMessages.{ DataFromPeer, RegisterMessagesHandler } -import encry.network.PeersKeeper.{ BanPeer, SendToNetwork } +import encry.network.NetworkController.ReceivableMessages.{DataFromPeer, RegisterMessagesHandler} +import encry.network.PeersKeeper.{BanPeer, SendToNetwork} import encry.settings.EncryAppSettings import SnapshotHolder._ import encry.view.state.UtxoState @@ -15,19 +15,14 @@ import org.encryfoundation.common.modifiers.history.Block import org.encryfoundation.common.network.BasicMessagesRepo._ import org.encryfoundation.common.utils.Algos import cats.syntax.option._ -import encry.network.BlackList.BanReason.{ - ExpiredNumberOfReRequestAttempts, - ExpiredNumberOfRequests, - InvalidChunkMessage, - InvalidResponseManifestMessage, - InvalidStateAfterFastSync -} -import encry.network.NodeViewSynchronizer.ReceivableMessages.{ ChangedHistory, SemanticallySuccessfulModifier } -import encry.storage.VersionalStorage.{ StorageKey, StorageValue } -import encry.view.fast.sync.FastSyncExceptions.{ ApplicableChunkIsAbsent, FastSyncException } +import encry.network.BlackList.BanReason.{ExpiredNumberOfReRequestAttempts, ExpiredNumberOfRequests, InvalidChunkMessage, InvalidResponseManifestMessage, InvalidStateAfterFastSync, NotAllChunksSentMessage, UnrequestedChunksSentMessage} +import encry.network.NodeViewSynchronizer.ReceivableMessages.{ChangedHistory, SemanticallySuccessfulModifier} +import encry.storage.VersionalStorage.{StorageKey, StorageValue} +import encry.view.fast.sync.FastSyncExceptions.{ApplicableChunkIsAbsent, FastSyncException} import encry.view.history.History -import encry.view.state.avlTree.{ Node, NodeSerilalizer } +import encry.view.state.avlTree.{Node, NodeSerilalizer} import cats.syntax.either._ + import scala.util.Try import encry.view.state.avlTree.utils.implicits.Instances._ @@ -137,9 +132,9 @@ class SnapshotHolder(settings: EncryAppSettings, controller.notYetRequested.isEmpty && (processor.chunksCache.nonEmpty || processor.applicableChunks.nonEmpty) => if (processor.chunksCache.nonEmpty) - nodeViewSynchronizer ! BanPeer(remote, InvalidChunkMessage("Requested and notYetRequested chunks are empty while chunks cache non empty")) - else if (processor.applicableChunks.nonEmpty) - nodeViewSynchronizer ! BanPeer(remote, InvalidChunkMessage("Requested and notYetRequested chunks are empty while applicable chunks non empty")) + nodeViewSynchronizer ! BanPeer(remote, UnrequestedChunksSentMessage) + else + nodeViewSynchronizer ! BanPeer(remote, NotAllChunksSentMessage) restartFastSync(history) case Right((processor, controller)) if controller.requestedChunks.isEmpty && controller.notYetRequested.isEmpty => diff --git a/src/test/scala/encry/view/fast/sync/SubtreesAssemblerTest.scala b/src/test/scala/encry/view/fast/sync/SubtreesAssemblerTest.scala index 7cb10b7a19..27729c550d 100644 --- a/src/test/scala/encry/view/fast/sync/SubtreesAssemblerTest.scala +++ b/src/test/scala/encry/view/fast/sync/SubtreesAssemblerTest.scala @@ -23,7 +23,7 @@ class SubtreesAssemblerTest "Trees validator" should { - "check tree assembly(with correct nodes)" in { + "check tree assembly(with correct nodes) with non-default chunk height" in { val (_, processor, _, _, manifest, history) = initializeTestState(chunkHeight = 3) @@ -52,7 +52,7 @@ class SubtreesAssemblerTest }.assembleUTXOState.right.get.tree.selfInspectionAfterFastSync shouldBe true } - "check tree assembly(with incorrect nodes)" in { + "check tree assembly(with incorrect nodes) with non-default chunk height" in { val (_, processor, _, _, manifest, history) = initializeTestState(chunkHeight = 3) val sn = settings @@ -85,6 +85,68 @@ class SubtreesAssemblerTest }.assembleUTXOState.isLeft shouldBe true } + "check tree assembly(with correct nodes) with default chunk height" in { + + val (_, processor, _, _, manifest, history) = initializeTestState() + + val sn = settings + .copy( + levelDB = settings.levelDB.copy(maxVersions = 5), + snapshotSettings = settings.snapshotSettings.copy(newSnapshotCreationHeight = 10) + ) + + val chunks = manifest + .chunksKeys + .flatMap(processor.getChunkById) + .flatMap(SnapshotChunkSerializer.fromProto(_).toOption) + + val secondProcessor: SnapshotProcessor = + SnapshotProcessor + .create(sn, tmpDir) + .initializeApplicableChunksCache(history, 80).right.get + + chunks.foldLeft(secondProcessor) { case (proc, nextChunk) => + val wUpdatedCache = proc.updateCache(nextChunk) + wUpdatedCache.processNextApplicableChunk(wUpdatedCache).leftFlatMap { + case e: ApplicableChunkIsAbsent => e.processor.asRight[FastSyncException] + case t => t.asLeft[SnapshotProcessor] + }.right.get + }.assembleUTXOState.right.get.tree.selfInspectionAfterFastSync shouldBe true + } + + "check tree assembly(with incorrect nodes) with default chunk height" in { + val (_, processor, _, _, manifest, history) = initializeTestState() + + val sn = settings + .copy( + levelDB = settings.levelDB.copy(maxVersions = 5), + snapshotSettings = settings.snapshotSettings.copy(newSnapshotCreationHeight = 10) + ) + + val chunks = manifest + .chunksKeys + .flatMap(processor.getChunkById) + .flatMap(SnapshotChunkSerializer.fromProto(_).toOption) + + val secondProcessor: SnapshotProcessor = + SnapshotProcessor + .create(sn, tmpDir) + .initializeApplicableChunksCache(history, 80).right.get + + chunks.foldLeft(secondProcessor) { case (proc, nextChunk) => + val node = nextChunk.node match { + case l@InternalNode(_, _, _, _, _, _) => + l.copy(height = random.nextInt()) + case l => l + } + val wUpdatedCache = proc.updateCache(nextChunk.copy(node = node)) + wUpdatedCache.processNextApplicableChunk(wUpdatedCache).leftFlatMap { + case e: ApplicableChunkIsAbsent => e.processor.asRight[FastSyncException] + case t => t.asLeft[SnapshotProcessor] + }.right.get + }.assembleUTXOState.isLeft shouldBe true + } + } } From 70520741d376de18fac14e0f1b42d949abe8b479 Mon Sep 17 00:00:00 2001 From: GRYE Date: Mon, 18 Nov 2019 17:34:41 +0300 Subject: [PATCH 5/5] Small refactoring --- .../encry/view/fast/sync/SnapshotHolder.scala | 21 +++---- .../view/fast/sync/SnapshotProcessor.scala | 62 +++++++------------ .../fast/sync/SnapshotProcessorTest.scala | 52 ++++++++++++---- 3 files changed, 75 insertions(+), 60 deletions(-) diff --git a/src/main/scala/encry/view/fast/sync/SnapshotHolder.scala b/src/main/scala/encry/view/fast/sync/SnapshotHolder.scala index 9e375cc599..8290ffb700 100644 --- a/src/main/scala/encry/view/fast/sync/SnapshotHolder.scala +++ b/src/main/scala/encry/view/fast/sync/SnapshotHolder.scala @@ -3,28 +3,28 @@ package encry.view.fast.sync import SnapshotChunkProto.SnapshotChunkMessage import SnapshotManifestProto.SnapshotManifestProtoMessage import akka.actor.{Actor, ActorRef, Cancellable, Props} +import cats.syntax.either._ +import cats.syntax.option._ import com.google.protobuf.ByteString import com.typesafe.scalalogging.StrictLogging +import encry.network.BlackList.BanReason._ import encry.network.Broadcast import encry.network.NetworkController.ReceivableMessages.{DataFromPeer, RegisterMessagesHandler} +import encry.network.NodeViewSynchronizer.ReceivableMessages.{ChangedHistory, SemanticallySuccessfulModifier} import encry.network.PeersKeeper.{BanPeer, SendToNetwork} import encry.settings.EncryAppSettings -import SnapshotHolder._ -import encry.view.state.UtxoState -import org.encryfoundation.common.modifiers.history.Block -import org.encryfoundation.common.network.BasicMessagesRepo._ -import org.encryfoundation.common.utils.Algos -import cats.syntax.option._ -import encry.network.BlackList.BanReason.{ExpiredNumberOfReRequestAttempts, ExpiredNumberOfRequests, InvalidChunkMessage, InvalidResponseManifestMessage, InvalidStateAfterFastSync, NotAllChunksSentMessage, UnrequestedChunksSentMessage} -import encry.network.NodeViewSynchronizer.ReceivableMessages.{ChangedHistory, SemanticallySuccessfulModifier} import encry.storage.VersionalStorage.{StorageKey, StorageValue} import encry.view.fast.sync.FastSyncExceptions.{ApplicableChunkIsAbsent, FastSyncException} +import encry.view.fast.sync.SnapshotHolder._ import encry.view.history.History +import encry.view.state.UtxoState +import encry.view.state.avlTree.utils.implicits.Instances._ import encry.view.state.avlTree.{Node, NodeSerilalizer} -import cats.syntax.either._ +import org.encryfoundation.common.modifiers.history.Block +import org.encryfoundation.common.network.BasicMessagesRepo._ +import org.encryfoundation.common.utils.Algos import scala.util.Try -import encry.view.state.avlTree.utils.implicits.Instances._ class SnapshotHolder(settings: EncryAppSettings, networkController: ActorRef, @@ -233,7 +233,6 @@ class SnapshotHolder(settings: EncryAppSettings, def workMod(history: History): Receive = { case TreeChunks(chunks, id) => //todo add collection with potentialManifestsIds to NVH - snapshotProcessor.createNewSnapshot(id, chunks).fold( err => logger.warn(s"Failed to create new snapshot due to ${err.error}"), newProc => snapshotProcessor = newProc diff --git a/src/main/scala/encry/view/fast/sync/SnapshotProcessor.scala b/src/main/scala/encry/view/fast/sync/SnapshotProcessor.scala index 8e6fbb6c04..a6aff4128b 100644 --- a/src/main/scala/encry/view/fast/sync/SnapshotProcessor.scala +++ b/src/main/scala/encry/view/fast/sync/SnapshotProcessor.scala @@ -1,46 +1,30 @@ package encry.view.fast.sync import java.io.File -import encry.storage.VersionalStorage.{ StorageKey, StorageValue, StorageVersion } -import encry.view.state.UtxoState -import org.encryfoundation.common.modifiers.history.Block + +import cats.syntax.either._ +import com.google.common.primitives.Ints import com.typesafe.scalalogging.StrictLogging -import encry.settings.{ EncryAppSettings, LevelDBSettings } +import encry.settings.{EncryAppSettings, LevelDBSettings} import encry.storage.VersionalStorage +import encry.storage.VersionalStorage.{StorageKey, StorageValue, StorageVersion} import encry.storage.iodb.versionalIODB.IODBWrapper -import encry.storage.levelDb.versionalLevelDB.{ LevelDbFactory, VLDBWrapper, VersionalLevelDBCompanion } -import encry.view.fast.sync.SnapshotHolder.{ - SnapshotChunk, - SnapshotChunkSerializer, - SnapshotManifest, - SnapshotManifestSerializer -} -import encry.view.state.avlTree.{ AvlTree, InternalNode, LeafNode, Node, NodeSerilalizer, ShadowNode } +import encry.storage.levelDb.versionalLevelDB.{LevelDbFactory, VLDBWrapper, VersionalLevelDBCompanion} +import encry.view.fast.sync.FastSyncExceptions._ +import encry.view.fast.sync.SnapshotHolder.{SnapshotChunk, SnapshotChunkSerializer, SnapshotManifest, SnapshotManifestSerializer} +import encry.view.history.History +import encry.view.state.UtxoState +import encry.view.state.avlTree.utils.implicits.Instances._ +import encry.view.state.avlTree._ +import io.iohk.iodb.{ByteArrayWrapper, LSMStore} +import org.encryfoundation.common.modifiers.history.Block import org.encryfoundation.common.utils.Algos +import org.encryfoundation.common.utils.TaggedTypes.Height +import org.iq80.leveldb.{DB, Options} import scorex.utils.Random -import encry.view.state.avlTree.utils.implicits.Instances._ -import io.iohk.iodb.{ ByteArrayWrapper, LSMStore } -import org.iq80.leveldb.{ DB, Options } -import cats.syntax.either._ + +import scala.collection.immutable.{HashMap, HashSet} import scala.language.postfixOps -import com.google.common.primitives.Ints -import encry.view.fast.sync.FastSyncExceptions.{ - ApplicableChunkIsAbsent, - BestHeaderAtHeightIsAbsent, - ChunkApplyError, - ChunkValidationError, - EmptyHeightKey, - EmptyRootNodeError, - FastSyncException, - InconsistentChunkId, - InitializeHeightAndRootKeysException, - ProcessNewBlockError, - ProcessNewSnapshotError, - UtxoCreationError -} -import encry.view.history.History -import org.encryfoundation.common.utils.TaggedTypes.Height -import scala.collection.immutable.{ HashMap, HashSet } final case class SnapshotProcessor(settings: EncryAppSettings, storage: VersionalStorage, @@ -194,8 +178,9 @@ final case class SnapshotProcessor(settings: EncryAppSettings, def createNewSnapshot( id: Array[Byte], newChunks: List[SnapshotChunk] - ): Either[ProcessNewSnapshotError, SnapshotProcessor] = - if (potentialManifestsIds.exists(_.sameElements(id))) + ): Either[ProcessNewSnapshotError, SnapshotProcessor] = { + val potential = potentialManifestsIds + if (potential.exists(_.sameElements(id))) ProcessNewSnapshotError(s"Potential manifest with id ${Algos.encode(id)} already exists").asLeft[SnapshotProcessor] else { //todo add only exists chunks @@ -209,14 +194,15 @@ final case class SnapshotProcessor(settings: EncryAppSettings, .toProto(manifest) .toByteArray val updateList: (StorageKey, StorageValue) = - PotentialManifestsIdsKey -> StorageValue @@ (manifest.manifestId :: potentialManifestsIds.toList).flatten.toArray + PotentialManifestsIdsKey -> StorageValue @@ (manifest.manifestId :: potential.toList).flatten.toArray val toApply: List[(StorageKey, StorageValue)] = manifestToDB :: updateList :: snapshotToDB logger.info(s"A new snapshot created successfully. Insertion started.") Either.catchNonFatal(storage.insert(StorageVersion @@ Random.randomBytes(), toApply, List.empty)) match { case Left(value) => ProcessNewSnapshotError(value.getMessage).asLeft[SnapshotProcessor] - case Right(_) => this.asRight[ProcessNewSnapshotError] + case Right(_) => this.asRight[ProcessNewSnapshotError] } } + } private def updateActualSnapshot(history: History, height: Int): Either[ProcessNewBlockError, SnapshotProcessor] = for { diff --git a/src/test/scala/encry/view/fast/sync/SnapshotProcessorTest.scala b/src/test/scala/encry/view/fast/sync/SnapshotProcessorTest.scala index c5fd4ea772..eae7e1a7e9 100644 --- a/src/test/scala/encry/view/fast/sync/SnapshotProcessorTest.scala +++ b/src/test/scala/encry/view/fast/sync/SnapshotProcessorTest.scala @@ -2,9 +2,9 @@ package encry.view.fast.sync import com.typesafe.scalalogging.StrictLogging import encry.modifiers.InstanceFactory +import encry.network.DeliveryManagerTests.DMUtils import encry.settings.TestNetSettings import encry.storage.VersionalStorage.{StorageKey, StorageValue} -import encry.network.DeliveryManagerTests.DMUtils import encry.view.fast.sync.FastSyncTestsUtils._ import encry.view.fast.sync.SnapshotHolder.SnapshotChunk import encry.view.state.avlTree.AvlTree @@ -26,31 +26,61 @@ class SnapshotProcessorTest "process new snapshot function" should { "correctly create new snapshot if such doesn't exist" in { + val sn = settings.copy(snapshotSettings = settings.snapshotSettings.copy(newSnapshotCreationHeight = 4), + levelDB = settings.levelDB.copy(maxVersions = 1)) + val snapshotProcessor: SnapshotProcessor = SnapshotProcessor.create(sn, tmpDir) + + val avl1: AvlTree[StorageKey, StorageValue] = + createAvl("9gKDVmfsA6J4b78jDBx6JmS86Zph98NnjnUqTJBkW7zitQMReia", 0, 20) + val avl2: AvlTree[StorageKey, StorageValue] = + createAvl("9gKDVmfsA6J4b78jDBx6JmS86Zph98NnjnUqTJBkW7zitQMReia", 5, 25) + val avl3: AvlTree[StorageKey, StorageValue] = + createAvl("9gKDVmfsA6J4b78jDBx6JmS86Zph98NnjnUqTJBkW7zitQMReia", 10, 30) - val (_, processor1, _, _, _, _) = initializeTestState() - val (_, processor2, _, _, _, _) = initializeTestState() - val (_, processor3, _, _, _, _) = initializeTestState() + val block1: Block = generateGenesisBlock(Height @@ 1) + val block2: Block = generateGenesisBlock(Height @@ 1) + val block3: Block = generateGenesisBlock(Height @@ 1) - val listIds = processor1.actualManifestId.get :: processor2.actualManifestId.get :: processor3.actualManifestId.get :: Nil + val processor1 = snapshotProcessor + .createNewSnapshot(Algos.hash(avl1.rootHash ++ block1.header.id), List(SnapshotChunk(avl1.rootNode, avl1.rootHash))).right.get + + val processor2 = snapshotProcessor + .createNewSnapshot(Algos.hash(avl2.rootHash ++ block2.header.id), List(SnapshotChunk(avl2.rootNode, avl2.rootHash))).right.get + + val processor3 = snapshotProcessor + .createNewSnapshot(Algos.hash(avl3.rootHash ++ block3.header.id), List(SnapshotChunk(avl3.rootNode, avl3.rootHash))).right.get + + val id1 = Algos.hash(avl1.rootHash ++ block1.id) + val id2 = Algos.hash(avl2.rootHash ++ block2.id) + val id3 = Algos.hash(avl3.rootHash ++ block3.id) + + val listIds = id1 :: id2 :: id3 :: Nil processor3.potentialManifestsIds.forall { id => listIds.exists(_.sameElements(id)) } shouldBe true - processor1.actualManifest.get.chunksKeys.forall { id => - processor1.getChunkById(id).nonEmpty + listIds.forall { id => + processor3.potentialManifestsIds.exists(_.sameElements(id)) } shouldBe true - processor2.actualManifest.get.chunksKeys.forall { id => - processor2.getChunkById(id).nonEmpty + val ids1 = processor3.manifestById(StorageKey @@ id1).get.chunksKeys + val ids2 = processor3.manifestById(StorageKey @@ id2).get.chunksKeys + val ids3 = processor3.manifestById(StorageKey @@ id3).get.chunksKeys + + ids1.forall { id => + processor3.getChunkById(id).nonEmpty } shouldBe true - processor3.actualManifest.get.chunksKeys.forall { id => + ids2.forall { id => processor3.getChunkById(id).nonEmpty } shouldBe true + ids3.forall { id => + processor3.getChunkById(id).nonEmpty + } shouldBe true } - + "skip snapshot creation if such already exists" in { val snapshotProcessor: SnapshotProcessor = SnapshotProcessor.create(settings, tmpDir) val avl1: AvlTree[StorageKey, StorageValue] =