Skip to content
Open
Show file tree
Hide file tree
Changes from 69 commits
Commits
Show all changes
111 commits
Select commit Hold shift + click to select a range
4574cb3
initial commit
GusevTimofey Nov 20, 2018
e3413b5
added planner logic
GusevTimofey Nov 20, 2018
0d10706
tests
GusevTimofey Nov 20, 2018
880dc0e
tests
GusevTimofey Nov 20, 2018
fcd2152
tests
GusevTimofey Nov 20, 2018
f5ba488
tests
GusevTimofey Nov 20, 2018
d993e76
tests
GusevTimofey Nov 20, 2018
3111e4d
changed logic with keys
GusevTimofey Nov 21, 2018
045cdcf
tests
GusevTimofey Nov 22, 2018
dbd040e
tests
GusevTimofey Nov 22, 2018
cc4f96f
tests
GusevTimofey Nov 22, 2018
d73f951
tests
GusevTimofey Nov 22, 2018
82d7ea9
tests
GusevTimofey Nov 22, 2018
9fbd0ae
tests
GusevTimofey Nov 22, 2018
66920eb
tests
GusevTimofey Nov 22, 2018
975921d
tests
GusevTimofey Nov 22, 2018
8f79637
tests
GusevTimofey Nov 22, 2018
8c94469
tests
GusevTimofey Nov 22, 2018
778cd9d
tests
GusevTimofey Nov 22, 2018
4ddb430
tests
GusevTimofey Nov 22, 2018
7fc3c61
tests
GusevTimofey Nov 22, 2018
f045691
tests
GusevTimofey Nov 22, 2018
b01512c
tests
GusevTimofey Nov 22, 2018
7c38e2e
tests
GusevTimofey Nov 23, 2018
7a4ce05
tests
GusevTimofey Nov 23, 2018
2dbf2a7
tests
GusevTimofey Nov 23, 2018
47b405f
tests
GusevTimofey Nov 23, 2018
d862fae
Merge branch 'master' into mvp2-planner
GusevTimofey Nov 23, 2018
efb40da
tests
GusevTimofey Nov 23, 2018
80aa075
tests
GusevTimofey Nov 23, 2018
85af2d7
Some additions
Bromel777 Nov 23, 2018
8393e40
Merge remote-tracking branch 'origin/mvp2-planner' into mvp2-planner
Bromel777 Nov 23, 2018
17dc4e5
Some additions
Bromel777 Nov 23, 2018
51e230f
tests
GusevTimofey Nov 23, 2018
d755a07
Some additions
Bromel777 Nov 23, 2018
f3d55a2
tests
GusevTimofey Nov 26, 2018
403f05e
Merge branch 'master' into mvp2-planner
GusevTimofey Nov 26, 2018
136a5e7
merged
GusevTimofey Nov 26, 2018
efb49a4
Some additions
Bromel777 Nov 26, 2018
67d9002
merged
GusevTimofey Nov 26, 2018
b48080b
Some additions
Bromel777 Nov 26, 2018
921aa27
Some additions
Bromel777 Nov 26, 2018
f6c2834
Some additions
Bromel777 Nov 26, 2018
ba6115f
Some additions
Bromel777 Nov 26, 2018
520feba
Some additions
Bromel777 Nov 26, 2018
1f3c7fc
Add knownPeers test
Bromel777 Nov 26, 2018
c814569
Some additions
Bromel777 Nov 27, 2018
452bc82
qtyOfPrepareSchedulerSteps 15 -> 5
Bromel777 Nov 27, 2018
1e0a8b2
added schedule in first epoch block
GusevTimofey Nov 27, 2018
df54631
Merge remote-tracking branch 'origin/mvp2-planner' into mvp2-planner
GusevTimofey Nov 27, 2018
fc5d232
move multiplier to settings
Bromel777 Nov 27, 2018
c9ca5f6
added tests
GusevTimofey Nov 27, 2018
0de1f43
fixed sorting
GusevTimofey Nov 27, 2018
482f305
fixed sorting
GusevTimofey Nov 27, 2018
84a09e3
Some additions
Bromel777 Nov 27, 2018
a06fa55
Some additions
Bromel777 Nov 27, 2018
3e10a6c
Rename
Bromel777 Nov 27, 2018
196cdb3
fixed allKeys collection
GusevTimofey Nov 27, 2018
7f42438
Some additions
Bromel777 Nov 29, 2018
2e34a88
Merge master
Bromel777 Nov 29, 2018
88c4964
some additions
GusevTimofey Nov 29, 2018
f0f02e6
some additions
GusevTimofey Nov 29, 2018
8c23952
some additions
GusevTimofey Nov 29, 2018
71faa4d
some additions
GusevTimofey Nov 29, 2018
3c7ba11
Remove updating last response peer time, during height msg sending
Bromel777 Nov 29, 2018
f6986b0
Merge remote-tracking branch 'origin/mvp2-planner' into mvp2-planner
Bromel777 Nov 29, 2018
4345a80
some additions
GusevTimofey Nov 29, 2018
e64fc00
Change condition of sending height msg
Bromel777 Nov 29, 2018
5519118
Change sync condition
Bromel777 Nov 29, 2018
67b64f5
Increase time to block period
Bromel777 Nov 29, 2018
19ddfec
Some additions
Bromel777 Nov 29, 2018
9ea08c9
added logic for syncing epoch
GusevTimofey Nov 29, 2018
a666490
added logic for syncing epoch
GusevTimofey Nov 29, 2018
486ab7c
Merge remote-tracking branch 'origin/mvp2-planner' into mvp2-planner
GusevTimofey Nov 29, 2018
6a1ebbd
added logic for syncing epoch
GusevTimofey Nov 29, 2018
2f7247d
Change case class Epoch. Remove height
Bromel777 Nov 29, 2018
10af3a9
Merge remote-tracking branch 'origin/mvp2-planner' into mvp2-planner
Bromel777 Nov 29, 2018
7dd0670
Remove schedule from blockcache
Bromel777 Nov 30, 2018
72ae971
qtyOfPrepareSchedulerSteps 5 -> 1
Bromel777 Nov 30, 2018
40a99e7
travis, hi!
Bromel777 Nov 30, 2018
cb5780c
plannerHeartbeat 2000 -> 200
Bromel777 Nov 30, 2018
dfcb602
allPublicKeys (Set -> List). New logging
Bromel777 Nov 30, 2018
4565716
Fix tests
Bromel777 Nov 30, 2018
8bc8f8c
New logging
Bromel777 Nov 30, 2018
65dc1da
added logic for syncing epoch
GusevTimofey Nov 30, 2018
413029a
added logic for syncing epoch
GusevTimofey Nov 30, 2018
e995f96
added logic for syncing epoch
GusevTimofey Nov 30, 2018
05d29be
Remove hasWritten
Bromel777 Nov 30, 2018
9a70999
Remove lastblock from schedule
Bromel777 Nov 30, 2018
d06a86a
Add logging
Bromel777 Nov 30, 2018
8978ea9
Add logging
Bromel777 Nov 30, 2018
452989e
Add sorting allpublicKeys
Bromel777 Nov 30, 2018
4320dbb
Add deleting peer when get block with schedule
Bromel777 Nov 30, 2018
f5af81d
added some logging
GusevTimofey Dec 3, 2018
12d4b5e
added some logging
GusevTimofey Dec 3, 2018
07c667a
added some logging
GusevTimofey Dec 3, 2018
6d20131
fixed epoch
GusevTimofey Dec 3, 2018
0fe198d
fixed epoch
GusevTimofey Dec 3, 2018
55b555c
fixed epoch
GusevTimofey Dec 3, 2018
5134d9e
fixed epoch
GusevTimofey Dec 3, 2018
4e6c2ed
fixed epoch
GusevTimofey Dec 3, 2018
37e664c
fixed epoch
GusevTimofey Dec 3, 2018
d7f40c8
fixed epoch
GusevTimofey Dec 3, 2018
06a7f40
fixed epoch
GusevTimofey Dec 3, 2018
7fa363a
fixed epoch
GusevTimofey Dec 3, 2018
a07ec57
fixed epoch
GusevTimofey Dec 3, 2018
1af0afe
fixed epoch
GusevTimofey Dec 3, 2018
7df62e0
fixed epoch
GusevTimofey Dec 3, 2018
85ada5d
fixed epoch
GusevTimofey Dec 3, 2018
d3c4046
fixed epoch
GusevTimofey Dec 3, 2018
e577e74
fixed epoch
GusevTimofey Dec 3, 2018
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions src/main/protobuf/my_messages.proto
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,9 @@ message KeyBlockProtobuf {
bytes currentBlockHash = 4;
repeated TransactionProtobuf transactions = 5;
bytes data = 6;
bytes signature = 7;
repeated bytes scheduler = 8;
bytes publicKey = 9;
}

