@@ -41,6 +41,7 @@ import scala.util.Random
4141import scala .collection .mutable
4242import scala .util .Try
4343import scala .util .Success
44+ import java .util .concurrent .atomic .AtomicInteger
4445
4546// scalastyle:off file.size.limit
4647class FastSync (
@@ -115,8 +116,11 @@ class FastSync(
115116 }
116117 }
117118
119+ private val actorCounter = new AtomicInteger
120+ private def countActor : Int = actorCounter.incrementAndGet
121+
118122 // scalastyle:off number.of.methods
119- private class SyncingHandler (initialSyncState : SyncState ) {
123+ private class SyncingHandler (initialSyncState : SyncState , var masterPeer : Option [ Peer ] = None ) {
120124
121125 // not part of syncstate as we do not want to persist is.
122126 private var stateSyncRestartRequested = false
@@ -128,7 +132,6 @@ class FastSync(
128132 private var assignedHandlers : Map [ActorRef , Peer ] = Map .empty
129133 private var peerRequestsTime : Map [Peer , Instant ] = Map .empty
130134
131- private var masterPeer : Option [Peer ] = None
132135 // TODO ETCM-701 get rid of state and move skeleton download to a separate actor
133136 private val blockHeadersQueue : mutable.Queue [HeaderRange ] = mutable.Queue .empty
134137 private var currentSkeletonState : Option [HeaderSkeleton ] = None
@@ -138,13 +141,13 @@ class FastSync(
138141 private var requestedBlockBodies : Map [ActorRef , Seq [ByteString ]] = Map .empty
139142 private var requestedReceipts : Map [ActorRef , Seq [ByteString ]] = Map .empty
140143
141- private val syncStateStorageActor = context.actorOf(Props [StateStorageActor ](), " state-storage" )
144+ private val syncStateStorageActor = context.actorOf(Props [StateStorageActor ](), s " $countActor - state-storage" )
142145 syncStateStorageActor ! fastSyncStateStorage
143146
144147 private val branchResolver = context.actorOf(
145148 FastSyncBranchResolverActor
146149 .props(self, peerEventBus, etcPeerManager, blockchain, blacklist, syncConfig, appStateStorage, scheduler),
147- " fast-sync-branch-resolver"
150+ s " $countActor - fast-sync-branch-resolver"
148151 )
149152
150153 private val syncStateScheduler = context.actorOf(
@@ -157,7 +160,7 @@ class FastSync(
157160 blacklist,
158161 scheduler
159162 ),
160- " state-scheduler"
163+ s " $countActor - state-scheduler"
161164 )
162165
163166 // Delay before starting to persist snapshot. It should be 0, as the presence of it marks that fast sync was started
@@ -178,7 +181,7 @@ class FastSync(
178181 syncState = syncState.copy(downloadedNodesCount = saved, totalNodesCount = saved + missing)
179182 }
180183
181- def receive : Receive = handlePeerListMessages orElse handleStatus orElse {
184+ def receive : Receive = handlePeerListMessages orElse handleStatus orElse handleRequestFailure orElse {
182185 case UpdatePivotBlock (reason) => updatePivotBlock(reason)
183186 case WaitingForNewTargetBlock =>
184187 log.info(" State sync stopped until receiving new pivot block" )
@@ -190,10 +193,15 @@ class FastSync(
190193 case StateSyncFinished =>
191194 syncState = syncState.copy(stateSyncFinished = true )
192195 processSyncing()
196+ }
197+
198+ def handleRequestFailure : Receive = {
193199 case PeerRequestHandler .RequestFailed (peer, reason) =>
194200 handleRequestFailure(peer, sender(), FastSyncRequestFailed (reason))
195- case Terminated (ref) if assignedHandlers.contains(ref) =>
196- handleRequestFailure(assignedHandlers(ref), ref, PeerActorTerminated )
201+ case Terminated (ref) =>
202+ assignedHandlers.get(ref).foreach {
203+ handleRequestFailure(_, ref, PeerActorTerminated )
204+ }
197205 }
198206
199207 // TODO ETCM-701 will be moved to separate actor and refactored
@@ -340,12 +348,13 @@ class FastSync(
340348 batchFailuresCount += 1
341349 if (batchFailuresCount > fastSyncMaxBatchRetries) {
342350 log.info(" Max number of allowed failures reached. Switching branch and master peer." )
343- handleRewind(header, masterPeer.get, fastSyncBlockValidationN, blacklistDuration)
351+
352+ blockHeadersQueue.dequeueAll(_ => true )
353+
354+ handleRewind(header, masterPeer.get, fastSyncBlockValidationN, blacklistDuration, continueSyncing = false )
344355
345356 // Start branch resolution and wait for response from the FastSyncBranchResolver actor.
346357 context become waitingForBranchResolution
347- currentSkeletonState = None
348- blockHeadersQueue.dequeueAll(_ => true )
349358 branchResolver ! FastSyncBranchResolverActor .StartBranchResolver
350359 }
351360 }
@@ -362,19 +371,33 @@ class FastSync(
362371 }
363372 }
364373
365- private def waitingForBranchResolution : Receive = handleStatus orElse {
374+ private def waitingForBranchResolution : Receive = handleStatus orElse handleRequestFailure orElse {
366375 case FastSyncBranchResolverActor .BranchResolvedSuccessful (firstCommonBlockNumber, newMasterPeer) =>
376+ log.debug(
377+ s " Resolved branch with first common block number $firstCommonBlockNumber for new master peer $newMasterPeer"
378+ )
367379 // Reset the batch failures count
368380 batchFailuresCount = 0
369381
382+ context.children.foreach { child =>
383+ log.debug(s " Unwatching and killing $child" )
384+ context.unwatch(child)
385+ child ! PoisonPill
386+ }
387+
370388 // Restart syncing from the valid block available in state.
371- syncState = syncState.copy(
372- bestBlockHeaderNumber = firstCommonBlockNumber,
373- nextBlockToFullyValidate = firstCommonBlockNumber + 1
389+ log.debug(" Starting with fresh SyncingHandler" )
390+ val syncingHandler = new SyncingHandler (
391+ syncState.copy(
392+ bestBlockHeaderNumber = firstCommonBlockNumber,
393+ nextBlockToFullyValidate = firstCommonBlockNumber + 1 ,
394+ pivotBlockUpdateFailures = 0
395+ ),
396+ masterPeer = Some (newMasterPeer)
374397 )
375- masterPeer = Some (newMasterPeer )
376- context become receive
377- processSyncing()
398+ context.become(syncingHandler.receive )
399+ syncingHandler.processSyncing()
400+
378401 case _ : FastSyncBranchResolverActor .BranchResolutionFailed =>
379402 // there isn't much we can do if we don't find a branch/peer to continue syncing, so let's try again
380403 branchResolver ! FastSyncBranchResolverActor .StartBranchResolver
@@ -390,7 +413,8 @@ class FastSync(
390413 log.info(" Asking for new pivot block" )
391414 val pivotBlockSelector = {
392415 context.actorOf(
393- PivotBlockSelector .props(etcPeerManager, peerEventBus, syncConfig, scheduler, context.self, blacklist)
416+ PivotBlockSelector .props(etcPeerManager, peerEventBus, syncConfig, scheduler, context.self, blacklist),
417+ s " $countActor-pivot-block-selector-update "
394418 )
395419 }
396420 pivotBlockSelector ! PivotBlockSelector .SelectPivotBlock
@@ -408,7 +432,7 @@ class FastSync(
408432 }
409433
410434 def waitingForPivotBlockUpdate (updateReason : PivotBlockUpdateReason ): Receive =
411- handlePeerListMessages orElse handleStatus orElse {
435+ handlePeerListMessages orElse handleStatus orElse handleRequestFailure orElse {
412436 case PivotBlockSelector .Result (pivotBlockHeader)
413437 if newPivotIsGoodEnough(pivotBlockHeader, syncState, updateReason) =>
414438 log.info(" New pivot block with number {} received" , pivotBlockHeader.number)
@@ -507,7 +531,9 @@ class FastSync(
507531 }
508532
509533 private def removeRequestHandler (handler : ActorRef ): Unit = {
534+ log.debug(s " Removing request handler ${handler.path}" )
510535 context unwatch handler
536+ skeletonHandler = skeletonHandler.filter(_ != handler)
511537 assignedHandlers -= handler
512538 }
513539
@@ -560,17 +586,23 @@ class FastSync(
560586 syncState = syncState.updateNextBlockToValidate(header, K , X )
561587 }
562588
563- private def handleRewind (header : BlockHeader , peer : Peer , N : Int , duration : FiniteDuration ): Unit = {
589+ private def handleRewind (
590+ header : BlockHeader ,
591+ peer : Peer ,
592+ N : Int ,
593+ duration : FiniteDuration ,
594+ continueSyncing : Boolean = true
595+ ): Unit = {
564596 blacklist.add(peer.id, duration, BlockHeaderValidationFailed )
565597 if (header.number <= syncState.safeDownloadTarget) {
566598 discardLastBlocks(header.number, N )
567599 syncState = syncState.updateDiscardedBlocks(header, N )
568600 if (header.number >= syncState.pivotBlock.number) {
569601 updatePivotBlock(LastBlockValidationFailed )
570- } else {
602+ } else if (continueSyncing) {
571603 processSyncing()
572604 }
573- } else {
605+ } else if (continueSyncing) {
574606 processSyncing()
575607 }
576608 }
@@ -685,6 +717,9 @@ class FastSync(
685717 }
686718
687719 private def handleRequestFailure (peer : Peer , handler : ActorRef , reason : BlacklistReason ): Unit = {
720+ if (skeletonHandler == Some (handler))
721+ currentSkeletonState = None
722+
688723 removeRequestHandler(handler)
689724
690725 requestedHeaders.get(peer).foreach(blockHeadersQueue.enqueue)
@@ -816,6 +851,16 @@ class FastSync(
816851
817852 def processSyncing (): Unit = {
818853 FastSyncMetrics .measure(syncState)
854+ log.debug(
855+ " Start of processSyncing: {}" ,
856+ Map (
857+ " fullySynced" -> fullySynced,
858+ " blockchainDataToDownload" -> blockchainDataToDownload,
859+ " noBlockchainWorkRemaining" -> noBlockchainWorkRemaining,
860+ " stateSyncFinished" -> syncState.stateSyncFinished,
861+ " notInTheMiddleOfUpdate" -> notInTheMiddleOfUpdate
862+ )
863+ )
819864 if (fullySynced) {
820865 finish()
821866 } else {
@@ -887,8 +932,9 @@ class FastSync(
887932 requestSkeletonHeaders(peer)
888933 } else {
889934 log.debug(
890- " Nothing to request. Waiting for responses for [{}] sent requests." ,
891- assignedHandlers.size + skeletonHandler.size
935+ " Nothing to request. Waiting for responses from: {} and/or {}" ,
936+ assignedHandlers.keys,
937+ skeletonHandler
892938 )
893939 }
894940 }
@@ -911,7 +957,8 @@ class FastSync(
911957 peerEventBus,
912958 requestMsg = GetReceipts (receiptsToGet),
913959 responseMsgCode = Codes .ReceiptsCode
914- )
960+ ),
961+ s " $countActor-peer-request-handler-receipts "
915962 )
916963
917964 context watch handler
@@ -934,7 +981,8 @@ class FastSync(
934981 peerEventBus,
935982 requestMsg = GetBlockBodies (blockBodiesToGet),
936983 responseMsgCode = Codes .BlockBodiesCode
937- )
984+ ),
985+ s " $countActor-peer-request-handler-block-bodies "
938986 )
939987
940988 context watch handler
@@ -962,7 +1010,8 @@ class FastSync(
9621010 peerEventBus,
9631011 requestMsg = GetBlockHeaders (Left (toRequest.from), toRequest.limit, skip = 0 , reverse = false ),
9641012 responseMsgCode = Codes .BlockHeadersCode
965- )
1013+ ),
1014+ s " $countActor-peer-request-handler-block-headers "
9661015 )
9671016
9681017 context watch handler
@@ -1026,7 +1075,8 @@ class FastSync(
10261075 peerEventBus,
10271076 requestMsg = msg,
10281077 responseMsgCode = Codes .BlockHeadersCode
1029- )
1078+ ),
1079+ s " $countActor-peer-request-handler-block-headers-skeleton "
10301080 )
10311081
10321082 context watch handler
0 commit comments