diff --git a/src/main/scala/encry/view/NodeViewHolder.scala b/src/main/scala/encry/view/NodeViewHolder.scala index 357b431ba2..be9b5133e9 100644 --- a/src/main/scala/encry/view/NodeViewHolder.scala +++ b/src/main/scala/encry/view/NodeViewHolder.scala @@ -23,8 +23,9 @@ import encry.view.NodeViewHolder.ReceivableMessages._ import encry.view.NodeViewHolder._ import encry.view.fast.sync.SnapshotHolder.SnapshotManifest.ManifestId import encry.view.fast.sync.SnapshotHolder._ +import encry.view.history.processors.{HeaderFullChainProcessorComponent, PayloadFullChainProcessorComponent} import encry.view.history.storage.HistoryStorage -import encry.view.history.{History, HistoryHeadersProcessor, HistoryPayloadsProcessor} +import encry.view.history.{History, HistoryApi} import encry.view.mempool.MemoryPool.RolledBackTransactions import encry.view.state.UtxoState import encry.view.state.avlTree.AvlTree @@ -96,7 +97,7 @@ class NodeViewHolder(memoryPoolRef: ActorRef, FileUtils.deleteDirectory(new File(s"${encrySettings.directory}/keysTmp")) FileUtils.deleteDirectory(new File(s"${encrySettings.directory}/walletTmp")) logger.info(s"Updated best block in fast sync mod. Updated state height.") - val newHistory = new History with HistoryHeadersProcessor with HistoryPayloadsProcessor { + val newHistory = new History with HeaderFullChainProcessorComponent with PayloadFullChainProcessorComponent { override val settings: EncryAppSettings = encrySettings override var isFullChainSynced: Boolean = settings.node.offlineGeneration override val timeProvider: NetworkTimeProvider = EncryApp.timeProvider diff --git a/src/main/scala/encry/view/history/BlockDownloadProcessor.scala b/src/main/scala/encry/view/history/BlockDownloadProcessor.scala index 14495dfc84..94d4867ade 100644 --- a/src/main/scala/encry/view/history/BlockDownloadProcessor.scala +++ b/src/main/scala/encry/view/history/BlockDownloadProcessor.scala @@ -35,4 +35,4 @@ case class BlockDownloadProcessor(nodeSettings: NodeSettings, constants: Constan } else if (nodeSettings.blocksToKeep >= 0) Math.max(header.height - nodeSettings.blocksToKeep + 1, minimalBlockHeightVar) else constants.GenesisHeight } -} +} \ No newline at end of file diff --git a/src/main/scala/encry/view/history/FastSyncProcessor.scala b/src/main/scala/encry/view/history/FastSyncProcessor.scala deleted file mode 100644 index 6e71c10856..0000000000 --- a/src/main/scala/encry/view/history/FastSyncProcessor.scala +++ /dev/null @@ -1,26 +0,0 @@ -package encry.view.history - -import cats.syntax.option.none -import encry.consensus.HistoryConsensus.ProgressInfo -import encry.storage.VersionalStorage.{StorageKey, StorageValue, StorageVersion} -import org.encryfoundation.common.modifiers.history.Payload - -trait FastSyncProcessor extends HistoryApi { - - def processPayload(payload: Payload): ProgressInfo = { - val startTime: Long = System.currentTimeMillis() - getBlockByPayload(payload).foreach { block => - logger.info(s"processPayloadFastSync") - historyStorage.bulkInsert(payload.id, Seq(BestBlockKey -> payload.headerId), Seq(payload)) - blockDownloadProcessor.updateBestBlock(block.header) - logger.info(s"BlockDownloadProcessor updated block at height ${block.header.height} successfully") - historyStorage.insert( - StorageVersion @@ validityKey(block.payload.id).untag(StorageKey), - List(block.header.id, block.payload.id).map(id => validityKey(id) -> StorageValue @@ Array(1.toByte)) - ) - logger.info(s"Finished processing block ${block.encodedId}. 
" + - s"Processing time is ${(System.currentTimeMillis() - startTime) / 1000} s") - } - ProgressInfo(none, Seq.empty, Seq.empty, none) - } -} diff --git a/src/main/scala/encry/view/history/History.scala b/src/main/scala/encry/view/history/History.scala index d2a6154eb3..148ea51797 100644 --- a/src/main/scala/encry/view/history/History.scala +++ b/src/main/scala/encry/view/history/History.scala @@ -1,6 +1,7 @@ package encry.view.history import java.io.File + import com.typesafe.scalalogging.StrictLogging import encry.consensus.HistoryConsensus.ProgressInfo import encry.settings._ @@ -10,37 +11,34 @@ import encry.storage.iodb.versionalIODB.IODBHistoryWrapper import encry.storage.levelDb.versionalLevelDB.{LevelDbFactory, VLDBWrapper, VersionalLevelDBCompanion} import encry.utils.NetworkTimeProvider import encry.view.history.storage.HistoryStorage -import io.iohk.iodb.{ByteArrayWrapper, LSMStore} +import io.iohk.iodb.LSMStore import org.encryfoundation.common.modifiers.PersistentModifier import org.encryfoundation.common.modifiers.history.{Block, Header, Payload} import org.encryfoundation.common.utils.Algos import org.encryfoundation.common.utils.TaggedTypes.ModifierId import org.iq80.leveldb.Options import cats.syntax.either._ -import supertagged.@@ +import encry.view.history.processors._ /** * History implementation. It is processing persistent modifiers generated locally or received from the network. **/ -trait History extends HistoryModifiersValidator with AutoCloseable { +trait History extends HistoryModifiersValidator with HistoryApi with AutoCloseable { + this: HistoryHeaderProcessorComponent with HistoryPayloadProcessorComponent => var isFullChainSynced: Boolean /** Appends modifier to the history if it is applicable. */ - def append(modifier: PersistentModifier): Either[Throwable, (History, ProgressInfo)] = { + final def append(modifier: PersistentModifier): Either[Throwable, (History, ProgressInfo)] = { logger.info(s"Trying to append modifier ${Algos.encode(modifier.id)} of type ${modifier.modifierTypeId} to history") Either.catchNonFatal(modifier match { case header: Header => logger.info(s"Append header ${header.encodedId} at height ${header.height} to history") - (this, processHeader(header)) - case payload: Payload => (this, processPayload(payload)) + (this, headerProcessor.processHeader(header)) + case payload: Payload => (this, payloadProcessor.processPayload(payload)) }) } - def processHeader(h: Header): ProgressInfo - - def processPayload(payload: Payload): ProgressInfo - /** @return header, that corresponds to modifier */ private def correspondingHeader(modifier: PersistentModifier): Option[Header] = modifier match { case header: Header => Some(header) @@ -54,7 +52,7 @@ trait History extends HistoryModifiersValidator with AutoCloseable { * @param modifier that is invalid against the State * @return ProgressInfo with next modifier to try to apply */ - def reportModifierIsInvalid(modifier: PersistentModifier): (History, ProgressInfo) = { + final def reportModifierIsInvalid(modifier: PersistentModifier): (History, ProgressInfo) = { logger.info(s"Modifier ${modifier.encodedId} of type ${modifier.modifierTypeId} is marked as invalid") correspondingHeader(modifier) match { case Some(invalidatedHeader) => @@ -120,7 +118,7 @@ trait History extends HistoryModifiersValidator with AutoCloseable { * @param modifier that is valid against the State * @return ProgressInfo with next modifier to try to apply */ - def reportModifierIsValid(modifier: PersistentModifier): History = { + final 
def reportModifierIsValid(modifier: PersistentModifier): History = { logger.info(s"Modifier ${modifier.encodedId} of type ${modifier.modifierTypeId} is marked as valid ") modifier match { case block: Block => @@ -140,9 +138,9 @@ trait History extends HistoryModifiersValidator with AutoCloseable { } } - override def close(): Unit = historyStorage.close() + override final protected[view] def close(): Unit = historyStorage.close() - def closeStorage(): Unit = historyStorage.close() + final protected[view] def closeStorage(): Unit = historyStorage.close() } object History extends StrictLogging { @@ -159,7 +157,7 @@ object History extends StrictLogging { dir } - def readOrGenerate(settingsEncry: EncryAppSettings, ntp: NetworkTimeProvider): History = { + def readOrGenerate(settingsEncry: EncryAppSettings, ntp: NetworkTimeProvider): History with HistoryApi = { val historyIndexDir: File = getHistoryIndexDir(settingsEncry) //Check what kind of storage in settings: val vldbInit = settingsEncry.storage.history match { @@ -175,14 +173,14 @@ object History extends StrictLogging { VLDBWrapper(VersionalLevelDBCompanion(levelDBInit, settingsEncry.levelDB)) } if (settingsEncry.snapshotSettings.enableFastSynchronization && !settingsEncry.node.offlineGeneration) - new History with HistoryHeadersProcessor with FastSyncProcessor { + new History with HeaderFullChainProcessorComponent with PayloadFastSyncProcessorComponent { override val settings: EncryAppSettings = settingsEncry override var isFullChainSynced: Boolean = settings.node.offlineGeneration override val historyStorage: HistoryStorage = HistoryStorage(vldbInit) override val timeProvider: NetworkTimeProvider = new NetworkTimeProvider(settingsEncry.ntp) } else - new History with HistoryHeadersProcessor with HistoryPayloadsProcessor { + new History with HeaderFullChainProcessorComponent with PayloadFullChainProcessorComponent { override val settings: EncryAppSettings = settingsEncry override var isFullChainSynced: Boolean = settings.node.offlineGeneration override val historyStorage: HistoryStorage = HistoryStorage(vldbInit) diff --git a/src/main/scala/encry/view/history/HistoryApi.scala b/src/main/scala/encry/view/history/HistoryApi.scala index a367c509ad..cb49fc887a 100644 --- a/src/main/scala/encry/view/history/HistoryApi.scala +++ b/src/main/scala/encry/view/history/HistoryApi.scala @@ -1,6 +1,5 @@ package encry.view.history -import cats.syntax.option._ import encry.consensus.HistoryConsensus._ import encry.consensus._ import encry.modifiers.history._ @@ -11,7 +10,7 @@ import io.iohk.iodb.ByteArrayWrapper import org.encryfoundation.common.modifiers.history._ import org.encryfoundation.common.network.SyncInfo import org.encryfoundation.common.utils.Algos -import org.encryfoundation.common.utils.TaggedTypes.{Difficulty, Height, ModifierId, ModifierTypeId} +import org.encryfoundation.common.utils.TaggedTypes.{Difficulty, Height, ModifierId} import scala.annotation.tailrec import scala.collection.immutable.HashSet @@ -19,153 +18,108 @@ trait HistoryApi extends HistoryDBApi { //scalastyle:ignore val timeProvider: NetworkTimeProvider - var headersCacheIndexes: Map[Int, Seq[ModifierId]] = Map.empty[Int, Seq[ModifierId]] + private var headersCacheIndexes: Map[Int, Seq[ModifierId]] = Map.empty[Int, Seq[ModifierId]] - var headersCache: Map[ByteArrayWrapper, Header] = Map.empty[ByteArrayWrapper, Header] + private var headersCache: Map[ByteArrayWrapper, Header] = Map.empty[ByteArrayWrapper, Header] - var blocksCacheIndexes: Map[Int, Seq[ModifierId]] = 
Map.empty[Int, Seq[ModifierId]] + private var blocksCacheIndexes: Map[Int, Seq[ModifierId]] = Map.empty[Int, Seq[ModifierId]] - var blocksCache: Map[ByteArrayWrapper, Block] = Map.empty[ByteArrayWrapper, Block] + private var blocksCache: Map[ByteArrayWrapper, Block] = Map.empty[ByteArrayWrapper, Block] private var lastSyncInfo: SyncInfo = SyncInfo(Seq.empty[ModifierId]) lazy val blockDownloadProcessor: BlockDownloadProcessor = BlockDownloadProcessor(settings.node, settings.constants) - var isHeadersChainSyncedVar: Boolean = false + final var isHeadersChainSyncedVar: Boolean = false final case class FastSyncProcessor(localSettings: EncryAppSettings) { var fastSyncVal: Boolean = settings.snapshotSettings.enableFastSynchronization && !settings.node.offlineGeneration } - var lastAvailableManifestHeight: Int = 0 + final var lastAvailableManifestHeight: Int = 0 - lazy val fastSyncInProgress: FastSyncProcessor = FastSyncProcessor(settings) + final lazy val fastSyncInProgress: FastSyncProcessor = FastSyncProcessor(settings) - def getHeaderById(id: ModifierId): Option[Header] = headersCache + def payloadsIdsToDownload(howMany: Int, excluding: HashSet[ModifierId]): Seq[ModifierId] + + final def getHeaderById(id: ModifierId): Option[Header] = headersCache .get(ByteArrayWrapper(id)) .orElse(blocksCache.get(ByteArrayWrapper(id)).map(_.header)) .orElse(getHeaderByIdDB(id)) - def getBlockByHeaderId(id: ModifierId): Option[Block] = - blocksCache - .get(ByteArrayWrapper(id)) - .orElse(headersCache.get(ByteArrayWrapper(id)) - .flatMap(h => getPayloadByIdDB(h.payloadId).map(p => Block(h, p)))) - .orElse(getBlockByHeaderIdDB(id)) - - def getBlockByHeader(header: Header): Option[Block] = blocksCache + final def getBlockByHeader(header: Header): Option[Block] = blocksCache .get(ByteArrayWrapper(header.id)) .orElse(getPayloadByIdDB(header.payloadId).map(p => Block(header, p))) - def getBestHeader: Option[Header] = getBestHeaderId.flatMap(id => + final def getBestHeader: Option[Header] = getBestHeaderId.flatMap(id => headersCache .get(ByteArrayWrapper(id)) .orElse(blocksCache.get(ByteArrayWrapper(id)).map(_.header)) .orElse(getHeaderByIdDB(id)) ) - def getBestHeaderHeight: Int = getBestHeaderId.flatMap(id => + final def getBestHeaderHeight: Int = getBestHeaderId.flatMap(id => headersCache.get(ByteArrayWrapper(id)).map(_.height) .orElse(blocksCache.get(ByteArrayWrapper(id)).map(_.header.height)) .orElse(getHeightByHeaderId(id)) ).getOrElse(settings.constants.PreGenesisHeight) - def getBestBlock: Option[Block] = getBestBlockId.flatMap(id => + final def getBestBlock: Option[Block] = getBestBlockId.flatMap(id => blocksCache.get(ByteArrayWrapper(id)) .orElse(getBlockByHeaderIdDB(id)) ) - def getBestBlockHeight: Int = getBestBlockId + final def getBestBlockHeight: Int = getBestBlockId .flatMap(id => blocksCache.get(ByteArrayWrapper(id)).map(_.header.height).orElse(getHeightByHeaderId(id))) .getOrElse(settings.constants.PreGenesisHeight) - def getHeaderOfBestBlock: Option[Header] = getBestBlockId.flatMap(id => + final def getHeaderOfBestBlock: Option[Header] = getBestBlockId.flatMap(id => headersCache.get(ByteArrayWrapper(id)) .orElse(blocksCache.get(ByteArrayWrapper(id)).map(_.header)) .orElse(getHeaderByIdDB(id)) ) - def getBestHeaderAtHeight(h: Int): Option[Header] = getBestHeaderAtHeightDB(h) + final def getBestHeaderAtHeight(h: Int): Option[Header] = getBestHeaderAtHeightDB(h) - def getBlockByPayload(payload: Payload): Option[Block] = headersCache + final def getBlockByPayload(payload: Payload): Option[Block] = 
headersCache .get(ByteArrayWrapper(payload.headerId)).map(h => Block(h, payload)) .orElse(blocksCache.get(ByteArrayWrapper(payload.headerId))) .orElse(getHeaderById(payload.headerId).flatMap(h => Some(Block(h, payload)))) - def getHeightByHeaderId(id: ModifierId): Option[Int] = headersCache + final def getHeightByHeaderId(id: ModifierId): Option[Int] = headersCache .get(ByteArrayWrapper(id)).map(_.height) .orElse(blocksCache.get(ByteArrayWrapper(id)).map(_.header.height)) .orElse(getHeightByHeaderIdDB(id)) - def isBestBlockDefined: Boolean = + final def isBestBlockDefined: Boolean = getBestBlockId.map(id => blocksCache.contains(ByteArrayWrapper(id))).isDefined || getHeaderOfBestBlock.map(h => isModifierDefined(h.payloadId)).isDefined - def isBlockDefined(header: Header): Boolean = + final def isBlockDefined(header: Header): Boolean = blocksCache.get(ByteArrayWrapper(header.id)).isDefined || isModifierDefined(header.payloadId) - def isHeaderDefined(id: ModifierId): Boolean = + final def isHeaderDefined(id: ModifierId): Boolean = headersCache.get(ByteArrayWrapper(id)).isDefined || blocksCache.get(ByteArrayWrapper(id)).isDefined || isModifierDefined(id) - def getBestHeaderIdAtHeight(h: Int): Option[ModifierId] = getBestHeaderIdAtHeightDB(h) - def headerIdsAtHeight(height: Int): Seq[ModifierId] = headerIdsAtHeightDB(height) + final def getBestHeaderIdAtHeight(h: Int): Option[ModifierId] = getBestHeaderIdAtHeightDB(h) + final def headerIdsAtHeight(height: Int): Seq[ModifierId] = headerIdsAtHeightDB(height) .getOrElse(Seq.empty[ModifierId]) - def modifierBytesById(id: ModifierId): Option[Array[Byte]] = headersCache + final def modifierBytesById(id: ModifierId): Option[Array[Byte]] = headersCache .get(ByteArrayWrapper(id)).map(h => HeaderProtoSerializer.toProto(h).toByteArray) .orElse(blocksCache.get(ByteArrayWrapper(id)).map(b => BlockProtoSerializer.toProto(b).toByteArray)) .orElse(modifierBytesByIdDB(id)) - def lastHeaders(count: Int): HeaderChain = getBestHeader + final def lastHeaders(count: Int): HeaderChain = getBestHeader .map(bestHeader => headerChainBack(count, bestHeader, _ => false)) .getOrElse(HeaderChain.empty) - def getHeaderIds(count: Int, offset: Int = 0): Seq[ModifierId] = (offset until (count + offset)) + final def getHeaderIds(count: Int, offset: Int = 0): Seq[ModifierId] = (offset until (count + offset)) .flatMap(getBestHeaderIdAtHeight) - def payloadsIdsToDownload(howMany: Int, excluding: HashSet[ModifierId]): Seq[ModifierId] = { - @tailrec def continuation(height: Int, acc: Seq[ModifierId]): Seq[ModifierId] = - if (acc.lengthCompare(howMany) >= 0) acc - else if (height > lastAvailableManifestHeight && fastSyncInProgress.fastSyncVal) acc - else getBestHeaderIdAtHeight(height).flatMap(getHeaderById) match { - case Some(h) if !excluding.exists(_.sameElements(h.payloadId)) && !isBlockDefined(h) => - continuation(height + 1, acc :+ h.payloadId) - case Some(_) => - continuation(height + 1, acc) - case None => - acc - } - - (for { - bestBlockId <- getBestBlockId - headerLinkedToBestBlock <- getHeaderById(bestBlockId) - } yield headerLinkedToBestBlock) match { - case _ if !isHeadersChainSynced => - Seq.empty - case Some(header) if isInBestChain(header) => - continuation(header.height + 1, Seq.empty) - case Some(header) => - lastBestBlockHeightRelevantToBestChain(header.height) - .map(height => continuation(height + 1, Seq.empty)) - .getOrElse(continuation(blockDownloadProcessor.minimalBlockHeightVar, Seq.empty)) - case None => - 
continuation(blockDownloadProcessor.minimalBlockHeightVar, Seq.empty) - } - } - - def toDownload(header: Header): Option[(ModifierTypeId, ModifierId)] = - // Already synced and header is not too far back. Download required modifiers - if (header.height >= blockDownloadProcessor.minimalBlockHeight) (Payload.modifierTypeId -> header.payloadId).some - // Headers chain is synced after this header. Start downloading full blocks - else if (!isHeadersChainSynced && isNewHeader(header)) { - isHeadersChainSyncedVar = true - blockDownloadProcessor.updateBestBlock(header) - none - } else none - - def headerChainBack(limit: Int, startHeader: Header, until: Header => Boolean): HeaderChain = { + final def headerChainBack(limit: Int, startHeader: Header, until: Header => Boolean): HeaderChain = { @tailrec def loop(header: Header, acc: Seq[Header]): Seq[Header] = { if (acc.length == limit || until(header)) acc else getHeaderById(header.parentId) match { @@ -182,12 +136,12 @@ trait HistoryApi extends HistoryDBApi { //scalastyle:ignore @tailrec final def loopHeightDown(height: Int, p: ModifierId => Boolean): Option[Header] = headerIdsAtHeight(height) .find(p) .flatMap(getHeaderById) match { - case h@Some(_) => h + case h@Some(_) => h case None if height > settings.constants.GenesisHeight => loopHeightDown(height - 1, p) - case n@None => n + case n@None => n } - def requiredDifficultyAfter(parent: Header): Either[HistoryApiError, Difficulty] = { + final def requiredDifficultyAfter(parent: Header): Either[HistoryApiError, Difficulty] = { val requiredHeights: Seq[Height] = PowLinearController.getHeightsForRetargetingAt(Height @@ (parent.height + 1), settings.constants.EpochLength, settings.constants.RetargetingEpochsQty) for { @@ -203,16 +157,16 @@ trait HistoryApi extends HistoryDBApi { //scalastyle:ignore settings.constants.DesiredBlockInterval, settings.constants.InitialDifficulty) } - def syncInfo: SyncInfo = lastSyncInfo + final def syncInfo: SyncInfo = lastSyncInfo - def updateIdsForSyncInfo(): Unit = + final protected[view] def updateIdsForSyncInfo(): Unit = lastSyncInfo = SyncInfo(getBestHeader.map { header: Header => ((header.height - settings.network.maxInvObjects + 1) to header.height).flatMap { height: Int => headerIdsAtHeight(height).headOption }.toList }.getOrElse(List.empty)) - def compare(si: SyncInfo): HistoryComparisonResult = lastSyncInfo.lastHeaderIds.lastOption match { + final def compare(si: SyncInfo): HistoryComparisonResult = lastSyncInfo.lastHeaderIds.lastOption match { //Our best header is the same as other history best header case Some(id) if si.lastHeaderIds.lastOption.exists(_ sameElements id) => Equal //Our best header is in other history best chain, but not at the last position @@ -230,7 +184,7 @@ trait HistoryApi extends HistoryDBApi { //scalastyle:ignore case None => Older } - def continuationIds(info: SyncInfo, size: Int): Seq[ModifierId] = + final def continuationIds(info: SyncInfo, size: Int): Seq[ModifierId] = if (getBestHeaderId.isEmpty) info.startingPoints.map(_._2) else if (info.lastHeaderIds.isEmpty) { val heightFrom: Int = Math.min(getBestHeaderHeight, size - 1) @@ -257,8 +211,8 @@ trait HistoryApi extends HistoryDBApi { //scalastyle:ignore } } - def commonBlockThenSuffixes(header1: Header, - header2: Header): (HeaderChain, HeaderChain) = { + final def commonBlockThenSuffixes(header1: Header, + header2: Header): (HeaderChain, HeaderChain) = { val heightDelta: Int = Math.max(header1.height - header2.height, 0) @scala.annotation.tailrec @@ -284,7 +238,7 @@ trait 
HistoryApi extends HistoryDBApi { //scalastyle:ignore loop(2, HeaderChain(Seq(header2))) } - def getChainToHeader(fromHeaderOpt: Option[Header], + final def getChainToHeader(fromHeaderOpt: Option[Header], toHeader: Header): (Option[ModifierId], HeaderChain) = fromHeaderOpt match { case Some(h1) => val (prevChain, newChain) = commonBlockThenSuffixes(h1, toHeader) @@ -292,13 +246,9 @@ trait HistoryApi extends HistoryDBApi { //scalastyle:ignore case None => (None, headerChainBack(toHeader.height + 1, toHeader, _ => false)) } - def isNewHeader(header: Header): Boolean = - timeProvider.estimatedTime - header.timestamp < - settings.constants.DesiredBlockInterval.toMillis * settings.constants.NewHeaderTimeMultiplier - - def isHeadersChainSynced: Boolean = isHeadersChainSyncedVar + final def isHeadersChainSynced: Boolean = isHeadersChainSyncedVar - def continuationHeaderChains(header: Header, + final def continuationHeaderChains(header: Header, filterCond: Header => Boolean): Seq[Seq[Header]] = { @tailrec def loop(currentHeight: Int, acc: Seq[Seq[Header]]): Seq[Seq[Header]] = { val nextHeightHeaders: Seq[Header] = headerIdsAtHeight(currentHeight + 1) @@ -321,7 +271,7 @@ trait HistoryApi extends HistoryDBApi { //scalastyle:ignore loop(header.height, Seq(Seq(header))) } - def addHeaderToCacheIfNecessary(h: Header): Unit = + protected[history] final def addHeaderToCacheIfNecessary(h: Header): Unit = if (h.height >= getBestHeaderHeight - settings.constants.MaxRollbackDepth) { logger.debug(s"Should add ${Algos.encode(h.id)} to header cache") val newHeadersIdsAtHeaderHeight = headersCacheIndexes.getOrElse(h.height, Seq.empty[ModifierId]) :+ h.id @@ -340,7 +290,7 @@ trait HistoryApi extends HistoryDBApi { //scalastyle:ignore logger.debug(s"headersCacheIndexes size: ${headersCacheIndexes.size}") } - def addBlockToCacheIfNecessary(b: Block): Unit = + protected[history] final def addBlockToCacheIfNecessary(b: Block): Unit = if (!blocksCache.contains(ByteArrayWrapper(b.id)) && (b.header.height >= getBestBlockHeight - settings.constants.MaxRollbackDepth)) { logger.debug(s"Should add ${Algos.encode(b.id)} to block cache") val newBlocksIdsAtBlockHeight = blocksCacheIndexes.getOrElse(b.header.height, Seq.empty[ModifierId]) :+ b.id diff --git a/src/main/scala/encry/view/history/HistoryDBApi.scala b/src/main/scala/encry/view/history/HistoryDBApi.scala index 55427766a8..82fa6eb1f9 100644 --- a/src/main/scala/encry/view/history/HistoryDBApi.scala +++ b/src/main/scala/encry/view/history/HistoryDBApi.scala @@ -5,9 +5,9 @@ import com.typesafe.scalalogging.StrictLogging import encry.settings.EncryAppSettings import encry.storage.VersionalStorage.StorageKey import encry.view.history.storage.HistoryStorage -import org.encryfoundation.common.modifiers.history.{Block, Header, Payload} +import org.encryfoundation.common.modifiers.history.{ Block, Header, Payload } import org.encryfoundation.common.utils.Algos -import org.encryfoundation.common.utils.TaggedTypes.{Height, ModifierId, ModifierTypeId} +import org.encryfoundation.common.utils.TaggedTypes.{ Height, ModifierId, ModifierTypeId } import scorex.crypto.hash.Digest32 import scala.reflect.ClassTag @@ -16,81 +16,86 @@ trait HistoryDBApi extends StrictLogging { val settings: EncryAppSettings - val historyStorage: HistoryStorage + protected[view] val historyStorage: HistoryStorage - lazy val BestHeaderKey: StorageKey = + protected[history] final lazy val BestHeaderKey: StorageKey = StorageKey @@ 
Array.fill(settings.constants.DigestLength)(Header.modifierTypeId.untag(ModifierTypeId)) - lazy val BestBlockKey: StorageKey = + protected[history] final lazy val BestBlockKey: StorageKey = StorageKey @@ Array.fill(settings.constants.DigestLength)(-1: Byte) - private def getModifierById[T: ClassTag](id: ModifierId): Option[T] = historyStorage - .modifierById(id) - .collect { case m: T => m } + private def getModifierById[T: ClassTag](id: ModifierId): Option[T] = + historyStorage + .modifierById(id) + .collect { case m: T => m } - def getHeightByHeaderIdDB(id: ModifierId): Option[Int] = historyStorage - .get(headerHeightKey(id)) - .map(Ints.fromByteArray) + final def getHeightByHeaderIdDB(id: ModifierId): Option[Int] = + historyStorage + .get(headerHeightKey(id)) + .map(Ints.fromByteArray) - def getHeaderByIdDB(id: ModifierId): Option[Header] = getModifierById[Header](id) - def getPayloadByIdDB(pId: ModifierId): Option[Payload] = getModifierById[Payload](pId) - def getBlockByHeaderDB(header: Header): Option[Block] = getModifierById[Payload](header.payloadId) - .map(payload => Block(header, payload)) - def getBlockByHeaderIdDB(id: ModifierId): Option[Block] = getHeaderByIdDB(id) - .flatMap(h => getModifierById[Payload](h.payloadId).map(p => Block(h, p))) + final def getHeaderByIdDB(id: ModifierId): Option[Header] = getModifierById[Header](id) + final def getPayloadByIdDB(pId: ModifierId): Option[Payload] = getModifierById[Payload](pId) + final def getBlockByHeaderIdDB(id: ModifierId): Option[Block] = + getHeaderByIdDB(id) + .flatMap(h => getModifierById[Payload](h.payloadId).map(p => Block(h, p))) - def getBestHeaderId: Option[ModifierId] = historyStorage.get(BestHeaderKey).map(ModifierId @@ _) - def getBestHeaderDB: Option[Header] = getBestHeaderId.flatMap(getHeaderByIdDB) - def getBestHeaderHeightDB: Int = getBestHeaderId - .flatMap(getHeightByHeaderIdDB) - .getOrElse(settings.constants.PreGenesisHeight) + final def getBestHeaderId: Option[ModifierId] = + historyStorage.get(BestHeaderKey).map(ModifierId @@ _) - def getBestBlockId: Option[ModifierId] = historyStorage.get(BestBlockKey).map(ModifierId @@ _) - def getBestBlockDB: Option[Block] = getBestBlockId.flatMap(getBlockByHeaderIdDB) - def getBestBlockHeightDB: Int = getBestBlockId - .flatMap(getHeightByHeaderIdDB) - .getOrElse(settings.constants.PreGenesisHeight) + final def getBestBlockId: Option[ModifierId] = + historyStorage.get(BestBlockKey).map(ModifierId @@ _) - def modifierBytesByIdDB(id: ModifierId): Option[Array[Byte]] = historyStorage.modifiersBytesById(id) + final def modifierBytesByIdDB(id: ModifierId): Option[Array[Byte]] = + historyStorage.modifiersBytesById(id) - def isModifierDefined(id: ModifierId): Boolean = historyStorage.containsMod(id) + final def isModifierDefined(id: ModifierId): Boolean = historyStorage.containsMod(id) //todo probably rewrite with indexes collection - def lastBestBlockHeightRelevantToBestChain(probablyAt: Int): Option[Int] = (for { - headerId <- getBestHeaderIdAtHeightDB(probablyAt) - header <- getHeaderByIdDB(headerId) if isModifierDefined(header.payloadId) - } yield header.height).orElse(lastBestBlockHeightRelevantToBestChain(probablyAt - 1)) - - def headerIdsAtHeightDB(height: Int): Option[Seq[ModifierId]] = historyStorage - .get(heightIdsKey(height)) - .map(_.grouped(32).map(ModifierId @@ _).toSeq) - - def getBestHeaderIdAtHeightDB(h: Int): Option[ModifierId] = headerIdsAtHeightDB(h).flatMap(_.headOption) - - def getBestHeaderAtHeightDB(h: Int): Option[Header] = 
getBestHeaderIdAtHeightDB(h).flatMap(getHeaderByIdDB) - - def isInBestChain(h: Header): Boolean = getBestHeaderIdAtHeightDB(h.height) - .exists(_.sameElements(h.id)) - - def isInBestChain(id: ModifierId): Boolean = heightOf(id) - .flatMap(getBestHeaderIdAtHeightDB) - .exists(_.sameElements(id)) - - def getBestHeadersChainScore: BigInt = getBestHeaderId.flatMap(scoreOf).getOrElse(BigInt(0)) //todo ?.getOrElse(BigInt(0))? - - def scoreOf(id: ModifierId): Option[BigInt] = historyStorage - .get(headerScoreKey(id)) - .map(d => BigInt(d)) - - def heightOf(id: ModifierId): Option[Height] = historyStorage - .get(headerHeightKey(id)) - .map(d => Height @@ Ints.fromByteArray(d)) - - def heightIdsKey(height: Int): StorageKey = + final def lastBestBlockHeightRelevantToBestChain(probablyAt: Int): Option[Int] = + (for { + headerId <- getBestHeaderIdAtHeightDB(probablyAt) + header <- getHeaderByIdDB(headerId) if isModifierDefined(header.payloadId) + } yield header.height).orElse(lastBestBlockHeightRelevantToBestChain(probablyAt - 1)) + + final def headerIdsAtHeightDB(height: Int): Option[Seq[ModifierId]] = + historyStorage + .get(heightIdsKey(height)) + .map(_.grouped(32).map(ModifierId @@ _).toSeq) + + final def getBestHeaderIdAtHeightDB(h: Int): Option[ModifierId] = + headerIdsAtHeightDB(h).flatMap(_.headOption) + + final def getBestHeaderAtHeightDB(h: Int): Option[Header] = + getBestHeaderIdAtHeightDB(h).flatMap(getHeaderByIdDB) + + final def isInBestChain(h: Header): Boolean = + getBestHeaderIdAtHeightDB(h.height) + .exists(_.sameElements(h.id)) + + final def isInBestChain(id: ModifierId): Boolean = + heightOf(id) + .flatMap(getBestHeaderIdAtHeightDB) + .exists(_.sameElements(id)) + + final def getBestHeadersChainScore: BigInt = + getBestHeaderId.flatMap(scoreOf).getOrElse(BigInt(0)) //todo ?.getOrElse(BigInt(0))? 
+ + final def scoreOf(id: ModifierId): Option[BigInt] = + historyStorage + .get(headerScoreKey(id)) + .map(d => BigInt(d)) + + final def heightOf(id: ModifierId): Option[Height] = + historyStorage + .get(headerHeightKey(id)) + .map(d => Height @@ Ints.fromByteArray(d)) + + final def heightIdsKey(height: Int): StorageKey = StorageKey @@ Algos.hash(Ints.toByteArray(height)).untag(Digest32) - def headerScoreKey(id: ModifierId): StorageKey = + final def headerScoreKey(id: ModifierId): StorageKey = StorageKey @@ Algos.hash("score".getBytes(Algos.charset) ++ id).untag(Digest32) - def headerHeightKey(id: ModifierId): StorageKey = + final def headerHeightKey(id: ModifierId): StorageKey = StorageKey @@ Algos.hash("height".getBytes(Algos.charset) ++ id).untag(Digest32) - def validityKey(id: Array[Byte]): StorageKey = + final def validityKey(id: Array[Byte]): StorageKey = StorageKey @@ Algos.hash("validity".getBytes(Algos.charset) ++ id).untag(Digest32) -} \ No newline at end of file +} diff --git a/src/main/scala/encry/view/history/HistoryHeadersProcessor.scala b/src/main/scala/encry/view/history/HistoryHeadersProcessor.scala deleted file mode 100644 index b8e414cce8..0000000000 --- a/src/main/scala/encry/view/history/HistoryHeadersProcessor.scala +++ /dev/null @@ -1,80 +0,0 @@ -package encry.view.history - -import cats.syntax.option.none -import com.google.common.primitives.Ints -import encry.EncryApp.forceStopApplication -import encry.consensus.ConsensusSchemeReaders -import encry.consensus.HistoryConsensus.ProgressInfo -import encry.storage.VersionalStorage.{ StorageKey, StorageValue } -import org.encryfoundation.common.modifiers.history.Header -import org.encryfoundation.common.utils.TaggedTypes.{ Difficulty, ModifierId } - -trait HistoryHeadersProcessor extends HistoryApi { - - def processHeader(h: Header): ProgressInfo = getHeaderInfoUpdate(h) match { - case dataToUpdate: Seq[_] if dataToUpdate.nonEmpty => - historyStorage.bulkInsert(h.id, dataToUpdate, Seq(h)) - getBestHeaderId match { - case Some(bestHeaderId) => - ProgressInfo(none, Seq.empty, if (!bestHeaderId.sameElements(h.id)) Seq.empty else Seq(h), toDownload(h)) - case _ => - forceStopApplication(errorMessage = "Should always have best header after header application") - } - case _ => ProgressInfo(none, Seq.empty, Seq.empty, none) - } - - private def getHeaderInfoUpdate(header: Header): Seq[(StorageKey, StorageValue)] = { - addHeaderToCacheIfNecessary(header) - if (header.isGenesis) { - logger.info(s"Initialize header chain with genesis header ${header.encodedId}") - Seq( - BestHeaderKey -> StorageValue @@ header.id, - heightIdsKey(settings.constants.GenesisHeight) -> StorageValue @@ header.id, - headerHeightKey(header.id) -> StorageValue @@ Ints.toByteArray(settings.constants.GenesisHeight), - headerScoreKey(header.id) -> StorageValue @@ header.difficulty.toByteArray - ) - } else - scoreOf(header.parentId).map { parentScore => - logger.info(s"getHeaderInfoUpdate for header $header") - val score: Difficulty = - Difficulty @@ (parentScore + ConsensusSchemeReaders.consensusScheme.realDifficulty(header)) - val bestHeaderHeight: Int = getBestHeaderHeight - val bestHeadersChainScore: BigInt = getBestHeadersChainScore - val bestRow: Seq[(StorageKey, StorageValue)] = - if ((header.height > bestHeaderHeight) || (header.height == bestHeaderHeight && score > bestHeadersChainScore)) - Seq(BestHeaderKey -> StorageValue @@ header.id.untag(ModifierId)) - else Seq.empty - val scoreRow: (StorageKey, StorageValue) = headerScoreKey(header.id) -> 
StorageValue @@ score.toByteArray - val heightRow: (StorageKey, StorageValue) = - headerHeightKey(header.id) -> StorageValue @@ Ints.toByteArray(header.height) - val headerIdsRow: Seq[(StorageKey, StorageValue)] = - if ((header.height > bestHeaderHeight) || (header.height == bestHeaderHeight && score > bestHeadersChainScore)) - bestBlockHeaderIdsRow(header, score) - else orphanedBlockHeaderIdsRow(header, score) - Seq(scoreRow, heightRow) ++ bestRow ++ headerIdsRow - }.getOrElse(Seq.empty) - } - - private def bestBlockHeaderIdsRow(h: Header, score: Difficulty): Seq[(StorageKey, StorageValue)] = { - logger.info(s"New best header ${h.encodedId} with score: $score at height ${h.height}") - val self: (StorageKey, StorageValue) = - heightIdsKey(h.height) -> - StorageValue @@ (Seq(h.id) ++ headerIdsAtHeight(h.height).filterNot(_ sameElements h.id)).flatten.toArray - val forkHeaders: Seq[(StorageKey, StorageValue)] = getHeaderById(h.parentId).toList.view - .flatMap(headerChainBack(h.height, _, h => isInBestChain(h)).headers) - .filterNot(isInBestChain) - .map( - header => - heightIdsKey(header.height) -> - StorageValue @@ (Seq(header.id) ++ - headerIdsAtHeight(header.height).filterNot(_ sameElements header.id)).flatten.toArray - ) - .toList - forkHeaders :+ self - } - - private def orphanedBlockHeaderIdsRow(h: Header, score: Difficulty): Seq[(StorageKey, StorageValue)] = { - logger.info(s"New orphaned header ${h.encodedId} at height ${h.height} with score $score") - Seq(heightIdsKey(h.height) -> StorageValue @@ (headerIdsAtHeight(h.height) :+ h.id).flatten.toArray) - } -} diff --git a/src/main/scala/encry/view/history/HistoryModifiersValidator.scala b/src/main/scala/encry/view/history/HistoryModifiersValidator.scala index 0741251609..2e1a76182f 100644 --- a/src/main/scala/encry/view/history/HistoryModifiersValidator.scala +++ b/src/main/scala/encry/view/history/HistoryModifiersValidator.scala @@ -11,10 +11,10 @@ import org.encryfoundation.common.validation.ModifierSemanticValidity trait HistoryModifiersValidator extends HistoryApi { - lazy val powScheme: EquihashPowScheme = EquihashPowScheme(settings.constants.n, settings.constants.k, settings.constants.Version, + private lazy val powScheme: EquihashPowScheme = EquihashPowScheme(settings.constants.n, settings.constants.k, settings.constants.Version, settings.constants.PreGenesisHeight, settings.constants.MaxTarget) - def testApplicable(modifier: PersistentModifier): Either[ValidationError, PersistentModifier] = + final def testApplicable(modifier: PersistentModifier): Either[ValidationError, PersistentModifier] = (modifier match { case header: Header => validateHeader(header) case payload: Payload => validatePayload(payload) @@ -43,7 +43,7 @@ trait HistoryModifiersValidator extends HistoryApi { case None if isModifierDefined(modifierId) => ModifierSemanticValidity.Unknown case None => ModifierSemanticValidity.Absent case mod => logger.error(s"Incorrect validity status: $mod") - ModifierSemanticValidity.Absent + ModifierSemanticValidity.Absent } private def genesisBlockHeaderValidator(h: Header): Either[ValidationError, Header] = for { diff --git a/src/main/scala/encry/view/history/HistoryPayloadsProcessor.scala b/src/main/scala/encry/view/history/HistoryPayloadsProcessor.scala deleted file mode 100644 index ecdc4be798..0000000000 --- a/src/main/scala/encry/view/history/HistoryPayloadsProcessor.scala +++ /dev/null @@ -1,138 +0,0 @@ -package encry.view.history - -import cats.syntax.either._ -import cats.syntax.option._ -import 
encry.consensus.HistoryConsensus.ProgressInfo -import encry.modifiers.history.HeaderChain -import org.encryfoundation.common.modifiers.PersistentModifier -import org.encryfoundation.common.modifiers.history.{ Block, Header, Payload } -import org.encryfoundation.common.utils.TaggedTypes.{ Height, ModifierId } - -trait HistoryPayloadsProcessor extends HistoryApi { - - def processPayload(payload: Payload): ProgressInfo = - getBlockByPayload(payload).flatMap { block => - logger.info(s"proc block ${block.header.encodedId}!") - processBlock(block).some - }.getOrElse(putToHistory(payload)) - - private def processBlock(blockToProcess: Block): ProgressInfo = { - logger.info( - s"Starting processing block to history ||${blockToProcess.encodedId}||${blockToProcess.header.height}||" - ) - val bestFullChain: Seq[Block] = calculateBestFullChain(blockToProcess) - addBlockToCacheIfNecessary(blockToProcess) - bestFullChain.lastOption.map(_.header) match { - case Some(header) if isValidFirstBlock(blockToProcess.header) => - processValidFirstBlock(blockToProcess, header, bestFullChain) - case Some(header) if isBestBlockDefined && isBetterChain(header.id) => - processBetterChain(blockToProcess, header, Seq.empty, settings.node.blocksToKeep) - case Some(_) => - nonBestBlock(blockToProcess) - case None => - logger.debug(s"Best full chain is empty. Returning empty progress info") - ProgressInfo(none, Seq.empty, Seq.empty, none) - } - } - - private def processValidFirstBlock(fullBlock: Block, - newBestHeader: Header, - newBestChain: Seq[Block]): ProgressInfo = { - logger.info(s"Appending ${fullBlock.encodedId} as a valid first block with height ${fullBlock.header.height}") - updateStorage(fullBlock.payload, newBestHeader.id) - ProgressInfo(none, Seq.empty, newBestChain, none) - } - - private def processBetterChain(fullBlock: Block, - newBestHeader: Header, - newBestChain: Seq[Block], - blocksToKeep: Int): ProgressInfo = - getHeaderOfBestBlock.map { header => - val (prevChain: HeaderChain, newChain: HeaderChain) = commonBlockThenSuffixes(header, newBestHeader) - val toRemove: Seq[Block] = prevChain.tail.headers - .flatMap(getBlockByHeader) - val toApply: Seq[Block] = newChain.tail.headers - .flatMap(h => if (h == fullBlock.header) fullBlock.some else getBlockByHeader(h)) - toApply.foreach(addBlockToCacheIfNecessary) - if (toApply.lengthCompare(newChain.length - 1) != 0) nonBestBlock(fullBlock) - else { - //application of this block leads to full chain with higher score - logger.info(s"Appending ${fullBlock.encodedId}|${fullBlock.header.height} as a better chain") - val branchPoint: Option[ModifierId] = toRemove.headOption.map(_ => prevChain.head.id) - val bestHeaderHeight: Int = getBestHeaderHeight - val updateBestHeader: Boolean = - (fullBlock.header.height > bestHeaderHeight) || ( - (fullBlock.header.height == bestHeaderHeight) && - scoreOf(fullBlock.id) - .flatMap(fbScore => getBestHeaderId.flatMap(id => scoreOf(id).map(_ < fbScore))) - .getOrElse(false) - ) - val updatedHeadersAtHeightIds = - newChain.headers.map(header => updatedBestHeaderAtHeightRaw(header.id, Height @@ header.height)).toList - updateStorage(fullBlock.payload, newBestHeader.id, updateBestHeader, updatedHeadersAtHeightIds) - if (blocksToKeep >= 0) { - val lastKept: Int = blockDownloadProcessor.updateBestBlock(fullBlock.header) - val bestHeight: Int = toApply.lastOption.map(_.header.height).getOrElse(0) - val diff: Int = bestHeight - header.height - clipBlockDataAt(((lastKept - diff) until lastKept).filter(_ >= 0)) - } - 
ProgressInfo(branchPoint, toRemove, toApply, none) - } - }.getOrElse(ProgressInfo(none, Seq.empty, Seq.empty, none)) - - private def nonBestBlock(fullBlock: Block): ProgressInfo = { - //Orphaned block or full chain is not initialized yet - logger.info(s"Process block to history ${fullBlock.encodedId}||${fullBlock.header.height}||") - historyStorage.bulkInsert(fullBlock.payload.id, Seq.empty, Seq(fullBlock.payload)) - ProgressInfo(none, Seq.empty, Seq.empty, none) - } - - private def updatedBestHeaderAtHeightRaw(headerId: ModifierId, height: Height): (Array[Byte], Array[Byte]) = - heightIdsKey(height) -> - (Seq(headerId) ++ - headerIdsAtHeight(height).filterNot(_ sameElements headerId)).flatten.toArray - - private def putToHistory(payload: Payload): ProgressInfo = { - historyStorage.insertObjects(Seq(payload)) - ProgressInfo(none, Seq.empty, Seq.empty, none) - } - - private def isBetterChain(id: ModifierId): Boolean = - (for { - bestFullBlockId <- getBestBlockId - heightOfThisHeader <- getHeightByHeaderId(id) - prevBestScore <- scoreOf(bestFullBlockId) - score <- scoreOf(id) - bestBlockHeight = getBestBlockHeight - } yield (bestBlockHeight < heightOfThisHeader) || (bestBlockHeight == heightOfThisHeader && score > prevBestScore)) - .getOrElse(false) - - private def calculateBestFullChain(block: Block): Seq[Block] = { - val continuations: Seq[Seq[Header]] = continuationHeaderChains(block.header, h => isBlockDefined(h)).map(_.tail) - logger.debug(s"continuations: ${continuations.map(seq => s"Seq contains: ${seq.length}").mkString(",")}") - val chains: Seq[Seq[Block]] = continuations.map(_.filter(isBlockDefined).flatMap(getBlockByHeader)) - logger.debug(s"Chains: ${chains.map(chain => s"chain contain: ${chain.length}").mkString(",")}") - chains.map(c => block +: c).maxBy(c => scoreOf(c.last.id).get) - } - - private def clipBlockDataAt(heights: Seq[Int]): Either[Throwable, Unit] = Either.catchNonFatal { - val toRemove: Seq[ModifierId] = heights - .flatMap(headerIdsAtHeight) - .flatMap(getHeaderById) - .map(_.payloadId) - historyStorage.removeObjects(toRemove) - } - - private def updateStorage(newModRow: PersistentModifier, - bestFullHeaderId: ModifierId, - updateHeaderInfo: Boolean = false, - additionalIndexes: List[(Array[Byte], Array[Byte])] = List.empty): Unit = { - val indicesToInsert: Seq[(Array[Byte], Array[Byte])] = - if (updateHeaderInfo) Seq(BestBlockKey -> bestFullHeaderId, BestHeaderKey -> bestFullHeaderId) - else Seq(BestBlockKey -> bestFullHeaderId) - historyStorage.bulkInsert(newModRow.id, indicesToInsert ++ additionalIndexes, Seq(newModRow)) - } - - private def isValidFirstBlock(header: Header): Boolean = - header.height == blockDownloadProcessor.minimalBlockHeight && getBestBlockId.isEmpty -} diff --git a/src/main/scala/encry/view/history/processors/HeaderFullChainProcessorComponent.scala b/src/main/scala/encry/view/history/processors/HeaderFullChainProcessorComponent.scala new file mode 100644 index 0000000000..3fe4842998 --- /dev/null +++ b/src/main/scala/encry/view/history/processors/HeaderFullChainProcessorComponent.scala @@ -0,0 +1,99 @@ +package encry.view.history.processors + +import cats.syntax.option._ +import com.google.common.primitives.Ints +import encry.EncryApp.forceStopApplication +import encry.consensus.HistoryConsensus.ProgressInfo +import encry.consensus.{ ConsensusSchemeReaders, HistoryConsensus } +import encry.storage.VersionalStorage.{ StorageKey, StorageValue } +import org.encryfoundation.common.modifiers.history.{ Header, Payload } +import 
org.encryfoundation.common.utils.TaggedTypes.{ Difficulty, ModifierId, ModifierTypeId } + +trait HeaderFullChainProcessorComponent extends HistoryHeaderProcessorComponent { + + override val headerProcessor: HeaderProcessor = new HeaderDefaultProcessor + + class HeaderDefaultProcessor extends HeaderProcessor { + override def processHeader(h: Header): HistoryConsensus.ProgressInfo = getHeaderInfoUpdate(h) match { + case dataToUpdate: Seq[_] if dataToUpdate.nonEmpty => + historyStorage.bulkInsert(h.id, dataToUpdate, Seq(h)) + getBestHeaderId match { + case Some(bestHeaderId) => + ProgressInfo(none, Seq.empty, if (!bestHeaderId.sameElements(h.id)) Seq.empty else Seq(h), toDownload(h)) + case _ => + forceStopApplication(errorMessage = "Should always have best header after header application") + } + case _ => ProgressInfo(none, Seq.empty, Seq.empty, none) + } + + private def getHeaderInfoUpdate(header: Header): Seq[(StorageKey, StorageValue)] = { + addHeaderToCacheIfNecessary(header) + if (header.isGenesis) { + logger.info(s"Initialize header chain with genesis header ${header.encodedId}") + Seq( + BestHeaderKey -> StorageValue @@ header.id, + heightIdsKey(settings.constants.GenesisHeight) -> StorageValue @@ header.id, + headerHeightKey(header.id) -> StorageValue @@ Ints.toByteArray(settings.constants.GenesisHeight), + headerScoreKey(header.id) -> StorageValue @@ header.difficulty.toByteArray + ) + } else + scoreOf(header.parentId).map { parentScore => + logger.info(s"getHeaderInfoUpdate for header $header") + val score: Difficulty = + Difficulty @@ (parentScore + ConsensusSchemeReaders.consensusScheme.realDifficulty(header)) + val bestHeaderHeight: Int = getBestHeaderHeight + val bestHeadersChainScore: BigInt = getBestHeadersChainScore + val bestRow: Seq[(StorageKey, StorageValue)] = + if ((header.height > bestHeaderHeight) || (header.height == bestHeaderHeight && score > bestHeadersChainScore)) + Seq(BestHeaderKey -> StorageValue @@ header.id.untag(ModifierId)) + else Seq.empty + val scoreRow: (StorageKey, StorageValue) = headerScoreKey(header.id) -> StorageValue @@ score.toByteArray + val heightRow: (StorageKey, StorageValue) = + headerHeightKey(header.id) -> StorageValue @@ Ints.toByteArray(header.height) + val headerIdsRow: Seq[(StorageKey, StorageValue)] = + if ((header.height > bestHeaderHeight) || (header.height == bestHeaderHeight && score > bestHeadersChainScore)) + bestBlockHeaderIdsRow(header, score) + else orphanedBlockHeaderIdsRow(header, score) + Seq(scoreRow, heightRow) ++ bestRow ++ headerIdsRow + }.getOrElse(Seq.empty) + } + + private def bestBlockHeaderIdsRow(h: Header, score: Difficulty): Seq[(StorageKey, StorageValue)] = { + logger.info(s"New best header ${h.encodedId} with score: $score at height ${h.height}") + val self: (StorageKey, StorageValue) = + heightIdsKey(h.height) -> + StorageValue @@ (Seq(h.id) ++ headerIdsAtHeight(h.height).filterNot(_ sameElements h.id)).flatten.toArray + val forkHeaders: Seq[(StorageKey, StorageValue)] = getHeaderById(h.parentId).toList.view + .flatMap(headerChainBack(h.height, _, h => isInBestChain(h)).headers) + .filterNot(isInBestChain) + .map( + header => + heightIdsKey(header.height) -> + StorageValue @@ (Seq(header.id) ++ + headerIdsAtHeight(header.height).filterNot(_ sameElements header.id)).flatten.toArray + ) + .toList + forkHeaders :+ self + } + + private def orphanedBlockHeaderIdsRow(h: Header, score: Difficulty): Seq[(StorageKey, StorageValue)] = { + logger.info(s"New orphaned header ${h.encodedId} at height ${h.height} with score 
$score") + Seq(heightIdsKey(h.height) -> StorageValue @@ (headerIdsAtHeight(h.height) :+ h.id).flatten.toArray) + } + + private def toDownload(header: Header): Option[(ModifierTypeId, ModifierId)] = + // Already synced and header is not too far back. Download required modifiers + if (header.height >= blockDownloadProcessor.minimalBlockHeight) (Payload.modifierTypeId -> header.payloadId).some + // Headers chain is synced after this header. Start downloading full blocks + else if (!isHeadersChainSynced && isNewHeader(header)) { + isHeadersChainSyncedVar = true + blockDownloadProcessor.updateBestBlock(header) + none + } else none + + private def isNewHeader(header: Header): Boolean = + timeProvider.estimatedTime - header.timestamp < + settings.constants.DesiredBlockInterval.toMillis * settings.constants.NewHeaderTimeMultiplier + } + +} diff --git a/src/main/scala/encry/view/history/processors/HistoryHeaderProcessorComponent.scala b/src/main/scala/encry/view/history/processors/HistoryHeaderProcessorComponent.scala new file mode 100644 index 0000000000..a6bd911d48 --- /dev/null +++ b/src/main/scala/encry/view/history/processors/HistoryHeaderProcessorComponent.scala @@ -0,0 +1,14 @@ +package encry.view.history.processors + +import encry.consensus.HistoryConsensus.ProgressInfo +import encry.view.history.HistoryApi +import org.encryfoundation.common.modifiers.history.Header + +trait HistoryHeaderProcessorComponent extends HistoryApi { + + val headerProcessor: HeaderProcessor + + trait HeaderProcessor { + def processHeader(h: Header): ProgressInfo + } +} diff --git a/src/main/scala/encry/view/history/processors/HistoryPayloadProcessorComponent.scala b/src/main/scala/encry/view/history/processors/HistoryPayloadProcessorComponent.scala new file mode 100644 index 0000000000..a3db102fc0 --- /dev/null +++ b/src/main/scala/encry/view/history/processors/HistoryPayloadProcessorComponent.scala @@ -0,0 +1,20 @@ +package encry.view.history.processors + +import encry.consensus.HistoryConsensus.ProgressInfo +import encry.view.history.HistoryApi +import org.encryfoundation.common.modifiers.history.Payload +import org.encryfoundation.common.utils.TaggedTypes.ModifierId +import scala.collection.immutable.HashSet + +trait HistoryPayloadProcessorComponent + extends HistoryApi +{ + + val payloadProcessor: PayloadProcessor + + def payloadsIdsToDownload(howMany: Int, excluding: HashSet[ModifierId]): Seq[ModifierId] + + trait PayloadProcessor { + def processPayload(payload: Payload): ProgressInfo + } +} diff --git a/src/main/scala/encry/view/history/processors/PayloadFastSyncProcessorComponent.scala b/src/main/scala/encry/view/history/processors/PayloadFastSyncProcessorComponent.scala new file mode 100644 index 0000000000..bf38c6ff69 --- /dev/null +++ b/src/main/scala/encry/view/history/processors/PayloadFastSyncProcessorComponent.scala @@ -0,0 +1,67 @@ +package encry.view.history.processors + +import cats.syntax.option.none +import encry.consensus.HistoryConsensus +import encry.consensus.HistoryConsensus.ProgressInfo +import encry.storage.VersionalStorage.{ StorageKey, StorageValue, StorageVersion } +import org.encryfoundation.common.modifiers.history.Payload +import org.encryfoundation.common.utils.TaggedTypes.ModifierId +import scala.annotation.tailrec +import scala.collection.immutable.HashSet + +trait PayloadFastSyncProcessorComponent extends HistoryPayloadProcessorComponent { + + override val payloadProcessor = new PayloadFastSyncProcessor + + def payloadsIdsToDownload(howMany: Int, excluding: 
HashSet[ModifierId]): Seq[ModifierId] = { + @tailrec def continuation(height: Int, acc: Seq[ModifierId]): Seq[ModifierId] = + if (acc.lengthCompare(howMany) >= 0) acc + else if (height > lastAvailableManifestHeight && fastSyncInProgress.fastSyncVal) acc + else + getBestHeaderIdAtHeight(height).flatMap(getHeaderById) match { + case Some(h) if !excluding.exists(_.sameElements(h.payloadId)) && !isBlockDefined(h) => + continuation(height + 1, acc :+ h.payloadId) + case Some(_) => + continuation(height + 1, acc) + case None => + acc + } + + (for { + bestBlockId <- getBestBlockId + headerLinkedToBestBlock <- getHeaderById(bestBlockId) + } yield headerLinkedToBestBlock) match { + case _ if !isHeadersChainSynced => + Seq.empty + case Some(header) if isInBestChain(header) => + continuation(header.height + 1, Seq.empty) + case Some(header) => + lastBestBlockHeightRelevantToBestChain(header.height) + .map(height => continuation(height + 1, Seq.empty)) + .getOrElse(continuation(blockDownloadProcessor.minimalBlockHeightVar, Seq.empty)) + case None => + continuation(blockDownloadProcessor.minimalBlockHeightVar, Seq.empty) + } + } + + class PayloadFastSyncProcessor extends PayloadProcessor { + override def processPayload(payload: Payload): HistoryConsensus.ProgressInfo = { + val startTime: Long = System.currentTimeMillis() + getBlockByPayload(payload).foreach { block => + logger.info(s"processPayloadFastSync") + historyStorage.bulkInsert(payload.id, Seq(BestBlockKey -> payload.headerId), Seq(payload)) + blockDownloadProcessor.updateBestBlock(block.header) + logger.info(s"BlockDownloadProcessor updated block at height ${block.header.height} successfully") + historyStorage.insert( + StorageVersion @@ validityKey(block.payload.id).untag(StorageKey), + List(block.header.id, block.payload.id).map(id => validityKey(id) -> StorageValue @@ Array(1.toByte)) + ) + logger.info( + s"Finished processing block ${block.encodedId}. 
" + + s"Processing time is ${(System.currentTimeMillis() - startTime) / 1000} s" + ) + } + ProgressInfo(none, Seq.empty, Seq.empty, none) + } + } +} diff --git a/src/main/scala/encry/view/history/processors/PayloadFullChainProcessorComponent.scala b/src/main/scala/encry/view/history/processors/PayloadFullChainProcessorComponent.scala new file mode 100644 index 0000000000..c63ea48103 --- /dev/null +++ b/src/main/scala/encry/view/history/processors/PayloadFullChainProcessorComponent.scala @@ -0,0 +1,176 @@ +package encry.view.history.processors + +import cats.syntax.option._ +import cats.syntax.either._ +import encry.consensus.HistoryConsensus +import encry.consensus.HistoryConsensus.ProgressInfo +import encry.modifiers.history.HeaderChain +import org.encryfoundation.common.modifiers.PersistentModifier +import org.encryfoundation.common.modifiers.history.{ Block, Header, Payload } +import org.encryfoundation.common.utils.TaggedTypes.{ Height, ModifierId } +import scala.annotation.tailrec +import scala.collection.immutable.HashSet + +trait PayloadFullChainProcessorComponent extends HistoryPayloadProcessorComponent { + + override val payloadProcessor: PayloadProcessor = new PayloadNormalProcessor + + def payloadsIdsToDownload(howMany: Int, excluding: HashSet[ModifierId]): Seq[ModifierId] = { + @tailrec def continuation(height: Int, acc: Seq[ModifierId]): Seq[ModifierId] = + if (acc.lengthCompare(howMany) >= 0) acc + else + getBestHeaderIdAtHeight(height).flatMap(getHeaderById) match { + case Some(h) if !excluding.exists(_.sameElements(h.payloadId)) && !isBlockDefined(h) => + continuation(height + 1, acc :+ h.payloadId) + case Some(_) => + continuation(height + 1, acc) + case None => + acc + } + + (for { + bestBlockId <- getBestBlockId + headerLinkedToBestBlock <- getHeaderById(bestBlockId) + } yield headerLinkedToBestBlock) match { + case _ if !isHeadersChainSynced => + Seq.empty + case Some(header) if isInBestChain(header) => + continuation(header.height + 1, Seq.empty) + case Some(header) => + lastBestBlockHeightRelevantToBestChain(header.height) + .map(height => continuation(height + 1, Seq.empty)) + .getOrElse(continuation(blockDownloadProcessor.minimalBlockHeightVar, Seq.empty)) + case None => + continuation(blockDownloadProcessor.minimalBlockHeightVar, Seq.empty) + } + } + + class PayloadNormalProcessor extends PayloadProcessor { + override def processPayload(payload: Payload): HistoryConsensus.ProgressInfo = + getBlockByPayload(payload).flatMap { block => + logger.info(s"proc block ${block.header.encodedId}!") + processBlock(block).some + }.getOrElse(putToHistory(payload)) + + private def processBlock(blockToProcess: Block): ProgressInfo = { + logger.info( + s"Starting processing block to history ||${blockToProcess.encodedId}||${blockToProcess.header.height}||" + ) + val bestFullChain: Seq[Block] = calculateBestFullChain(blockToProcess) + addBlockToCacheIfNecessary(blockToProcess) + bestFullChain.lastOption.map(_.header) match { + case Some(header) if isValidFirstBlock(blockToProcess.header) => + processValidFirstBlock(blockToProcess, header, bestFullChain) + case Some(header) if isBestBlockDefined && isBetterChain(header.id) => + processBetterChain(blockToProcess, header, Seq.empty, settings.node.blocksToKeep) + case Some(_) => + nonBestBlock(blockToProcess) + case None => + logger.debug(s"Best full chain is empty. 
Returning empty progress info") + ProgressInfo(none, Seq.empty, Seq.empty, none) + } + } + + private def processValidFirstBlock(fullBlock: Block, + newBestHeader: Header, + newBestChain: Seq[Block]): ProgressInfo = { + logger.info(s"Appending ${fullBlock.encodedId} as a valid first block with height ${fullBlock.header.height}") + updateStorage(fullBlock.payload, newBestHeader.id) + ProgressInfo(none, Seq.empty, newBestChain, none) + } + + private def processBetterChain(fullBlock: Block, + newBestHeader: Header, + newBestChain: Seq[Block], + blocksToKeep: Int): ProgressInfo = + getHeaderOfBestBlock.map { header => + val (prevChain: HeaderChain, newChain: HeaderChain) = commonBlockThenSuffixes(header, newBestHeader) + val toRemove: Seq[Block] = prevChain.tail.headers + .flatMap(getBlockByHeader) + val toApply: Seq[Block] = newChain.tail.headers + .flatMap(h => if (h == fullBlock.header) fullBlock.some else getBlockByHeader(h)) + toApply.foreach(addBlockToCacheIfNecessary) + if (toApply.lengthCompare(newChain.length - 1) != 0) nonBestBlock(fullBlock) + else { + //application of this block leads to full chain with higher score + logger.info(s"Appending ${fullBlock.encodedId}|${fullBlock.header.height} as a better chain") + val branchPoint: Option[ModifierId] = toRemove.headOption.map(_ => prevChain.head.id) + val bestHeaderHeight: Int = getBestHeaderHeight + val updateBestHeader: Boolean = + (fullBlock.header.height > bestHeaderHeight) || ( + (fullBlock.header.height == bestHeaderHeight) && + scoreOf(fullBlock.id) + .flatMap(fbScore => getBestHeaderId.flatMap(id => scoreOf(id).map(_ < fbScore))) + .getOrElse(false) + ) + val updatedHeadersAtHeightIds = + newChain.headers.map(header => updatedBestHeaderAtHeightRaw(header.id, Height @@ header.height)).toList + updateStorage(fullBlock.payload, newBestHeader.id, updateBestHeader, updatedHeadersAtHeightIds) + if (blocksToKeep >= 0) { + val lastKept: Int = blockDownloadProcessor.updateBestBlock(fullBlock.header) + val bestHeight: Int = toApply.lastOption.map(_.header.height).getOrElse(0) + val diff: Int = bestHeight - header.height + clipBlockDataAt(((lastKept - diff) until lastKept).filter(_ >= 0)) + } + ProgressInfo(branchPoint, toRemove, toApply, none) + } + }.getOrElse(ProgressInfo(none, Seq.empty, Seq.empty, none)) + + private def nonBestBlock(fullBlock: Block): ProgressInfo = { + //Orphaned block or full chain is not initialized yet + logger.info(s"Process block to history ${fullBlock.encodedId}||${fullBlock.header.height}||") + historyStorage.bulkInsert(fullBlock.payload.id, Seq.empty, Seq(fullBlock.payload)) + ProgressInfo(none, Seq.empty, Seq.empty, none) + } + + private def updatedBestHeaderAtHeightRaw(headerId: ModifierId, height: Height): (Array[Byte], Array[Byte]) = + heightIdsKey(height) -> + (Seq(headerId) ++ + headerIdsAtHeight(height).filterNot(_ sameElements headerId)).flatten.toArray + + private def putToHistory(payload: Payload): ProgressInfo = { + historyStorage.insertObjects(Seq(payload)) + ProgressInfo(none, Seq.empty, Seq.empty, none) + } + + private def isBetterChain(id: ModifierId): Boolean = + (for { + bestFullBlockId <- getBestBlockId + heightOfThisHeader <- getHeightByHeaderId(id) + prevBestScore <- scoreOf(bestFullBlockId) + score <- scoreOf(id) + bestBlockHeight = getBestBlockHeight + } yield + (bestBlockHeight < heightOfThisHeader) || (bestBlockHeight == heightOfThisHeader && score > prevBestScore)) + .getOrElse(false) + + private def calculateBestFullChain(block: Block): Seq[Block] = { + val continuations: 
Seq[Seq[Header]] = continuationHeaderChains(block.header, h => isBlockDefined(h)).map(_.tail) + logger.debug(s"continuations: ${continuations.map(seq => s"Seq contains: ${seq.length}").mkString(",")}") + val chains: Seq[Seq[Block]] = continuations.map(_.filter(isBlockDefined).flatMap(getBlockByHeader)) + logger.debug(s"Chains: ${chains.map(chain => s"chain contain: ${chain.length}").mkString(",")}") + chains.map(c => block +: c).maxBy(c => scoreOf(c.last.id).get) + } + + private def clipBlockDataAt(heights: Seq[Int]): Either[Throwable, Unit] = Either.catchNonFatal { + val toRemove: Seq[ModifierId] = heights + .flatMap(headerIdsAtHeight) + .flatMap(getHeaderById) + .map(_.payloadId) + historyStorage.removeObjects(toRemove) + } + + private def updateStorage(newModRow: PersistentModifier, + bestFullHeaderId: ModifierId, + updateHeaderInfo: Boolean = false, + additionalIndexes: List[(Array[Byte], Array[Byte])] = List.empty): Unit = { + val indicesToInsert: Seq[(Array[Byte], Array[Byte])] = + if (updateHeaderInfo) Seq(BestBlockKey -> bestFullHeaderId, BestHeaderKey -> bestFullHeaderId) + else Seq(BestBlockKey -> bestFullHeaderId) + historyStorage.bulkInsert(newModRow.id, indicesToInsert ++ additionalIndexes, Seq(newModRow)) + } + + private def isValidFirstBlock(header: Header): Boolean = + header.height == blockDownloadProcessor.minimalBlockHeight && getBestBlockId.isEmpty + } +} diff --git a/src/test/scala/encry/modifiers/InstanceFactory.scala b/src/test/scala/encry/modifiers/InstanceFactory.scala index 1636d72238..29d7a2f230 100755 --- a/src/test/scala/encry/modifiers/InstanceFactory.scala +++ b/src/test/scala/encry/modifiers/InstanceFactory.scala @@ -6,7 +6,8 @@ import encry.modifiers.state.Keys import encry.settings.{EncryAppSettings, NodeSettings} import encry.storage.levelDb.versionalLevelDB.{LevelDbFactory, VLDBWrapper, VersionalLevelDBCompanion} import encry.utils.{EncryGenerator, FileHelper, NetworkTimeProvider, TestHelper} -import encry.view.history.{History, HistoryHeadersProcessor, HistoryPayloadsProcessor} +import encry.view.history.processors.{HeaderFullChainProcessorComponent, PayloadFullChainProcessorComponent} +import encry.view.history.History import encry.view.history.storage.HistoryStorage import io.iohk.iodb.LSMStore import org.encryfoundation.common.modifiers.history.{Block, Header, Payload} @@ -229,7 +230,7 @@ trait InstanceFactory extends Keys with EncryGenerator { val ntp: NetworkTimeProvider = new NetworkTimeProvider(settingsEncry.ntp) - new History with HistoryHeadersProcessor with HistoryPayloadsProcessor { + new History with HeaderFullChainProcessorComponent with PayloadFullChainProcessorComponent { override val settings: EncryAppSettings = settingsEncry override var isFullChainSynced = settings.node.offlineGeneration override val historyStorage: HistoryStorage = storage
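
For reference, a minimal sketch (not part of the patch) of how the refactored cake-pattern components compose. `History` now carries a self-type of `HistoryHeaderProcessorComponent with HistoryPayloadProcessorComponent`, so a concrete history is built by mixing in exactly one header implementation and one payload implementation, and `append` dispatches to whichever `headerProcessor`/`payloadProcessor` were mixed in. The `buildHistory` helper and its parameters below are hypothetical; they simply mirror the two branches of `History.readOrGenerate` introduced in this diff.

import encry.settings.EncryAppSettings
import encry.utils.NetworkTimeProvider
import encry.view.history.History
import encry.view.history.processors.{HeaderFullChainProcessorComponent,
  PayloadFastSyncProcessorComponent, PayloadFullChainProcessorComponent}
import encry.view.history.storage.HistoryStorage

// Hypothetical helper illustrating the composition; appSettings, storage and ntp
// are assumed to come from the caller (as in History.readOrGenerate).
def buildHistory(appSettings: EncryAppSettings,
                 storage: HistoryStorage,
                 ntp: NetworkTimeProvider,
                 fastSync: Boolean): History =
  if (fastSync)
    // fast-sync payload handling, full-chain header handling
    new History with HeaderFullChainProcessorComponent with PayloadFastSyncProcessorComponent {
      override val settings: EncryAppSettings        = appSettings
      override var isFullChainSynced: Boolean        = settings.node.offlineGeneration
      override val historyStorage: HistoryStorage    = storage
      override val timeProvider: NetworkTimeProvider = ntp
    }
  else
    // regular full-chain processing for both headers and payloads
    new History with HeaderFullChainProcessorComponent with PayloadFullChainProcessorComponent {
      override val settings: EncryAppSettings        = appSettings
      override var isFullChainSynced: Boolean        = settings.node.offlineGeneration
      override val historyStorage: HistoryStorage    = storage
      override val timeProvider: NetworkTimeProvider = ntp
    }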