message TransactionProtobuf {
Expand Down
7 changes: 5 additions & 2 deletions src/main/resources/application.conf
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,16 @@
port = 9101
otherNodes = []
heartbeat = 60000
plannerHeartbeat = 500
plannerHeartbeat = 2000
blockPeriod = 10000
biasForBlockPeriod = 1000
biasForBlockPeriod = 3000
newBlockchain = false
canPublishBlocks = false
epochMultiplier = 3
network {
maxBlockQtyInBlocksMessage = 10
heightMessageInterval = 10
qtyOfPrepareSchedulerSteps = 5
}
apiSettings {
httpHost = "0.0.0.0"
Expand Down
8 changes: 8 additions & 0 deletions src/main/resources/reference.conf
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,14 @@ akka {
loggers = ["akka.event.slf4j.Slf4jLogger"]
loglevel = "DEBUG"
logging-filter = "akka.event.slf4j.Slf4jLoggingFilter"
debug {
receive = on
autoreceive = on
lifecycle = on
unhandled = on
fsm = on
event-stream = on
}
persistence.journal.plugin = akka.persistence.journal.leveldb
actor.warn-about-java-serializer-usage = false
persistence {
Expand Down
30 changes: 21 additions & 9 deletions src/main/scala/mvp2/actors/Blockchainer.scala
Original file line number Diff line number Diff line change
@@ -1,17 +1,21 @@
package mvp2.actors

import akka.actor.SupervisorStrategy.Resume
import akka.actor.{ActorRef, ActorSelection, OneForOneStrategy, Props, SupervisorStrategy}
import akka.actor.SupervisorStrategy.{Restart, Resume}
import akka.actor.{OneForOneStrategy, SupervisorStrategy}
import akka.actor.{ActorRef, ActorSelection, Props}
import akka.persistence.{PersistentActor, RecoveryCompleted}
import akka.util.ByteString
import com.typesafe.scalalogging.StrictLogging
import mvp2.actors.Planner.Period
import mvp2.data.InnerMessages._
import mvp2.data.NetworkMessages.Blocks
import mvp2.data.InnerMessages.{CurrentBlockchainInfo, ExpectedBlockPublicKeyAndHeight, Get, TimeDelta}
import mvp2.data._
import mvp2.utils.Settings

import scala.concurrent.duration._
import scala.concurrent.ExecutionContext.Implicits.global
import scala.language.postfixOps
import mvp2.utils.{EncodingUtils, Settings}

class Blockchainer(settings: Settings) extends PersistentActor with StrictLogging {

Expand All @@ -26,14 +30,15 @@ class Blockchainer(settings: Settings) extends PersistentActor with StrictLoggin
val publisher: ActorRef = context.actorOf(Props(classOf[Publisher], settings), "publisher")
val informator: ActorSelection = context.system.actorSelection("/user/starter/informator")
val planner: ActorRef = context.actorOf(Props(classOf[Planner], settings), "planner")
var expectedPublicKeyAndHeight: Option[(Long, ByteString)] = None

override def supervisorStrategy: SupervisorStrategy = OneForOneStrategy(){
case _: Exception => Resume
case _: Exception => Restart
}

override def preStart(): Unit =
if (settings.otherNodes.nonEmpty)
context.system.scheduler.scheduleOnce(2 seconds)(networker !
context.system.scheduler.scheduleOnce(5 seconds)(networker !
OwnBlockchainHeight(blockchain.chain.lastOption.map(_.height).getOrElse(-1)))

override def receiveRecover: Receive = {
Expand All @@ -44,10 +49,14 @@ class Blockchainer(settings: Settings) extends PersistentActor with StrictLoggin
case Blocks(blocks) =>
blockCache += blocks
applyBlockFromCache()
case ExpectedBlockPublicKeyAndHeight(height, signature) =>
expectedPublicKeyAndHeight = Some(height, signature)
logger.info(s"Blockchainer got new public key " +
s"${EncodingUtils.encode2Base16(expectedPublicKeyAndHeight.map(_._2).getOrElse(ByteString.empty))}")
case TimeDelta(delta: Long) => currentDelta = delta
case Get => sender ! blockchain
case period: Period =>
logger.info(s"Blockchainer received period for new block with exact timestamp ${period.exactTime}.")
logger.info(s"Blockchainer received period for new block with exact timestamp ${period.begin} ${period.end}.")
nextTurn = period
case CheckRemoteBlockchain(remoteHeight, remote) =>
blockchain.getMissingPart(remoteHeight).foreach(blocks =>
Expand All @@ -60,18 +69,21 @@ class Blockchainer(settings: Settings) extends PersistentActor with StrictLoggin
case Some(block) =>
blockchain += block
blockCache -= block
planner ! block
if (block.scheduler.nonEmpty) planner ! GetNewScheduleFromRemote(block.scheduler)
informator ! CurrentBlockchainInfo(
blockchain.chain.lastOption.map(block => block.height).getOrElse(0),
blockchain.chain.lastOption,
None
)
logger.info(s"Blockchainer apply new keyBlock with height ${block.height}. " +
s"Blockchain's height is ${blockchain.chain.size}.")
planner ! block
publisher ! block
if (blockCache.isEmpty && !isSynced) {
if (isSynced) publisher ! block
if (!isSynced && blockchain.isSynced(settings.blockPeriod)) {
isSynced = true
logger.info(s"Synced done. Sent this message on the Planner and Publisher.")
publisher ! SyncingDone
planner ! SyncingDone
}
applyBlockFromCache()
case None =>
Expand Down
5 changes: 4 additions & 1 deletion src/main/scala/mvp2/actors/Informator.scala
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,10 @@ class Informator(settings: Settings) extends CommonActor {
block.previousKeyBlockHash,
block.currentBlockHash,
block.transactions.size,
block.data
block.data,
block.signature,
block.scheduler,
block.publicKey
)).getOrElse(LightKeyBlock()) :: lightChain
actualInfo = currentBlockchainInfo
}
Expand Down
12 changes: 7 additions & 5 deletions src/main/scala/mvp2/actors/KeyKeeper.scala
Original file line number Diff line number Diff line change
Expand Up @@ -12,14 +12,16 @@ class KeyKeeper extends CommonActor {

override def preStart(): Unit = {
logger.info(s"My public key is: ${EncodingUtils.encode2Base16(ECDSA.compressPublicKey(myKeys.getPublic))}")
context.actorSelection("/user/starter/blockchainer/networker") ! MyPublicKey(myKeys.getPublic)
context.actorSelection("/user/starter/blockchainer/planner") ! MyPublicKey(myKeys.getPublic)
val pubKeyMessage: MyPublicKey = MyPublicKey(ECDSA.compressPublicKey(myKeys.getPublic))
context.actorSelection("/user/starter/blockchainer/networker") ! pubKeyMessage
context.actorSelection("/user/starter/blockchainer/planner") ! pubKeyMessage
context.actorSelection("/user/starter/blockchainer/publisher") ! myKeys
}

override def specialBehavior: Receive = {
case Get => sender ! myKeys.getPublic
case PeerPublicKey(key) =>
logger.info(s"Got public key from remote: ${EncodingUtils.encode2Base16(ECDSA.compressPublicKey(key))} on KeyKeeper.")
allPublicKeys = allPublicKeys + key
logger.info(s"Got public key from remote: ${EncodingUtils.encode2Base16(key)} on KeyKeeper.")
allPublicKeys = allPublicKeys + ECDSA.uncompressPublicKey(key)
}
}
}
34 changes: 20 additions & 14 deletions src/main/scala/mvp2/actors/Networker.scala
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,8 @@ import akka.util.ByteString
import mvp2.data.InnerMessages._
import mvp2.data.NetworkMessages._
import mvp2.data.{KeyBlock, KnownPeers, Transaction}
import mvp2.utils.{ECDSA, Settings}
import mvp2.utils.Settings
import scala.language.postfixOps

class Networker(settings: Settings) extends CommonActor {

Expand Down Expand Up @@ -36,14 +37,11 @@ class Networker(settings: Settings) extends CommonActor {

override def specialBehavior: Receive = {
case msgFromNetwork: FromNet =>
peers = peers.updatePeerTime(msgFromNetwork.remote)
msgFromNetwork.message match {
case Peers(peersFromRemote, _) =>
peers = peersFromRemote.foldLeft(peers) {
case (newKnownPeers, peerToAddOrUpdate) =>
updatePeerKey(peerToAddOrUpdate.publicKey)
newKnownPeers.addOrUpdatePeer(peerToAddOrUpdate.addr, peerToAddOrUpdate.publicKey)
.updatePeerTime(msgFromNetwork.remote)
}
case msg@Peers(peersFromRemote, _) =>
peersFromRemote.foreach(peer => updatePeerKey(peer.publicKey))
peers = peers.updatePeers(msg, myAddr, msgFromNetwork.remote)
case Blocks(blocks) =>
if (blocks.nonEmpty) context.parent ! msgFromNetwork.message
case SyncMessageIterators(iterators) =>
Expand All @@ -56,19 +54,27 @@ class Networker(settings: Settings) extends CommonActor {
transactions.foreach(tx => publisher ! tx)
}
case OwnBlockchainHeight(height) => peers.getHeightMessage(height).foreach(udpSender ! _)
case MyPublicKey(key) => myPublicKey = Some(ECDSA.compressPublicKey(key))
case MyPublicKey(key) => myPublicKey = Some(key)
case PrepareScheduler => self ! PrepareSchedulerStep(settings.network.qtyOfPrepareSchedulerSteps)
case PrepareSchedulerStep(currentStep) =>
sendPeers()
if (currentStep != 0)
context.system.scheduler
.scheduleOnce((settings.blockPeriod / 10) milliseconds)(self ! PrepareSchedulerStep(currentStep - 1))
else {
logger.info(s"Before cleaning: ${peers.peersPublicKeyMap.mkString(",")}")
peers = peers.cleanPeersByTime.cleanPeersByIdenticalKnownPeers
logger.info(s"After cleaning: ${peers.peersPublicKeyMap.mkString(",")}")
planner ! KeysForSchedule(peers.getPeersKeys)
}
case transaction: Transaction => peers.getTransactionMsg(transaction).foreach(msg => udpSender ! msg)
case keyBlock: KeyBlock => peers.getBlockMessage(keyBlock).foreach(udpSender ! _)
case RemoteBlockchainMissingPart(blocks, remote) =>
udpSender ! ToNet(Blocks(blocks), remote)
}

def updatePeerKey(serializedKey: ByteString): Unit =
if (!myPublicKey.contains(serializedKey)) {
val newPublicKey: PeerPublicKey = PeerPublicKey(ECDSA.uncompressPublicKey(serializedKey))
keyKeeper ! newPublicKey
planner ! newPublicKey
}
if (!myPublicKey.contains(serializedKey)) keyKeeper ! PeerPublicKey(serializedKey)

def sendPeers(): Unit =
myPublicKey.foreach(key => peers.getPeersMessages(myAddr, key).foreach(msg => udpSender ! msg))
Expand Down
124 changes: 95 additions & 29 deletions src/main/scala/mvp2/actors/Planner.scala
Original file line number Diff line number Diff line change
@@ -1,10 +1,14 @@
package mvp2.actors

import java.security.{KeyPair, PublicKey}
import akka.actor.{ActorRef, ActorSelection, Cancellable, Props}
import mvp2.data.InnerMessages.{Get, MyPublicKey, PeerPublicKey}
import akka.actor.{ActorRef, Props}
import akka.actor.{ActorSelection, Cancellable}
import akka.util.ByteString
import com.typesafe.scalalogging.StrictLogging
import mvp2.actors.Planner.Epoch
import mvp2.data.InnerMessages._
import mvp2.data.KeyBlock
import mvp2.utils.{ECDSA, EncodingUtils, Settings}
import mvp2.utils.{EncodingUtils, Settings}
import scala.collection.immutable.SortedMap
import scala.concurrent.duration._
import scala.language.postfixOps
import scala.concurrent.ExecutionContext.Implicits.global
Expand All @@ -15,31 +19,87 @@ class Planner(settings: Settings) extends CommonActor {

var nextTurn: Period = Period(KeyBlock(), settings)
val keyKeeper: ActorRef = context.actorOf(Props(classOf[KeyKeeper]), "keyKeeper")
val myKeys: KeyPair = ECDSA.createKeyPair
var allPublicKeys: Set[PublicKey] = Set.empty[PublicKey]
var myPublicKey: ByteString = ByteString.empty
var allPublicKeys: Set[ByteString] = Set()
var nextPeriod: Period = Period(KeyBlock(), settings)
if (settings.canPublishBlocks)
context.system.scheduler.schedule(0 seconds, settings.plannerHeartbeat milliseconds, self, Tick)
var lastBlock: KeyBlock = KeyBlock()
var epoch: Epoch = Epoch(SortedMap())
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

2 variables with the same meaning
var epoch: Epoch = Epoch(SortedMap())
var nextEpoch: Option[Epoch] = None

var nextEpoch: Option[Epoch] = None
val heartBeat: Cancellable =
context.system.scheduler.schedule(10.seconds, settings.plannerHeartbeat milliseconds, self, Tick)
val publisher: ActorSelection = context.system.actorSelection("/user/starter/blockchainer/publisher")
val networker: ActorSelection = context.system.actorSelection("/user/starter/blockchainer/networker")
var scheduleForWriting: List[ByteString] = List()
var hasWritten: Boolean = false

override def specialBehavior: Receive = {
case GetNewScheduleFromRemote(shedule) =>
logger.info(s"Got new schedule from remote")
epoch = Epoch(lastBlock, shedule.toSet, settings.epochMultiplier)
logger.info(s"Epoch from remote is: $epoch")
case SyncingDone =>
logger.info(s"Synced done on Planner.")
if (settings.canPublishBlocks)
context.system.scheduler.schedule(0 seconds, settings.plannerHeartbeat milliseconds, self, Tick)
context.become(syncedNode)
case keyBlock: KeyBlock => lastBlock = keyBlock
case PeerPublicKey(key) =>
allPublicKeys = allPublicKeys + key
logger.info(s"Set allPublickKeys to1: ${allPublicKeys.map(EncodingUtils.encode2Base16).mkString(",")}")
case MyPublicKey(key) =>
logger.info(s"Set allPublickKeys to2: ${EncodingUtils.encode2Base16(key)}")
allPublicKeys = allPublicKeys + key
myPublicKey = key
if (settings.otherNodes.isEmpty) self ! SyncingDone
case _ =>
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

planner extends CommonActor, case Any is useless

}

def syncedNode: Receive = {
case keyBlock: KeyBlock =>
logger.info(s"Planner received new keyBlock with height: ${keyBlock.height}.")
if (!hasWritten && keyBlock.scheduler.nonEmpty) hasWritten = true
nextPeriod = Period(keyBlock, settings)
lastBlock = keyBlock
context.parent ! nextPeriod
case PeerPublicKey(key) =>
logger.info(s"Got public key from remote: ${EncodingUtils.encode2Base16(ECDSA.compressPublicKey(key))} on Planner.")
allPublicKeys = allPublicKeys + key
case KeysForSchedule(keys) =>
logger.info(s"Get peers public keys for schedule: ${keys.map(EncodingUtils.encode2Base16).mkString(",")}")
allPublicKeys = (keys :+ myPublicKey).sortWith((a, b) => a.utf8String.compareTo(b.utf8String) > 1).toSet
case MyPublicKey(key) =>
logger.info("Get key")
allPublicKeys = allPublicKeys + key
myPublicKey = key
case Tick if epoch.isDone =>
logger.info(s"epoch.isDone. Height of last block is: ${lastBlock.height}")
hasWritten = false
epoch = Epoch(lastBlock, allPublicKeys, settings.epochMultiplier)
logger.info(s"New epoch is: ${epoch.schedule}")
scheduleForWriting = epoch.schedule.values.toList
checkMyTurn(isFirstBlock = true, scheduleForWriting)
case Tick if nextPeriod.timeToPublish =>
publisher ! Get
logger.info("Planner sent publisher request: time to publish!")
checkMyTurn(isFirstBlock = false, List())
checkScheduleUpdateTime()
logger.info("nextPeriod.timeToPublish. Height of last block is: ${lastBlock.height}")
case Tick if nextPeriod.noBlocksInTime =>
val newPeriod = Period(nextPeriod, settings)
logger.info(s"No blocks in time. Planner added ${newPeriod.exactTime - System.currentTimeMillis} milliseconds.")
nextPeriod = newPeriod
case Tick =>
logger.info("nextPeriod.noBlocksInTime. Height of last block is: ${lastBlock.height}")
epoch = epoch.noBlockInTime
if (!hasWritten) checkMyTurn(isFirstBlock = true, scheduleForWriting)
else checkMyTurn(isFirstBlock = false, List())
nextPeriod = Period(nextPeriod, settings)
context.parent ! nextPeriod
checkScheduleUpdateTime()
case Tick => logger.info("123")
}

def checkMyTurn(isFirstBlock: Boolean, schedule: List[ByteString]): Unit = {
if (epoch.nextBlock._2 == myPublicKey) publisher ! RequestForNewBlock(isFirstBlock, schedule)
context.parent ! ExpectedBlockPublicKeyAndHeight(epoch.nextBlock._1, epoch.nextBlock._2)
epoch = epoch.delete
}

def checkScheduleUpdateTime(): Unit =
if (epoch.prepareNextEpoch) {
networker ! PrepareScheduler
logger.info("epoch.prepareNextEpoch")
}
}

object Planner {
Expand All @@ -62,31 +122,37 @@ object Planner {
}

def apply(previousPeriod: Period, settings: Settings): Period = {
val exactTimestamp: Long = previousPeriod.exactTime + settings.blockPeriod / 2
val exactTimestamp: Long = previousPeriod.exactTime + settings.blockPeriod
Period(exactTimestamp - settings.biasForBlockPeriod, exactTimestamp, exactTimestamp + settings.biasForBlockPeriod)
}
}

case class Epoch(schedule: Map[Long, PublicKey]) {
case class Epoch(schedule: SortedMap[Long, ByteString]) {

def nextBlock: (Long, PublicKey) = schedule.head
def nextBlock: (Long, ByteString) = schedule.head

def delete: Epoch = this.copy(schedule - schedule.head._1)
def delete: Epoch = this.copy(schedule.tail)

def delete(height: Long): Epoch = this.copy(schedule = schedule.drop(height.toInt))
def delete(height: Long): Epoch = this.copy(schedule.drop(height.toInt))

def noBlockInTime: Epoch = this.copy((schedule - schedule.head._1).map(each => (each._1 - 1, each._2)))
def noBlockInTime: Epoch = this.copy(schedule.map(each => (each._1 - 1, each._2)))

def isDone: Boolean = this.schedule.isEmpty

def prepareNextEpoch: Boolean = schedule.size <= 2

override def toString: String = this.schedule.map(epochInfo =>
s"Height: ${epochInfo._1} -> ${EncodingUtils.encode2Base16(epochInfo._2)}").mkString(",")
}

object Epoch {
def apply(lastKeyBlock: KeyBlock, publicKeys: List[PublicKey], multiplier: Int = 1): Epoch = {
object Epoch extends StrictLogging {
def apply(lastKeyBlock: KeyBlock, publicKeys: Set[ByteString], multiplier: Int = 1): Epoch = {
val startingHeight: Long = lastKeyBlock.height + 1
val numberOfBlocksInEpoch: Int = publicKeys.size * multiplier
var schedule: Map[Long, PublicKey] =
(for (i <- startingHeight until startingHeight + numberOfBlocksInEpoch)
yield i).zip(publicKeys).toMap[Long, PublicKey]
val keysSchedule: List[ByteString] = (1 to multiplier).foldLeft(publicKeys.toList) { case (a, _) => a ::: a }
val schedule: SortedMap[Long, ByteString] =
SortedMap((for (i <- startingHeight until startingHeight + numberOfBlocksInEpoch)
yield i).zip(keysSchedule): _*)
Epoch(schedule)
}
}
Expand Down
Loading