@@ -80,6 +80,7 @@ import Control.Monad.Class.MonadTimer (MonadTimer)
80
80
import Control.Monad.Except (runExcept , throwError )
81
81
import Control.Tracer
82
82
import Data.Foldable (traverse_ )
83
+ import Data.Function (fix )
83
84
import Data.Functor ((<&>) )
84
85
import Data.Kind (Type )
85
86
import Data.Map.Strict (Map )
@@ -114,7 +115,7 @@ import qualified Ouroboros.Consensus.MiniProtocol.ChainSync.Client.HistoricityCh
114
115
import qualified Ouroboros.Consensus.MiniProtocol.ChainSync.Client.InFutureCheck as InFutureCheck
115
116
import qualified Ouroboros.Consensus.MiniProtocol.ChainSync.Client.Jumping as Jumping
116
117
import Ouroboros.Consensus.MiniProtocol.ChainSync.Client.State
117
- import Ouroboros.Consensus.Node.GsmState (GsmState (.. ))
118
+ import Ouroboros.Consensus.Node.GsmState (GsmState (.. ), GsmTransition ( .. ) )
118
119
import Ouroboros.Consensus.Node.NetworkProtocolVersion
119
120
import Ouroboros.Consensus.Protocol.Abstract
120
121
import Ouroboros.Consensus.Storage.ChainDB (ChainDB )
@@ -404,14 +405,16 @@ bracketChainSyncClient
404
405
cschJumping <- newTVarIO (Disengaged DisengagedDone )
405
406
let handle = ChainSyncClientHandle {
406
407
cschGDDKill = throwTo tid DensityTooLow
407
- , cschOnGsmStateChanged = updateLopBucketConfig lopBucket
408
+ , cschOnGsmStateChanged = \ gsmTransition time -> do
409
+ updateLopBucketConfig lopBucket gsmTransition time
410
+ pure $ pure ()
408
411
, cschState
409
412
, cschJumping
410
413
, cschJumpInfo
411
414
}
412
415
insertHandle = atomicallyWithMonotonicTime $ \ time -> do
413
416
initialGsmState <- getGsmState
414
- updateLopBucketConfig lopBucket initialGsmState time
417
+ initializeLopBucketConfig lopBucket initialGsmState time
415
418
cschcAddHandle varHandles peer handle
416
419
deleteHandle = atomically $ cschcRemoveHandle varHandles peer
417
420
bracket_ insertHandle deleteHandle $ f Jumping. noJumping
@@ -425,16 +428,39 @@ bracketChainSyncClient
425
428
tid <- myThreadId
426
429
atomicallyWithMonotonicTime $ \ time -> do
427
430
initialGsmState <- getGsmState
428
- updateLopBucketConfig lopBucket initialGsmState time
431
+ initializeLopBucketConfig lopBucket initialGsmState time
429
432
cschJumpInfo <- newTVar Nothing
430
433
context <- Jumping. makeContext varHandles jumpSize tracerCsj
431
- Jumping. registerClient context peer cschState $ \ cschJumping -> ChainSyncClientHandle
432
- { cschGDDKill = throwTo tid DensityTooLow
433
- , cschOnGsmStateChanged = updateLopBucketConfig lopBucket
434
- , cschState
435
- , cschJumping
436
- , cschJumpInfo
437
- }
434
+ Jumping. registerClient context peer cschState $ \ cschJumping ->
435
+ fix $ \ handle -> ChainSyncClientHandle -- NB @handle@ only occurs under a lambda
436
+ { cschGDDKill = throwTo tid DensityTooLow
437
+ , cschOnGsmStateChanged = \ gsmTransition time' -> do
438
+ updateLopBucketConfig lopBucket gsmTransition time'
439
+ let peerContext =
440
+ context {Jumping. peer = peer, Jumping. handle = handle}
441
+ -- 'cschJumpInfo' does not need to be reset. The ChainSync
442
+ -- client constantly updates it, even when its not
443
+ -- registered with CSJ. That's necessary: when this peer
444
+ -- /re-registers/, it might immediately be the Dynamo, and
445
+ -- the Dynamo must not have 'Nothing' in its
446
+ -- 'cschJumpInfo'.
447
+ case gsmTransition of
448
+ PreSyncingSyncing -> pure $ pure ()
449
+ SyncingCaughtUp -> do
450
+ writeTVar cschJumping $ Disengaged DisengagedDone
451
+ -- The GSM only transitions to CaughtUp if all peers
452
+ -- have send MsgAwaitReply, so DisengagedDone is
453
+ -- correct here.
454
+ mbEv <- Jumping. unregisterClient peerContext
455
+ pure $ traverse_ (traceWith (Jumping. tracer peerContext)) mbEv
456
+ CaughtUpPreSyncing -> do
457
+ mbEv <- Jumping. reregisterClient peerContext
458
+ pure $ traverse_ (traceWith (Jumping. tracer peerContext)) mbEv
459
+ SyncingPreSyncing -> pure $ pure ()
460
+ , cschState
461
+ , cschJumping
462
+ , cschJumpInfo
463
+ }
438
464
439
465
releaseContext (peerContext, _mbEv) = do
440
466
mbEv <- atomically $ Jumping. unregisterClient peerContext
@@ -444,33 +470,44 @@ bracketChainSyncClient
444
470
invalidBlockRejector
445
471
tracer version pipelining getIsInvalidBlock (csCandidate <$> readTVar varState)
446
472
447
- -- | Update the configuration of the bucket to match the given GSM state.
448
- -- NOTE: The new level is currently the maximal capacity of the bucket;
449
- -- maybe we want to change that later.
450
- updateLopBucketConfig :: LeakyBucket. Handlers m -> GsmState -> Time -> STM m ()
451
- updateLopBucketConfig lopBucket gsmState =
452
- LeakyBucket. updateConfig lopBucket $ \ _ ->
453
- let config = lopBucketConfig gsmState in
454
- (LeakyBucket. capacity config, config)
455
-
456
- -- | Wrapper around 'LeakyBucket.execAgainstBucket' that handles the
457
- -- disabled bucket by running the given action with dummy handlers.
458
- lopBucketConfig :: GsmState -> LeakyBucket. Config m
459
- lopBucketConfig gsmState =
460
- case (gsmState, csBucketConfig) of
461
- (Syncing , ChainSyncLoPBucketEnabled ChainSyncLoPBucketEnabledConfig {csbcCapacity, csbcRate}) ->
462
- LeakyBucket. Config
463
- { capacity = fromInteger $ csbcCapacity,
464
- rate = csbcRate,
465
- onEmpty = throwIO EmptyBucket ,
466
- fillOnOverflow = True
467
- }
468
- -- NOTE: If we decide to slow the bucket down when “almost caught-up”,
469
- -- we should add a state to the GSM and corresponding configuration
470
- -- fields and a bucket config here.
471
- (_, ChainSyncLoPBucketDisabled ) -> LeakyBucket. dummyConfig
472
- (PreSyncing , ChainSyncLoPBucketEnabled _) -> LeakyBucket. dummyConfig
473
- (CaughtUp , ChainSyncLoPBucketEnabled _) -> LeakyBucket. dummyConfig
473
+ mkLopConfig ChainSyncLoPBucketEnabledConfig {csbcCapacity, csbcRate} =
474
+ LeakyBucket. Config {
475
+ capacity = fromInteger csbcCapacity
476
+ , rate = csbcRate
477
+ , onEmpty = throwIO EmptyBucket
478
+ , fillOnOverflow = True
479
+ }
480
+
481
+ -- | Update the configuration of the bucket as part of the given GSM transition.
482
+ updateLopBucketConfig :: LeakyBucket. Handlers m -> GsmTransition -> Time -> STM m ()
483
+ updateLopBucketConfig lopBucket gsmTransition =
484
+ LeakyBucket. updateConfig lopBucket $ \ (oldLevel, _oldConfig) ->
485
+ case csBucketConfig of
486
+ ChainSyncLoPBucketDisabled -> (oldLevel, LeakyBucket. dummyConfig)
487
+ ChainSyncLoPBucketEnabled csbc ->
488
+ let config = mkLopConfig csbc
489
+ in
490
+ case gsmTransition of
491
+ PreSyncingSyncing -> (oldLevel, config)
492
+ SyncingCaughtUp -> (oldLevel, LeakyBucket. dummyConfig)
493
+ CaughtUpPreSyncing -> (LeakyBucket. capacity config, config)
494
+ SyncingPreSyncing -> (oldLevel, config)
495
+
496
+ initializeLopBucketConfig :: LeakyBucket. Handlers m -> GsmState -> Time -> STM m ()
497
+ initializeLopBucketConfig lopBucket gsmState =
498
+ LeakyBucket. updateConfig lopBucket $ \ (uninitializedLevel, _oldConfig) ->
499
+ let disabled = (uninitializedLevel, LeakyBucket. dummyConfig)
500
+ in
501
+ case csBucketConfig of
502
+ ChainSyncLoPBucketDisabled -> disabled
503
+ ChainSyncLoPBucketEnabled csbc ->
504
+ let config = mkLopConfig csbc
505
+ enabled = (LeakyBucket. capacity config, config)
506
+ in
507
+ case gsmState of
508
+ PreSyncing -> enabled
509
+ Syncing -> enabled
510
+ CaughtUp -> disabled
474
511
475
512
-- Our task: after connecting to an upstream node, try to maintain an
476
513
-- up-to-date header-only fragment representing their chain. We maintain
0 commit comments