diff --git a/src/main/protobuf/my_messages.proto b/src/main/protobuf/my_messages.proto index 0798c61..a2511dc 100644 --- a/src/main/protobuf/my_messages.proto +++ b/src/main/protobuf/my_messages.proto @@ -40,6 +40,9 @@ message KeyBlockProtobuf { bytes currentBlockHash = 4; repeated TransactionProtobuf transactions = 5; bytes data = 6; + bytes signature = 7; + repeated bytes scheduler = 8; + bytes publicKey = 9; } message TransactionProtobuf { diff --git a/src/main/resources/application.conf b/src/main/resources/application.conf index 81b2933..70f2fb5 100644 --- a/src/main/resources/application.conf +++ b/src/main/resources/application.conf @@ -3,13 +3,15 @@ port = 9101 otherNodes = [] heartbeat = 60000 - plannerHeartbeat = 500 + plannerHeartbeat = 200 blockPeriod = 10000 - biasForBlockPeriod = 1000 - newBlockchain = false + biasForBlockPeriod = 1500 canPublishBlocks = false + epochMultiplier = 5 network { maxBlockQtyInBlocksMessage = 10 + heightMessageInterval = 10 + qtyOfPrepareSchedulerSteps = 1 } apiSettings { httpHost = "0.0.0.0" diff --git a/src/main/resources/reference.conf b/src/main/resources/reference.conf index 0745c6c..929cce8 100644 --- a/src/main/resources/reference.conf +++ b/src/main/resources/reference.conf @@ -13,6 +13,14 @@ akka { loggers = ["akka.event.slf4j.Slf4jLogger"] loglevel = "DEBUG" logging-filter = "akka.event.slf4j.Slf4jLoggingFilter" + debug { + receive = on + autoreceive = on + lifecycle = on + unhandled = on + fsm = on + event-stream = on + } persistence.journal.plugin = akka.persistence.journal.leveldb actor.warn-about-java-serializer-usage = false persistence { diff --git a/src/main/scala/mvp2/actors/Blockchainer.scala b/src/main/scala/mvp2/actors/Blockchainer.scala index d8cbea1..a21db92 100644 --- a/src/main/scala/mvp2/actors/Blockchainer.scala +++ b/src/main/scala/mvp2/actors/Blockchainer.scala @@ -1,17 +1,23 @@ package mvp2.actors -import akka.actor.SupervisorStrategy.Resume -import akka.actor.{ActorRef, ActorSelection, OneForOneStrategy, Props, SupervisorStrategy} +import java.text.SimpleDateFormat + +import akka.actor.SupervisorStrategy.{Restart, Resume} +import akka.actor.{OneForOneStrategy, SupervisorStrategy} +import akka.actor.{ActorRef, ActorSelection, Props} import akka.persistence.{PersistentActor, RecoveryCompleted} +import akka.util.ByteString import com.typesafe.scalalogging.StrictLogging -import mvp2.actors.Planner.Period +import mvp2.actors.Planner.{Epoch, Period} import mvp2.data.InnerMessages._ import mvp2.data.NetworkMessages.Blocks +import mvp2.data.InnerMessages.{CurrentBlockchainInfo, ExpectedBlockPublicKeyAndHeight, Get, TimeDelta} import mvp2.data._ -import mvp2.utils.Settings + import scala.concurrent.duration._ import scala.concurrent.ExecutionContext.Implicits.global import scala.language.postfixOps +import mvp2.utils.{EncodingUtils, Settings} class Blockchainer(settings: Settings) extends PersistentActor with StrictLogging { @@ -26,14 +32,17 @@ class Blockchainer(settings: Settings) extends PersistentActor with StrictLoggin val publisher: ActorRef = context.actorOf(Props(classOf[Publisher], settings), "publisher") val informator: ActorSelection = context.system.actorSelection("/user/starter/informator") val planner: ActorRef = context.actorOf(Props(classOf[Planner], settings), "planner") + var expectedPublicKeyAndHeight: Option[ByteString] = None + var epoch: Epoch = Epoch(List.empty) + val df = new SimpleDateFormat("HH:mm:ss") override def supervisorStrategy: SupervisorStrategy = OneForOneStrategy(){ - case _: Exception => Resume + case _: Exception => Restart } override def preStart(): Unit = if (settings.otherNodes.nonEmpty) - context.system.scheduler.scheduleOnce(2 seconds)(networker ! + context.system.scheduler.scheduleOnce(5 seconds)(networker ! OwnBlockchainHeight(blockchain.chain.lastOption.map(_.height).getOrElse(-1))) override def receiveRecover: Receive = { @@ -44,10 +53,15 @@ class Blockchainer(settings: Settings) extends PersistentActor with StrictLoggin case Blocks(blocks) => blockCache += blocks applyBlockFromCache() + case ExpectedBlockPublicKeyAndHeight(publicKey) => + expectedPublicKeyAndHeight = Some(publicKey) + logger.info(s"Blockchainer got new public key " + + s"${EncodingUtils.encode2Base16(expectedPublicKeyAndHeight.getOrElse(ByteString.empty))}") case TimeDelta(delta: Long) => currentDelta = delta case Get => sender ! blockchain case period: Period => - logger.info(s"Blockchainer received period for new block with exact timestamp ${period.exactTime}.") + logger.info(s"Blockchainer received period for new block with exact timestamp ${df.format(period.begin)} -" + + s" ${df.format(period.end)}.") nextTurn = period case CheckRemoteBlockchain(remoteHeight, remote) => blockchain.getMissingPart(remoteHeight).foreach(blocks => @@ -60,6 +74,8 @@ class Blockchainer(settings: Settings) extends PersistentActor with StrictLoggin case Some(block) => blockchain += block blockCache -= block + planner ! block + logger.info(s"Blockchainer sent new bloch to a planner with height ${block.height}") informator ! CurrentBlockchainInfo( blockchain.chain.lastOption.map(block => block.height).getOrElse(0), blockchain.chain.lastOption, @@ -67,12 +83,13 @@ class Blockchainer(settings: Settings) extends PersistentActor with StrictLoggin ) logger.info(s"Blockchainer apply new keyBlock with height ${block.height}. " + s"Blockchain's height is ${blockchain.chain.size}.") - planner ! block - publisher ! block - if (blockCache.isEmpty && !isSynced) { + if (!isSynced && blockchain.isSynced(settings.blockPeriod) && blockCache.isEmpty) { isSynced = true + logger.info(s"Synced done. Sent this message on the Planner and Publisher.") publisher ! SyncingDone + planner ! SyncingDone } + if (isSynced) publisher ! block applyBlockFromCache() case None => networker ! OwnBlockchainHeight(blockchain.chain.lastOption.map(_.height).getOrElse(-1)) diff --git a/src/main/scala/mvp2/actors/InfluxActor.scala b/src/main/scala/mvp2/actors/InfluxActor.scala index f69741d..77ad2e5 100644 --- a/src/main/scala/mvp2/actors/InfluxActor.scala +++ b/src/main/scala/mvp2/actors/InfluxActor.scala @@ -55,18 +55,18 @@ class InfluxActor(settings: Settings) extends CommonActor { msgFromRemote = newIncrements influxDB.write(settings.influx.port, s"""networkMsg,node=$myNodeAddress,msgid=${EncodingUtils.encode2Base16(id) + i},msg=${message.name} value=$time""") - logger.info(s"Report about msg:${EncodingUtils.encode2Base16(id)} with incr: $i") +// logger.info(s"Report about msg:${EncodingUtils.encode2Base16(id)} with incr: $i") case ToNet(message, remote, id) => val (newIncrements, i) = getMsgIncrements(remote, message.name, msgToRemote) msgToRemote = newIncrements influxDB.write(settings.influx.port, s"""networkMsg,node=$myNodeAddress,msgid=${EncodingUtils.encode2Base16(id) + i},msg=${message.name} value=$time""") - logger.info(s"Sent data about message to influx: $message with id: ${EncodingUtils.encode2Base16(id)} with incr: $i") +// logger.info(s"Sent data about message to influx: $message with id: ${EncodingUtils.encode2Base16(id)} with incr: $i") case SyncMessageIteratorsFromRemote(iterators, remote) => - logger.info(s"Sync iterators from $remote") +// logger.info(s"Sync iterators from $remote") msgFromRemote = msgFromRemote - remote + (remote -> iterators) case TimeDelta(delta: Long) => - logger.info(s"Update delta to: $delta") +// logger.info(s"Update delta to: $delta") currentDelta = delta case _ => } diff --git a/src/main/scala/mvp2/actors/Informator.scala b/src/main/scala/mvp2/actors/Informator.scala index d1d95cc..648c79f 100644 --- a/src/main/scala/mvp2/actors/Informator.scala +++ b/src/main/scala/mvp2/actors/Informator.scala @@ -29,7 +29,10 @@ class Informator(settings: Settings) extends CommonActor { block.previousKeyBlockHash, block.currentBlockHash, block.transactions.size, - block.data + block.data, + block.signature, + block.scheduler, + block.publicKey )).getOrElse(LightKeyBlock()) :: lightChain actualInfo = currentBlockchainInfo } diff --git a/src/main/scala/mvp2/actors/KeyKeeper.scala b/src/main/scala/mvp2/actors/KeyKeeper.scala index 01724f8..9e32cd6 100644 --- a/src/main/scala/mvp2/actors/KeyKeeper.scala +++ b/src/main/scala/mvp2/actors/KeyKeeper.scala @@ -12,14 +12,16 @@ class KeyKeeper extends CommonActor { override def preStart(): Unit = { logger.info(s"My public key is: ${EncodingUtils.encode2Base16(ECDSA.compressPublicKey(myKeys.getPublic))}") - context.actorSelection("/user/starter/blockchainer/networker") ! MyPublicKey(myKeys.getPublic) - context.actorSelection("/user/starter/blockchainer/planner") ! MyPublicKey(myKeys.getPublic) + val pubKeyMessage: MyPublicKey = MyPublicKey(ECDSA.compressPublicKey(myKeys.getPublic)) + context.actorSelection("/user/starter/blockchainer/networker") ! pubKeyMessage + context.actorSelection("/user/starter/blockchainer/planner") ! pubKeyMessage + context.actorSelection("/user/starter/blockchainer/publisher") ! myKeys } override def specialBehavior: Receive = { case Get => sender ! myKeys.getPublic case PeerPublicKey(key) => - logger.info(s"Got public key from remote: ${EncodingUtils.encode2Base16(ECDSA.compressPublicKey(key))} on KeyKeeper.") - allPublicKeys = allPublicKeys + key + logger.info(s"Got public key from remote: ${EncodingUtils.encode2Base16(key)} on KeyKeeper.") + allPublicKeys = allPublicKeys + ECDSA.uncompressPublicKey(key) } -} +} \ No newline at end of file diff --git a/src/main/scala/mvp2/actors/Networker.scala b/src/main/scala/mvp2/actors/Networker.scala index c269a20..4dffea7 100644 --- a/src/main/scala/mvp2/actors/Networker.scala +++ b/src/main/scala/mvp2/actors/Networker.scala @@ -8,7 +8,8 @@ import akka.util.ByteString import mvp2.data.InnerMessages._ import mvp2.data.NetworkMessages._ import mvp2.data.{KeyBlock, KnownPeers, Transaction} -import mvp2.utils.{ECDSA, Settings} +import mvp2.utils.Settings +import scala.language.postfixOps class Networker(settings: Settings) extends CommonActor { @@ -36,14 +37,11 @@ class Networker(settings: Settings) extends CommonActor { override def specialBehavior: Receive = { case msgFromNetwork: FromNet => + peers = peers.updatePeerTime(msgFromNetwork.remote) msgFromNetwork.message match { - case Peers(peersFromRemote, _) => - peers = peersFromRemote.foldLeft(peers) { - case (newKnownPeers, peerToAddOrUpdate) => - updatePeerKey(peerToAddOrUpdate.publicKey) - newKnownPeers.addOrUpdatePeer(peerToAddOrUpdate.addr, peerToAddOrUpdate.publicKey) - .updatePeerTime(msgFromNetwork.remote) - } + case msg@Peers(peersFromRemote, _) => + peersFromRemote.foreach(peer => updatePeerKey(peer.publicKey)) + peers = peers.updatePeers(msg, myAddr, msgFromNetwork.remote) case Blocks(blocks) => if (blocks.nonEmpty) context.parent ! msgFromNetwork.message case SyncMessageIterators(iterators) => @@ -56,7 +54,20 @@ class Networker(settings: Settings) extends CommonActor { transactions.foreach(tx => publisher ! tx) } case OwnBlockchainHeight(height) => peers.getHeightMessage(height).foreach(udpSender ! _) - case MyPublicKey(key) => myPublicKey = Some(ECDSA.compressPublicKey(key)) + case MyPublicKey(key) => myPublicKey = Some(key) + case PrepareScheduler => self ! PrepareSchedulerStep(settings.network.qtyOfPrepareSchedulerSteps) + case PrepareSchedulerStep(currentStep) => + logger.info(s"Get PrepareSchedulerStep: $currentStep") + sendPeers() + if (currentStep != 0) + context.system.scheduler + .scheduleOnce((settings.blockPeriod / 10) milliseconds)(self ! PrepareSchedulerStep(currentStep - 1)) + else { + logger.info(s"Before cleaning: ${peers.peersPublicKeyMap.mkString(",")}") + peers = peers.cleanPeersByTime.cleanPeersByIdenticalKnownPeers + logger.info(s"After cleaning: ${peers.peersPublicKeyMap.mkString(",")}") + planner ! KeysForSchedule(peers.getPeersKeys) + } case transaction: Transaction => peers.getTransactionMsg(transaction).foreach(msg => udpSender ! msg) case keyBlock: KeyBlock => peers.getBlockMessage(keyBlock).foreach(udpSender ! _) case RemoteBlockchainMissingPart(blocks, remote) => @@ -64,11 +75,7 @@ class Networker(settings: Settings) extends CommonActor { } def updatePeerKey(serializedKey: ByteString): Unit = - if (!myPublicKey.contains(serializedKey)) { - val newPublicKey: PeerPublicKey = PeerPublicKey(ECDSA.uncompressPublicKey(serializedKey)) - keyKeeper ! newPublicKey - planner ! newPublicKey - } + if (!myPublicKey.contains(serializedKey)) keyKeeper ! PeerPublicKey(serializedKey) def sendPeers(): Unit = myPublicKey.foreach(key => peers.getPeersMessages(myAddr, key).foreach(msg => udpSender ! msg)) diff --git a/src/main/scala/mvp2/actors/Planner.scala b/src/main/scala/mvp2/actors/Planner.scala index 75378bc..948416e 100644 --- a/src/main/scala/mvp2/actors/Planner.scala +++ b/src/main/scala/mvp2/actors/Planner.scala @@ -1,10 +1,15 @@ package mvp2.actors -import java.security.{KeyPair, PublicKey} -import akka.actor.{ActorRef, ActorSelection, Cancellable, Props} -import mvp2.data.InnerMessages.{Get, MyPublicKey, PeerPublicKey} +import java.text.SimpleDateFormat + +import akka.actor.{ActorRef, Props} +import akka.actor.{ActorSelection, Cancellable} +import akka.util.ByteString +import com.typesafe.scalalogging.StrictLogging +import mvp2.actors.Planner.Epoch +import mvp2.data.InnerMessages._ import mvp2.data.KeyBlock -import mvp2.utils.{ECDSA, EncodingUtils, Settings} +import mvp2.utils.{EncodingUtils, Settings} import scala.concurrent.duration._ import scala.language.postfixOps import scala.concurrent.ExecutionContext.Implicits.global @@ -13,38 +18,132 @@ class Planner(settings: Settings) extends CommonActor { import Planner.{Period, Tick} - var nextTurn: Period = Period(KeyBlock(), settings) val keyKeeper: ActorRef = context.actorOf(Props(classOf[KeyKeeper]), "keyKeeper") - val myKeys: KeyPair = ECDSA.createKeyPair - var allPublicKeys: Set[PublicKey] = Set.empty[PublicKey] + var myPublicKey: ByteString = ByteString.empty + var allPublicKeys: List[ByteString] = List.empty var nextPeriod: Period = Period(KeyBlock(), settings) - if (settings.canPublishBlocks) - context.system.scheduler.schedule(0 seconds, settings.plannerHeartbeat milliseconds, self, Tick) + var lastBlock: KeyBlock = KeyBlock() + var epoch: Epoch = Epoch(List.empty) + val heartBeat: Cancellable = + context.system.scheduler.schedule(10.seconds, settings.plannerHeartbeat milliseconds, self, Tick) val publisher: ActorSelection = context.system.actorSelection("/user/starter/blockchainer/publisher") + val networker: ActorSelection = context.system.actorSelection("/user/starter/blockchainer/networker") + var scheduleForWriting: List[ByteString] = List() + var hasWritten: Boolean = false + var needToCheckTimeToPublish: Boolean = true + var isRemoved: Boolean = false override def specialBehavior: Receive = { + case SyncingDone => + logger.info(s"Synced done on Planner.") + if (settings.canPublishBlocks) + context.system.scheduler.schedule(0 seconds, settings.plannerHeartbeat milliseconds, self, Tick) + context.become(syncedNode) case keyBlock: KeyBlock => - logger.info(s"Planner received new keyBlock with height: ${keyBlock.height}.") + lastBlock = keyBlock nextPeriod = Period(keyBlock, settings) - context.parent ! nextPeriod + logger.info(s"Epoch before trying to update with new block from network is: $epoch") + if (keyBlock.scheduler.nonEmpty) epoch = Epoch(keyBlock.scheduler) + epoch = epoch.dropNextPublisherPublicKey + logger.info(s"Current epoch is(before sync): $epoch. Height of last block is: ${lastBlock.height}") case PeerPublicKey(key) => - logger.info(s"Got public key from remote: ${EncodingUtils.encode2Base16(ECDSA.compressPublicKey(key))} on Planner.") - allPublicKeys = allPublicKeys + key + allPublicKeys = (allPublicKeys :+ key).sortWith((a, b) => a.utf8String.compareTo(b.utf8String) > 1) + logger.info(s"Set allPublickKeys to1 : ${allPublicKeys.map(EncodingUtils.encode2Base16).mkString(",")}") case MyPublicKey(key) => - case Tick if nextPeriod.timeToPublish => - publisher ! Get - logger.info("Planner sent publisher request: time to publish!") + logger.info(s"Set allPublickKeys to2 : ${EncodingUtils.encode2Base16(key)}") + allPublicKeys = (allPublicKeys :+ key).sortWith((a, b) => a.utf8String.compareTo(b.utf8String) > 1) + myPublicKey = key + if (settings.otherNodes.isEmpty) self ! SyncingDone + case _ => + } + + def syncedNode: Receive = { + case keyBlock: KeyBlock => + nextPeriod = Period(keyBlock, settings) + needToCheckTimeToPublish = true + lastBlock = keyBlock + if (!isRemoved) epoch = epoch.dropNextPublisherPublicKey + if (lastBlock.scheduler.nonEmpty) hasWritten = true + logger.info(s"Last block was updated. Height of last block is: ${lastBlock.height}. Period was updated. " + + s"New period is: $nextPeriod.") + context.parent ! nextPeriod + case KeysForSchedule(keys) => + logger.info(s"Get peers public keys for schedule: ${keys.map(EncodingUtils.encode2Base16).mkString(",")}") + allPublicKeys = (keys :+ myPublicKey).sortWith((a, b) => a.utf8String.compareTo(b.utf8String) > 1) + logger.info(s"Current epoch is: $epoch. Height of last block is: ${lastBlock.height}") + logger.info(s"Current public keys: ${allPublicKeys.map(EncodingUtils.encode2Base16).mkString(",")}") + case MyPublicKey(key) => + logger.info("Get my key") + allPublicKeys = (allPublicKeys :+ key).sortWith((a, b) => a.utf8String.compareTo(b.utf8String) > 1) + logger.info(s"Current epoch is: $epoch. Height of last block is: ${lastBlock.height}") + logger.info(s"Current public keys: ${allPublicKeys.map(EncodingUtils.encode2Base16).mkString(",")}") + myPublicKey = key + case Tick if epoch.isDone => + hasWritten = false + logger.info(s"epoch.isDone. Height of last block is: ${lastBlock.height}") + logger.info(s"Current epoch is: $epoch. Height of last block is: ${lastBlock.height}") + logger.info(s"Current public keys: ${allPublicKeys.map(EncodingUtils.encode2Base16).mkString(",")}") + epoch = Epoch(allPublicKeys, settings.epochMultiplier) + logger.info(s"New epoch is: $epoch") + logger.info(s"Current public keys: ${allPublicKeys.map(EncodingUtils.encode2Base16).mkString(",")}") + scheduleForWriting = epoch.schedule + checkMyTurn(scheduleForWriting) + isRemoved = true + case Tick if nextPeriod.timeToPublish && needToCheckTimeToPublish => + logger.info(s"nextPeriod.timeToPublish. Height of last block is: ${lastBlock.height}") + checkMyTurn(scheduleForWriting) + needToCheckTimeToPublish = false + isRemoved = true + logger.info(s"Current epoch is: $epoch. Height of last block is: ${lastBlock.height}") + logger.info(s"Current public keys: ${allPublicKeys.map(EncodingUtils.encode2Base16).mkString(",")}") + checkScheduleUpdateTime() case Tick if nextPeriod.noBlocksInTime => - val newPeriod = Period(nextPeriod, settings) - logger.info(s"No blocks in time. Planner added ${newPeriod.exactTime - System.currentTimeMillis} milliseconds.") - nextPeriod = newPeriod + logger.info(s"nextPeriod.noBlocksInTime. Height of last block is: ${lastBlock.height}") + //epoch = epoch.dropNextPublisherPublicKey + logger.info(s"Current epoch is: $epoch") + logger.info(s"Current public keys: ${allPublicKeys.map(EncodingUtils.encode2Base16).mkString(",")}") + nextPeriod = Period(nextPeriod, settings) + if (!hasWritten) epoch = Epoch(epoch.schedule) + checkMyTurn(scheduleForWriting) + context.parent ! nextPeriod + needToCheckTimeToPublish = true + checkScheduleUpdateTime() + isRemoved = true case Tick => + logger.info("123") + // logger.info(s"Current epoch is: $epoch. Height of last block is: ${lastBlock.height}") + // logger.info(s"Current public keys: ${allPublicKeys.map(EncodingUtils.encode2Base16).mkString(",")}") + } + + def checkMyTurn(schedule: List[ByteString]): Unit = { + logger.info(s"Going to check publisher at height: ${lastBlock.height + 1}." + + s" Next publisher is: ${EncodingUtils.encode2Base16(epoch.publicKeyOfNextPublisher)}. " + + s"My key: ${EncodingUtils.encode2Base16(myPublicKey)}. Result: ${epoch.publicKeyOfNextPublisher == myPublicKey}." + + s" ${epoch.publicKeyOfNextPublisher == myPublicKey} ... ${schedule.map(EncodingUtils.encode2Base16).mkString(",")}") + if (epoch.publicKeyOfNextPublisher == myPublicKey) { + logger.info(s"Got request for a new local block. Write schedule inside is. ${epoch.full}. Schedule is: " + + s"${schedule.map(EncodingUtils.encode2Base16).mkString(",")}") + publisher ! RequestForNewBlock(epoch.full, schedule) + } + logger.info(s"${epoch.full} && ${lastBlock.height}") + context.parent ! ExpectedBlockPublicKeyAndHeight(epoch.publicKeyOfNextPublisher) + epoch = epoch.dropNextPublisherPublicKey + logger.info(s"Epoch after checkMyTurn is: $epoch") } + + def checkScheduleUpdateTime(): Unit = + if (epoch.prepareNextEpoch) { + networker ! PrepareScheduler + logger.info("epoch.prepareNextEpoch") + } } object Planner { case class Period(begin: Long, exactTime: Long, end: Long) { + + val df = new SimpleDateFormat("HH:mm:ss") + def timeToPublish: Boolean = { val now: Long = System.currentTimeMillis now >= this.begin && now <= this.end @@ -52,9 +151,11 @@ object Planner { def noBlocksInTime: Boolean = System.currentTimeMillis > this.end + override def toString: String = s"Period(begin: ${df.format(begin)}. " + + s"ExactTime: ${df.format(exactTime)}. end: ${df.format(end)})" } - object Period { + object Period extends StrictLogging { def apply(lastKeyBlock: KeyBlock, settings: Settings): Period = { val exactTimestamp: Long = lastKeyBlock.timestamp + settings.blockPeriod @@ -62,33 +163,40 @@ object Planner { } def apply(previousPeriod: Period, settings: Settings): Period = { - val exactTimestamp: Long = previousPeriod.exactTime + settings.blockPeriod / 2 - Period(exactTimestamp - settings.biasForBlockPeriod, exactTimestamp, exactTimestamp + settings.biasForBlockPeriod) + val exactTimestamp: Long = previousPeriod.exactTime + settings.blockPeriod + logger.info(s"Generating next period after $previousPeriod") + val newPeriod: Period = + Period( + exactTimestamp - settings.biasForBlockPeriod, + exactTimestamp, + exactTimestamp + settings.biasForBlockPeriod + ) + logger.info(s"New period: $newPeriod") + newPeriod } } - case class Epoch(schedule: Map[Long, PublicKey]) { - - def nextBlock: (Long, PublicKey) = schedule.head + case class Epoch(schedule: List[ByteString], full: Boolean) { - def delete: Epoch = this.copy(schedule - schedule.head._1) + def isApplicableBlock(block: KeyBlock): Boolean = block.publicKey == schedule.head - def delete(height: Long): Epoch = this.copy(schedule = schedule.drop(height.toInt)) + def publicKeyOfNextPublisher: ByteString = schedule.head - def noBlockInTime: Epoch = this.copy((schedule - schedule.head._1).map(each => (each._1 - 1, each._2))) + def dropNextPublisherPublicKey: Epoch = if (schedule.nonEmpty) this.copy(schedule.tail, full = false) else this def isDone: Boolean = this.schedule.isEmpty + + def prepareNextEpoch: Boolean = schedule.size <= 2 + + override def toString: String = this.schedule.map(EncodingUtils.encode2Base16).mkString(",") } - object Epoch { - def apply(lastKeyBlock: KeyBlock, publicKeys: List[PublicKey], multiplier: Int = 1): Epoch = { - val startingHeight: Long = lastKeyBlock.height + 1 - val numberOfBlocksInEpoch: Int = publicKeys.size * multiplier - var schedule: Map[Long, PublicKey] = - (for (i <- startingHeight until startingHeight + numberOfBlocksInEpoch) - yield i).zip(publicKeys).toMap[Long, PublicKey] - Epoch(schedule) - } + object Epoch extends StrictLogging { + + def apply(publicKeys: List[ByteString], multiplier: Int = 1): Epoch = + Epoch((1 to multiplier).foldLeft(List[ByteString]()) { case (a, _) => a ::: publicKeys }, full = true) + + def apply(schedule: List[ByteString]): Epoch = Epoch(schedule, full = true) } case object Tick diff --git a/src/main/scala/mvp2/actors/Publisher.scala b/src/main/scala/mvp2/actors/Publisher.scala index 3842b6e..1c5dfa4 100644 --- a/src/main/scala/mvp2/actors/Publisher.scala +++ b/src/main/scala/mvp2/actors/Publisher.scala @@ -1,10 +1,15 @@ package mvp2.actors +import java.security.KeyPair + import akka.actor.ActorSelection -import mvp2.data.InnerMessages.{Get, SyncingDone, TimeDelta} +import akka.util.ByteString +import mvp2.data.InnerMessages.{RequestForNewBlock, SyncingDone, TimeDelta} import mvp2.data.NetworkMessages.Blocks import mvp2.data.{KeyBlock, Mempool, Transaction} -import mvp2.utils.Settings +import mvp2.utils.{ECDSA, EncodingUtils, Settings} + +import scala.language.postfixOps import scala.concurrent.ExecutionContext.Implicits.global import scala.concurrent.duration._ import scala.language.postfixOps @@ -13,6 +18,7 @@ import scala.util.Random class Publisher(settings: Settings) extends CommonActor { var lastKeyBlock: KeyBlock = KeyBlock() + var myKeyPair: Option[KeyPair] = None val randomizer: Random.type = scala.util.Random var currentDelta: Long = 0 //val testTxGenerator: ActorRef = context.actorOf(Props(classOf[TestTxGenerator]), "testTxGenerator")//TODO delete @@ -28,6 +34,7 @@ class Publisher(settings: Settings) extends CommonActor { if (settings.otherNodes.isEmpty) context.become(publishBlockEnabled) override def specialBehavior: Receive = { + case pair: KeyPair => myKeyPair = Some(pair) case SyncingDone => logger.info("Syncing done!") context.become(publishBlockEnabled) @@ -37,6 +44,7 @@ class Publisher(settings: Settings) extends CommonActor { } def publishBlockEnabled: Receive = { + case pair: KeyPair => myKeyPair = Some(pair) case transaction: Transaction => if (mempool.updateMempool(transaction)) networker ! transaction logger.info(s"Mempool size is: ${mempool.mempool.size} after updating with new transaction.") @@ -45,24 +53,34 @@ class Publisher(settings: Settings) extends CommonActor { networker ! keyBlock lastKeyBlock = keyBlock mempool.removeUsedTxs(keyBlock.transactions) - case Get => - val newBlock: KeyBlock = createKeyBlock + case RequestForNewBlock(isFirstBlock, schedule) => + val newBlock: KeyBlock = createKeyBlock(isFirstBlock, schedule) logger.info(s"Publisher got new request and published block with height ${newBlock.height}.") context.parent ! Blocks(List(newBlock)) networker ! newBlock case TimeDelta(delta: Long) => logger.info(s"Update delta to: $delta") currentDelta = delta + case tx => println(tx) } def time: Long = System.currentTimeMillis() + currentDelta - def createKeyBlock: KeyBlock = { + def createKeyBlock(isFirstBlock: Boolean, schedule: List[ByteString]): KeyBlock = { + logger.info(s"This is schedule for writing into new block ${schedule.map(EncodingUtils.encode2Base16).mkString(",")}") + val currentTime: Long = time val keyBlock: KeyBlock = - KeyBlock(lastKeyBlock.height + 1, time, lastKeyBlock.currentBlockHash, List.empty) + KeyBlock(lastKeyBlock.height + 1, currentTime, lastKeyBlock.currentBlockHash, mempool.mempool) + val signedBlock: KeyBlock = + if (isFirstBlock) + keyBlock.copy(signature = ECDSA.sign(myKeyPair.get.getPrivate, keyBlock.getBytes), scheduler = schedule, + publicKey = myKeyPair.map(x => ECDSA.compressPublicKey(x.getPublic)).getOrElse(ByteString.empty)) + else keyBlock.copy(signature = ECDSA.sign(myKeyPair.get.getPrivate, keyBlock.getBytes), + publicKey = myKeyPair.map(x => ECDSA.compressPublicKey(x.getPublic)).getOrElse(ByteString.empty)) + KeyBlock(lastKeyBlock.height + 1, time, lastKeyBlock.currentBlockHash, List.empty) logger.info(s"New keyBlock with height ${keyBlock.height} is published by local publisher. " + s"${keyBlock.transactions.size} transactions inside.") mempool.cleanMempool() - keyBlock + signedBlock } } \ No newline at end of file diff --git a/src/main/scala/mvp2/actors/UdpReceiver.scala b/src/main/scala/mvp2/actors/UdpReceiver.scala index 9054532..34444bb 100644 --- a/src/main/scala/mvp2/actors/UdpReceiver.scala +++ b/src/main/scala/mvp2/actors/UdpReceiver.scala @@ -1,7 +1,8 @@ package mvp2.actors import java.net.{InetAddress, InetSocketAddress} -import akka.actor.{Actor, ActorRef, ActorSelection} + +import akka.actor.{Actor, ActorRef, ActorSelection, Terminated} import akka.io.{IO, Udp} import akka.serialization.{Serialization, SerializationExtension} import akka.util.ByteString @@ -27,10 +28,16 @@ class UdpReceiver(settings: Settings) extends Actor with StrictLogging { IO(Udp) ! Udp.Bind(self, myAddr) } - override def receive: Receive = { + override def receive: Receive = unboundedCycle + + def readCycleWithUnbounded(socket: ActorRef): Receive = + readCycle(socket) orElse unboundedCycle + + def unboundedCycle: Receive = { case Udp.Bound(local) => logger.info(s"Binded to $local") - context.become(readCycle(sender)) + context.watch(sender) + context.become(readCycleWithUnbounded(sender)) udpSender ! UdpSocket(sender) case msg => logger.info(s"Received message $msg from $sender before binding") } @@ -53,6 +60,8 @@ class UdpReceiver(settings: Settings) extends Actor with StrictLogging { case Udp.Unbound => logger.info(s"Unbound $socket") context.stop(self) + case Terminated(_) => + IO(Udp) ! Udp.Bind(self, myAddr) } def deserialize(bytes: ByteString): Try[NetworkMessage] = bytes.head match { diff --git a/src/main/scala/mvp2/actors/UdpSender.scala b/src/main/scala/mvp2/actors/UdpSender.scala index 209f259..80adfc4 100644 --- a/src/main/scala/mvp2/actors/UdpSender.scala +++ b/src/main/scala/mvp2/actors/UdpSender.scala @@ -13,11 +13,16 @@ import mvp2.utils.{Settings, Sha256} class UdpSender(settings: Settings) extends Actor with StrictLogging { - override def receive: Receive = { - case UdpSocket(connection) => context.become(sendingCycle(connection)) + override def receive: Receive = unboundedCycle + + def unboundedCycle: Receive = { + case UdpSocket(connection) => context.become(sendingCycleWithUnbounded(connection)) case smth: Any => logger.info(s"Got smth strange: $smth.") } + def sendingCycleWithUnbounded(socket: ActorRef): Receive = + sendingCycle(socket) orElse unboundedCycle + def sendingCycle(connection: ActorRef): Receive = { case ToNet(message, remote, _) => logger.info(s"Sending $message to $remote") diff --git a/src/main/scala/mvp2/actors/Zombie.scala b/src/main/scala/mvp2/actors/Zombie.scala index c0262c9..4ae1ee0 100644 --- a/src/main/scala/mvp2/actors/Zombie.scala +++ b/src/main/scala/mvp2/actors/Zombie.scala @@ -11,8 +11,8 @@ class Zombie extends CommonActor { } override def specialBehavior: Receive = { - case deadMessage: DeadLetter => logger.info(s"Dead letter: ${deadMessage.toString}.") - case unhandled: UnhandledMessage => logger.info(s"Unhandled message ${unhandled.toString}") + case deadMessage: DeadLetter => println(s"Dead letter: ${deadMessage.toString}.") + case unhandled: UnhandledMessage => println(s"Unhandled message ${unhandled.toString}") } } \ No newline at end of file diff --git a/src/main/scala/mvp2/data/Block.scala b/src/main/scala/mvp2/data/Block.scala index 5adeef0..a56de0f 100644 --- a/src/main/scala/mvp2/data/Block.scala +++ b/src/main/scala/mvp2/data/Block.scala @@ -1,6 +1,7 @@ package mvp2.data import akka.util.ByteString +import com.google.common.primitives.Longs import com.google.protobuf.{ByteString => pByteString} import mvp2.data.my_messages.KeyBlockProtobuf import mvp2.utils.Sha256 @@ -23,10 +24,16 @@ final case class KeyBlock(height: Long, previousKeyBlockHash: ByteString, currentBlockHash: ByteString, transactions: List[Transaction], - data: ByteString) extends Block with Ordered[KeyBlock] { + data: ByteString, + signature: ByteString, + scheduler: List[ByteString], + publicKey: ByteString) extends Block with Ordered[KeyBlock] { + override def isValid(previousBlock: Block): Boolean = previousBlock.height + 1 == this.height override def compare(that: KeyBlock): Int = this.height compare that.height + + def getBytes: ByteString = ByteString(KeyBlock.toProtobuf(this).toByteArray) } object KeyBlock { @@ -34,9 +41,12 @@ object KeyBlock { timestamp: Long = System.currentTimeMillis, previousKeyBlockHash: ByteString = ByteString.empty, transactions: List[Transaction] = List.empty, - data: ByteString = ByteString.empty): KeyBlock = { + data: ByteString = ByteString.empty, + signature: ByteString = ByteString.empty, + scheduler: List[ByteString] = List.empty, + publicKey: ByteString = ByteString.empty): KeyBlock = { val currentBlockHash: ByteString = Sha256.toSha256(height.toString + timestamp.toString + previousKeyBlockHash.toString) - new KeyBlock(height, timestamp, previousKeyBlockHash, currentBlockHash, transactions, data) + new KeyBlock(height, timestamp, previousKeyBlockHash, currentBlockHash, transactions, data, signature, scheduler, publicKey) } def toProtobuf(block: KeyBlock): KeyBlockProtobuf = KeyBlockProtobuf() @@ -45,7 +55,11 @@ object KeyBlock { .withTimestamp(block.timestamp) .withTransactions(block.transactions.map(Transaction.toProtobuf)) .withData(pByteString.copyFrom(block.data.toByteBuffer)) + .withSignature(pByteString.copyFrom(block.signature.toByteBuffer)) + .withScheduler(block.scheduler.map(publicKey => pByteString.copyFrom(publicKey.toByteBuffer))) .withPreviousKeyBlockHash(pByteString.copyFrom(block.previousKeyBlockHash.toByteBuffer)) + .withPublicKey(pByteString.copyFrom(block.publicKey.toByteBuffer)) + def fromProtobuf(blockProtobuf: KeyBlockProtobuf): KeyBlock = KeyBlock( blockProtobuf.height, @@ -53,7 +67,10 @@ object KeyBlock { ByteString(blockProtobuf.previousKeyBlockHash.toByteArray), ByteString(blockProtobuf.currentBlockHash.toByteArray), blockProtobuf.transactions.map(Transaction.fromProtobuf).toList, - ByteString(blockProtobuf.data.toByteArray) + ByteString(blockProtobuf.data.toByteArray), + ByteString(blockProtobuf.signature.toByteArray), + blockProtobuf.scheduler.map(protKey => ByteString(protKey.toByteArray)).toList, + ByteString(blockProtobuf.publicKey.toByteArray) ) } @@ -72,6 +89,9 @@ case class LightKeyBlock(height: Long = 0, previousKeyBlockHash: ByteString = ByteString.empty, currentBlockHash: ByteString = ByteString.empty, txNum: Int = 0, - data: ByteString = ByteString.empty) extends Block { + data: ByteString = ByteString.empty, + signature: ByteString = ByteString.empty, + scheduler: List[ByteString] = List(), + publicKey: ByteString = ByteString.empty) extends Block { override def isValid(previousBlock: Block): Boolean = true } \ No newline at end of file diff --git a/src/main/scala/mvp2/data/Blockchain.scala b/src/main/scala/mvp2/data/Blockchain.scala index 1455f4f..480d0b8 100644 --- a/src/main/scala/mvp2/data/Blockchain.scala +++ b/src/main/scala/mvp2/data/Blockchain.scala @@ -26,6 +26,9 @@ final case class Blockchain(var chain: SortedSet[KeyBlock] = SortedSet.empty[Key def getMissingPart(remoteHeight: Long): Option[SortedSet[KeyBlock]] = if (chain.lastOption.exists(_.height == remoteHeight)) None else Some(chain.drop(remoteHeight.toInt + 1)) + + def isSynced(blockInterval: Long): Boolean = + lastBlock.timestamp > (System.currentTimeMillis() - blockInterval) } final case class BlocksCache(var chain: SortedSet[KeyBlock] = SortedSet.empty[KeyBlock]) extends Chain { @@ -38,4 +41,4 @@ final case class BlocksCache(var chain: SortedSet[KeyBlock] = SortedSet.empty[Ke def -(block: KeyBlock): BlocksCache = this.copy(chain.filter(_.height != block.height)) def getApplicableBlock(blochchain: Blockchain): Option[KeyBlock] = chain.find(_.height == blochchain.maxHeight + 1) -} +} \ No newline at end of file diff --git a/src/main/scala/mvp2/data/InnerMessages.scala b/src/main/scala/mvp2/data/InnerMessages.scala index 388b542..98d7bd8 100644 --- a/src/main/scala/mvp2/data/InnerMessages.scala +++ b/src/main/scala/mvp2/data/InnerMessages.scala @@ -1,9 +1,10 @@ package mvp2.data import java.net.InetSocketAddress -import java.security.PublicKey + import akka.actor.ActorRef import akka.util.ByteString +import mvp2.actors.Planner.Epoch import mvp2.data.NetworkMessages.NetworkMessage object InnerMessages { @@ -26,13 +27,18 @@ object InnerMessages { final case class FromNet(message: NetworkMessage, remote: InetSocketAddress, id: ByteString = ByteString.empty) extends InnerMessage - final case class SyncMessageIteratorsFromRemote(iterators: Map[String, Int], remote: InetSocketAddress) extends InnerMessage + final case class SyncMessageIteratorsFromRemote(iterators: Map[String, Int], + remote: InetSocketAddress) extends InnerMessage final case class UdpSocket(conection: ActorRef) extends InnerMessage - final case class PeerPublicKey(peerPublicKey: PublicKey) extends InnerMessage + final case class PeerPublicKey(peerPublicKey: ByteString) extends InnerMessage + + final case class KeysForSchedule(keys: List[ByteString]) extends InnerMessage - final case class MyPublicKey(publicKey: PublicKey) extends InnerMessage + final case class MyPublicKey(publicKey: ByteString) extends InnerMessage + + final case class ExpectedBlockPublicKeyAndHeight(publicKey: ByteString) extends InnerMessage final case class TimeDelta(delta: Long) extends InnerMessage @@ -46,4 +52,13 @@ object InnerMessages { final case object SyncingDone extends InnerMessage + final case class PublishNextBlock(scheduler: Set[ByteString]) extends InnerMessage + + final case class RequestForNewBlock(firstInEpoch: Boolean, schedule: List[ByteString]) extends InnerMessage + + final case object PrepareScheduler extends InnerMessage + + final case class PrepareSchedulerStep(i: Int) extends InnerMessage + + final case class GetNewSyncedEpoch(epoch: Epoch) extends InnerMessage } \ No newline at end of file diff --git a/src/main/scala/mvp2/data/KnownPeers.scala b/src/main/scala/mvp2/data/KnownPeers.scala index 13c7a43..0c5cc0d 100644 --- a/src/main/scala/mvp2/data/KnownPeers.scala +++ b/src/main/scala/mvp2/data/KnownPeers.scala @@ -2,29 +2,89 @@ package mvp2.data import java.net.{InetAddress, InetSocketAddress} import akka.util.ByteString +import com.typesafe.scalalogging.StrictLogging +import mvp2.data.KnownPeers.KnownPeerInfo import mvp2.data.InnerMessages.ToNet import mvp2.data.NetworkMessages.{Blocks, LastBlockHeight, Peers, Transactions} -import mvp2.utils.Settings +import mvp2.utils.{EncodingUtils, Settings} -case class KnownPeers(peersPublicKeyMap: Map[InetSocketAddress, Option[ByteString]], - peersLastTimeUpdateMap: Map[InetSocketAddress, Long]) { +case class KnownPeers(var peersPublicKeyMap: Map[InetSocketAddress, KnownPeerInfo], + settings: Settings) extends StrictLogging { - def addOrUpdatePeer(peer: (InetSocketAddress, ByteString)): KnownPeers = - if (!isSelfIp(peer._1)) this.copy(peersPublicKeyMap + (peer._1 -> Some(peer._2))) + + def updatePeers(peersMsg: Peers, myAddr: InetSocketAddress, remote: InetSocketAddress): KnownPeers = + peersMsg.peers.filter(_.addr != myAddr).foldLeft(this) { + case (newPeers, nextPeer) => + newPeers + .addOrUpdatePeer(nextPeer.addr, nextPeer.publicKey) + .updatePeerTime(nextPeer.addr) + .checkPeersIdentity(peersMsg, myAddr, remote) + } + + def checkPeersIdentity(peersMsg: Peers, myAddr: InetSocketAddress, remote: InetSocketAddress): KnownPeers = + if (remote != myAddr) { + this.copy(peersPublicKeyMap + + (remote -> + peersPublicKeyMap.getOrElse(remote, KnownPeerInfo()) + .copy( + knownPeersIdentity = peersMsg + .peers + .filter(_.addr != myAddr) + .forall(peerInfo => peersPublicKeyMap.contains(peerInfo.addr)) + && peersMsg + .peers + .count(peer => peer.addr != myAddr && + peer.addr != peersMsg.remote) == peersPublicKeyMap.count(_._1 != peersMsg.remote) + ) + ) + ) + } else this + def cleanPeersByTime: KnownPeers = { + val newPeers: Map[InetSocketAddress, KnownPeerInfo] = + peersPublicKeyMap.filter(_._2.lastResponseTime > System.currentTimeMillis() - settings.blockPeriod) + logger.info(s"Delete from peers: ${peersPublicKeyMap.keys.toList.diff(newPeers.keys.toList).mkString(",")}") + this.copy(newPeers) + } + + def cleanPeersByIdenticalKnownPeers: KnownPeers = this.copy(peersPublicKeyMap.filter(_._2.knownPeersIdentity)) + + def getPeersKeys: List[ByteString] = peersPublicKeyMap.flatMap(_._2.publicKey).toList + + def addOrUpdatePeer(peer: (InetSocketAddress, ByteString)): KnownPeers = { + logger.info(s"Updating peer: $peer") + if (!isSelfIp(peer._1)) { + val le = this.copy(peersPublicKeyMap + ( + peer._1 -> peersPublicKeyMap.getOrElse(peer._1, KnownPeerInfo()) + .copy(Some(peer._2), lastResponseTime = System.currentTimeMillis())) + ) + logger.info(s"Get peer456 ${peer._1}: ${peersPublicKeyMap + .getOrElse(peer._1, KnownPeerInfo())}") + le + } + else this + } + def updatePeerTime(peer: InetSocketAddress): KnownPeers = - if (!isSelfIp(peer)) - this.copy(peersLastTimeUpdateMap = peersLastTimeUpdateMap + (peer -> System.currentTimeMillis())) + if (!isSelfIp(peer)) { + this.copy( + peersPublicKeyMap + (peer -> + peersPublicKeyMap + .getOrElse(peer, KnownPeerInfo()) + .copy(lastResponseTime = System.currentTimeMillis())) + ) + } else this def getPeersMessages(myAddr: InetSocketAddress, publicKey: ByteString): Seq[ToNet] = peersPublicKeyMap.map(peer => - ToNet( - Peers(peersPublicKeyMap.flatMap { - case (addr, Some(key)) => Some(addr -> key) - case (_, None) => None - }, (myAddr, publicKey), peer._1), + ToNet( + Peers( + peersPublicKeyMap.flatMap(peer => peer._2.publicKey.map(key => peer._1 -> key)), + (myAddr, publicKey), + peer._1 + ), peer._1 ) ).toSeq @@ -35,8 +95,11 @@ case class KnownPeers(peersPublicKeyMap: Map[InetSocketAddress, Option[ByteStrin def getTransactionMsg(transaction: Transaction): Seq[ToNet] = peersPublicKeyMap.map(peer => ToNet(Transactions(List(transaction)), peer._1)).toSeq - def getHeightMessage(height: Long): Seq[ToNet] = - peersPublicKeyMap.keys.map(peer => ToNet(LastBlockHeight(height), peer)).toSeq + def getHeightMessage(height: Long): Seq[ToNet] = { + val peersToSync = peersPublicKeyMap + .filter(_._2.lastResponseTime > System.currentTimeMillis() - settings.network.heightMessageInterval).keys + peersToSync.map(peer => ToNet(LastBlockHeight(height), peer)).toSeq + } def isSelfIp(addr: InetSocketAddress): Boolean = (InetAddress.getLocalHost.getAddress sameElements addr.getAddress.getAddress) || @@ -45,9 +108,25 @@ case class KnownPeers(peersPublicKeyMap: Map[InetSocketAddress, Option[ByteStrin object KnownPeers { + case class KnownPeerInfo(publicKey: Option[ByteString], + lastResponseTime: Long, + knownPeersIdentity: Boolean) { + + override def toString: String = s"(PublicKey: ${publicKey.map(EncodingUtils.encode2Base16)}. " + + s"lastResponseTime: $lastResponseTime. knownPeersIdentity: $knownPeersIdentity)" + } + + object KnownPeerInfo { + + def apply(publicKey: Option[ByteString] = None, + lastResponseTime: Long = System.currentTimeMillis(), + knownPeersIdentity: Boolean = false): KnownPeerInfo = + new KnownPeerInfo(publicKey, lastResponseTime, knownPeersIdentity) + } + def apply(settings: Settings): KnownPeers = new KnownPeers( - settings.otherNodes.map(node => new InetSocketAddress(node.host, node.port) -> None).toMap, - settings.otherNodes.map(node => (new InetSocketAddress(node.host, node.port), 0: Long)).toMap + settings.otherNodes.map(node => new InetSocketAddress(node.host, node.port) -> KnownPeerInfo()).toMap, + settings ) } \ No newline at end of file diff --git a/src/main/scala/mvp2/data/NetworkMessages.scala b/src/main/scala/mvp2/data/NetworkMessages.scala index 1eb56a0..01042a5 100644 --- a/src/main/scala/mvp2/data/NetworkMessages.scala +++ b/src/main/scala/mvp2/data/NetworkMessages.scala @@ -143,5 +143,4 @@ object NetworkMessages { LastBlockHeight(LastBlockHeightProtobuf.parseFrom(bytes.toArray).height) } } - } diff --git a/src/main/scala/mvp2/data/Transaction.scala b/src/main/scala/mvp2/data/Transaction.scala index 03aff31..a813cba 100644 --- a/src/main/scala/mvp2/data/Transaction.scala +++ b/src/main/scala/mvp2/data/Transaction.scala @@ -32,4 +32,4 @@ object Transaction { ByteString(txProt.signature.toByteArray), ByteString(txProt.data.toByteArray) ) -} +} \ No newline at end of file diff --git a/src/main/scala/mvp2/utils/Settings.scala b/src/main/scala/mvp2/utils/Settings.scala index d8c72a9..c37cfa4 100644 --- a/src/main/scala/mvp2/utils/Settings.scala +++ b/src/main/scala/mvp2/utils/Settings.scala @@ -5,10 +5,10 @@ case class Settings(port: Int, network: NetworkSettings, heartbeat: Int, plannerHeartbeat: Int, + epochMultiplier: Int, blockPeriod: Long, canPublishBlocks: Boolean, biasForBlockPeriod: Long, - newBlockchain: Boolean, apiSettings: ApiSettings, ntp: NetworkTimeProviderSettings, influx: InfluxSettings, @@ -28,4 +28,6 @@ case class MempoolSetting(transactionsValidTime: Long, mempoolCleaningTime: Long case class TestingSettings(messagesTime: Boolean, iteratorsSyncTime: Int) -case class NetworkSettings(maxBlockQtyInBlocksMessage: Int) \ No newline at end of file +case class NetworkSettings(maxBlockQtyInBlocksMessage: Int, + heightMessageInterval: Int, + qtyOfPrepareSchedulerSteps: Int) \ No newline at end of file diff --git a/src/test/scala/mvp2/data/EpochTest.scala b/src/test/scala/mvp2/data/EpochTest.scala new file mode 100644 index 0000000..4c0aebf --- /dev/null +++ b/src/test/scala/mvp2/data/EpochTest.scala @@ -0,0 +1,29 @@ +package mvp2.data + +import akka.util.ByteString +import mvp2.actors.Planner.Epoch +import org.scalatest.{Matchers, PropSpecLike} + +class EpochTest extends PropSpecLike with Matchers { + + val publicKeys1: List[ByteString] = List( + ByteString("11qwertynddsvm"), + ByteString("22qwertynddsvmwerf"), + ByteString("33qwertynwerqewrddsvm"), + ByteString("44qwertynddsvmqwerqw"), + ByteString("55qwertyndd1241svm"), + ) + + val publicKeys2: List[ByteString] = List(ByteString("11qwertynddsvm")) + + property("Epoch size must me 50 after apply method:") { + val epoch: Epoch = Epoch(publicKeys1, 10) + epoch.schedule.size shouldEqual 50 + } + + property("Epoch size must me 10 after apply method with 1 l element in keys set:") { + val epoch: Epoch = Epoch(publicKeys2, 10) + epoch.schedule.size shouldEqual 10 + } + +} \ No newline at end of file diff --git a/src/test/scala/mvp2/data/KnownPeersTest.scala b/src/test/scala/mvp2/data/KnownPeersTest.scala new file mode 100644 index 0000000..b481801 --- /dev/null +++ b/src/test/scala/mvp2/data/KnownPeersTest.scala @@ -0,0 +1,86 @@ +package mvp2.data + +import java.net.InetSocketAddress + +import akka.util.ByteString +import com.typesafe.config.ConfigFactory +import mvp2.data.NetworkMessages.Peers +import mvp2.utils.Settings +import org.scalatest.{Matchers, PropSpecLike} +import net.ceedubs.ficus.Ficus._ +import net.ceedubs.ficus.readers.ArbitraryTypeReader._ + +class KnownPeersTest extends PropSpecLike with Matchers { + + val settings: Settings = ConfigFactory.load("local.conf").withFallback(ConfigFactory.load) + .as[Settings]("mvp") + + val peers: KnownPeers = KnownPeers(settings) + + val fakePeer1: (InetSocketAddress, ByteString) = + new InetSocketAddress("192.168.1.1", 1234) -> ByteString("PubKey1") + val fakePeer2: (InetSocketAddress, ByteString) = + new InetSocketAddress("192.168.1.2", 1234) -> ByteString("PubKey2") + val fakePeer3: (InetSocketAddress, ByteString) = + new InetSocketAddress("192.168.1.3", 1234) -> ByteString("PubKey3") + val fakePeer4: (InetSocketAddress, ByteString) = + new InetSocketAddress("192.168.1.4", 1234) -> ByteString("PubKey4") + val fakePeer5: (InetSocketAddress, ByteString) = + new InetSocketAddress("192.168.1.5", 1234) -> ByteString("PubKey5") + + val fakePeers: Map[InetSocketAddress, ByteString] = Map( + fakePeer1, fakePeer2, fakePeer3, fakePeer4, fakePeer5 + ) + + val fakeMyNode: (InetSocketAddress, ByteString) = + new InetSocketAddress("192.168.1.6", 1234) -> ByteString("MyPubKey") + + val msgWithFakePeers = Peers(fakePeers, fakeMyNode, fakeMyNode._1) + + property("KnownPeers should add peer after knownPeers msg and should ignore myAddr") { + + val newPeers = peers.updatePeers(msgWithFakePeers, fakeMyNode._1, fakePeer1._1) + + newPeers.peersPublicKeyMap.size shouldEqual peers.peersPublicKeyMap.size + fakePeers.size + } + + property("KnownPeers should get all keys of all online nodes") { + + val newPeers = peers.updatePeers(msgWithFakePeers, fakeMyNode._1, fakePeer1._1).cleanPeersByTime + + newPeers.getPeersKeys.length shouldEqual fakePeers.size + } + + property("Known peers should ignore peers with lastResponseTime > System.currentTimeMillis() - settings.blockPeriod") { + + val newPeers = peers.updatePeers(msgWithFakePeers, fakeMyNode._1, fakePeer1._1) + + Thread.sleep(settings.blockPeriod + 10) + + val msgWithFakePeersTail = Peers(fakePeers.tail, fakeMyNode, fakeMyNode._1) + + newPeers.updatePeers(msgWithFakePeersTail, fakeMyNode._1, fakePeer1._1) + .cleanPeersByTime + .getPeersKeys + .length shouldEqual fakePeers.size - 1 + } + + property("Known peers should return keys of nodes with equal peers msg") { + + val newPeers = peers.updatePeers(Peers(Map(fakePeer2, fakePeer3), fakePeer1, fakeMyNode._1), + fakeMyNode._1, fakePeer1._1) + + val msg1WithFakePeers = Peers(Map(fakePeer2, fakePeer3), fakePeer1, fakeMyNode._1) -> fakePeer1 + val msg2WithFakePeers = Peers(Map(fakePeer1, fakePeer3), fakePeer2, fakeMyNode._1) -> fakePeer2 + val msg3WithFakePeers = Peers(Map(fakePeer1, fakePeer2), fakePeer3, fakeMyNode._1) -> fakePeer3 + val msg4WithFakePeers = Peers(Map(fakePeer4), fakePeer4, fakeMyNode._1) -> fakePeer4 + + Seq(msg1WithFakePeers, msg2WithFakePeers, msg3WithFakePeers, msg4WithFakePeers) + .foldLeft(newPeers) { + case (knownPeers, nextMsg) => knownPeers.checkPeersIdentity(nextMsg._1, fakeMyNode._1, nextMsg._2._1) + }.cleanPeersByIdenticalKnownPeers + .cleanPeersByTime + .getPeersKeys + .length shouldEqual 3 + } +}