Skip to content

Commit 66cd5d7

Browse files
committed
LedgerDB: implement predictable snapshotting
1 parent a878c90 commit 66cd5d7

File tree

9 files changed

+272
-251
lines changed

9 files changed

+272
-251
lines changed
Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
### Breaking
2+
3+
- LedgerDB: implemented *predictable* snapshots, i.e. different nodes with the
4+
same configuration will now create snapshots for the same slots.
5+
6+
See 'SnapshotPolicyArgs' for more details.

ouroboros-consensus/src/ouroboros-consensus/Ouroboros/Consensus/Storage/ChainDB/Impl.hs

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -164,7 +164,7 @@ openDBInternal args launchBgTasks = runWithTempRegistry $ do
164164
traceWith tracer $ TraceOpenEvent StartedOpeningLgrDB
165165
(ledgerDbGetVolatileSuffix, setGetCurrentChainForLedgerDB) <-
166166
mkLedgerDbGetVolatileSuffix
167-
(lgrDB, replayed) <-
167+
(lgrDB, _replayed) <-
168168
LedgerDB.openDB
169169
argsLgrDb
170170
(ImmutableDB.streamAPI immutableDB)
@@ -289,8 +289,7 @@ openDBInternal args launchBgTasks = runWithTempRegistry $ do
289289
, intGarbageCollect = \slot -> getEnv h $ \e -> do
290290
Background.garbageCollectBlocks e slot
291291
LedgerDB.garbageCollect (cdbLedgerDB e) slot
292-
, intTryTakeSnapshot = getEnv h $ \env' ->
293-
void $ LedgerDB.tryTakeSnapshot (cdbLedgerDB env') Nothing maxBound
292+
, intTryTakeSnapshot = getEnv h $ LedgerDB.tryTakeSnapshot . cdbLedgerDB
294293
, intAddBlockRunner = getEnv h (Background.addBlockRunner addBlockTestFuse)
295294
, intKillBgThreads = varKillBgThreads
296295
}
@@ -301,7 +300,7 @@ openDBInternal args launchBgTasks = runWithTempRegistry $ do
301300
(castPoint $ AF.anchorPoint chain)
302301
(castPoint $ AF.headPoint chain)
303302

304-
when launchBgTasks $ Background.launchBgTasks env replayed
303+
when launchBgTasks $ Background.launchBgTasks env
305304

306305
return (chainDB, testing, env)
307306

ouroboros-consensus/src/ouroboros-consensus/Ouroboros/Consensus/Storage/ChainDB/Impl/Background.hs

Lines changed: 18 additions & 72 deletions
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,11 @@
11
{-# LANGUAGE BangPatterns #-}
22
{-# LANGUAGE DeriveAnyClass #-}
33
{-# LANGUAGE DeriveGeneric #-}
4-
{-# LANGUAGE DerivingStrategies #-}
54
{-# LANGUAGE FlexibleContexts #-}
65
{-# LANGUAGE LambdaCase #-}
76
{-# LANGUAGE NamedFieldPuns #-}
87
{-# LANGUAGE RecordWildCards #-}
98
{-# LANGUAGE ScopedTypeVariables #-}
10-
{-# LANGUAGE TupleSections #-}
119

1210
-- | Background tasks:
1311
--
@@ -52,7 +50,6 @@ import Data.Sequence.Strict (StrictSeq (..))
5250
import qualified Data.Sequence.Strict as Seq
5351
import Data.Time.Clock
5452
import Data.Void (Void)
55-
import Data.Word
5653
import GHC.Generics (Generic)
5754
import GHC.Stack (HasCallStack)
5855
import Ouroboros.Consensus.Block
@@ -75,7 +72,7 @@ import qualified Ouroboros.Consensus.Storage.VolatileDB as VolatileDB
7572
import Ouroboros.Consensus.Util
7673
import Ouroboros.Consensus.Util.Condense
7774
import Ouroboros.Consensus.Util.IOLike
78-
import Ouroboros.Consensus.Util.STM (Watcher (..), forkLinkedWatcher)
75+
import Ouroboros.Consensus.Util.STM (Watcher (..), blockUntilJust, forkLinkedWatcher)
7976
import Ouroboros.Network.AnchoredFragment (AnchoredSeq (..))
8077
import qualified Ouroboros.Network.AnchoredFragment as AF
8178

@@ -92,15 +89,13 @@ launchBgTasks ::
9289
, HasHardForkHistory blk
9390
) =>
9491
ChainDbEnv m blk ->
95-
-- | Number of immutable blocks replayed on ledger DB startup
96-
Word64 ->
9792
m ()
98-
launchBgTasks cdb@CDB{..} replayed = do
93+
launchBgTasks cdb@CDB{..} = do
9994
!addBlockThread <-
10095
launch "ChainDB.addBlockRunner" $
10196
addBlockRunner cdbChainSelFuse cdb
10297

103-
ledgerDbTasksTrigger <- newLedgerDbTasksTrigger replayed
98+
ledgerDbTasksTrigger <- newLedgerDbTasksTrigger
10499
!ledgerDbMaintenaceThread <-
105100
forkLinkedWatcher cdbRegistry "ChainDB.ledgerDbTaskWatcher" $
106101
ledgerDbTaskWatcher cdb ledgerDbTasksTrigger
@@ -259,20 +254,18 @@ copyToImmutableDBRunner cdb@CDB{..} ledgerDbTasksTrigger gcSchedule fuse = do
259254
copyAndTrigger :: m ()
260255
copyAndTrigger = do
261256
-- Wait for 'cdbChain' to become longer than 'getCurrentChain'.
262-
numToWrite <- atomically $ do
257+
atomically $ do
263258
curChain <- icWithoutTime <$> readTVar cdbChain
264259
curChainVolSuffix <- Query.getCurrentChain cdb
265-
let numToWrite = AF.length curChain - AF.length curChainVolSuffix
266-
check $ numToWrite > 0
267-
return $ fromIntegral numToWrite
260+
check $ AF.length curChain > AF.length curChainVolSuffix
268261

269262
-- Copy blocks to ImmutableDB
270263
--
271264
-- This is a synchronous operation: when it returns, the blocks have been
272265
-- copied to disk (though not flushed, necessarily).
273266
gcSlotNo <- withFuse fuse (copyToImmutableDB cdb)
274267

275-
triggerLedgerDbTasks ledgerDbTasksTrigger gcSlotNo numToWrite
268+
triggerLedgerDbTasks ledgerDbTasksTrigger gcSlotNo
276269
scheduleGC' gcSlotNo
277270

278271
scheduleGC' :: WithOrigin SlotNo -> m ()
@@ -294,45 +287,20 @@ copyToImmutableDBRunner cdb@CDB{..} ledgerDbTasksTrigger gcSchedule fuse = do
294287
-- | Trigger for the LedgerDB maintenance tasks, namely whenever the immutable
295288
-- DB tip slot advances when we finish copying blocks to it.
296289
newtype LedgerDbTasksTrigger m
297-
= LedgerDbTasksTrigger (StrictTVar m LedgerDbTaskState)
290+
= LedgerDbTasksTrigger (StrictTVar m (WithOrigin SlotNo))
298291

299-
data LedgerDbTaskState = LedgerDbTaskState
300-
{ ldbtsImmTip :: !(WithOrigin SlotNo)
301-
, ldbtsPrevSnapshotTime :: !(Maybe Time)
302-
, ldbtsBlocksSinceLastSnapshot :: !Word64
303-
}
304-
deriving stock Generic
305-
deriving anyclass NoThunks
306-
307-
newLedgerDbTasksTrigger ::
308-
IOLike m =>
309-
-- | Number of blocks replayed.
310-
Word64 ->
311-
m (LedgerDbTasksTrigger m)
312-
newLedgerDbTasksTrigger replayed = LedgerDbTasksTrigger <$> newTVarIO st
313-
where
314-
st =
315-
LedgerDbTaskState
316-
{ ldbtsImmTip = Origin
317-
, ldbtsPrevSnapshotTime = Nothing
318-
, ldbtsBlocksSinceLastSnapshot = replayed
319-
}
292+
newLedgerDbTasksTrigger :: IOLike m => m (LedgerDbTasksTrigger m)
293+
newLedgerDbTasksTrigger = LedgerDbTasksTrigger <$> newTVarIO Origin
320294

321295
triggerLedgerDbTasks ::
322296
forall m.
323297
IOLike m =>
324298
LedgerDbTasksTrigger m ->
325299
-- | New tip of the ImmutableDB.
326300
WithOrigin SlotNo ->
327-
-- | Number of blocks written to the ImmutableDB.
328-
Word64 ->
329301
m ()
330-
triggerLedgerDbTasks (LedgerDbTasksTrigger varSt) immTip numWritten =
331-
atomically $ modifyTVar varSt $ \st ->
332-
st
333-
{ ldbtsImmTip = immTip
334-
, ldbtsBlocksSinceLastSnapshot = ldbtsBlocksSinceLastSnapshot st + numWritten
335-
}
302+
triggerLedgerDbTasks (LedgerDbTasksTrigger varSt) =
303+
atomically . writeTVar varSt
336304

337305
-- | Run LedgerDB maintenance tasks when 'LedgerDbTasksTrigger' changes.
338306
--
@@ -344,38 +312,16 @@ ledgerDbTaskWatcher ::
344312
IOLike m =>
345313
ChainDbEnv m blk ->
346314
LedgerDbTasksTrigger m ->
347-
Watcher m LedgerDbTaskState (WithOrigin SlotNo)
315+
Watcher m SlotNo SlotNo
348316
ledgerDbTaskWatcher CDB{..} (LedgerDbTasksTrigger varSt) =
349317
Watcher
350-
{ wFingerprint = ldbtsImmTip
318+
{ wFingerprint = id
351319
, wInitial = Nothing
352-
, wReader = readTVar varSt
353-
, wNotify =
354-
\LedgerDbTaskState
355-
{ ldbtsImmTip
356-
, ldbtsBlocksSinceLastSnapshot = blocksSinceLast
357-
, ldbtsPrevSnapshotTime = prevSnapTime
358-
} ->
359-
whenJust (withOriginToMaybe ldbtsImmTip) $ \slotNo -> do
360-
LedgerDB.tryFlush cdbLedgerDB
361-
362-
now <- getMonotonicTime
363-
LedgerDB.SnapCounters
364-
{ prevSnapshotTime
365-
, ntBlocksSinceLastSnap
366-
} <-
367-
LedgerDB.tryTakeSnapshot
368-
cdbLedgerDB
369-
((,now) <$> prevSnapTime)
370-
blocksSinceLast
371-
atomically $ modifyTVar varSt $ \st ->
372-
st
373-
{ ldbtsBlocksSinceLastSnapshot =
374-
ldbtsBlocksSinceLastSnapshot st - blocksSinceLast + ntBlocksSinceLastSnap
375-
, ldbtsPrevSnapshotTime = prevSnapshotTime
376-
}
377-
378-
LedgerDB.garbageCollect cdbLedgerDB slotNo
320+
, wReader = blockUntilJust $ withOriginToMaybe <$> readTVar varSt
321+
, wNotify = \slotNo -> do
322+
LedgerDB.tryFlush cdbLedgerDB
323+
LedgerDB.tryTakeSnapshot cdbLedgerDB
324+
LedgerDB.garbageCollect cdbLedgerDB slotNo
379325
}
380326

381327
{-------------------------------------------------------------------------------

ouroboros-consensus/src/ouroboros-consensus/Ouroboros/Consensus/Storage/LedgerDB/API.hs

Lines changed: 3 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -147,9 +147,6 @@ module Ouroboros.Consensus.Storage.LedgerDB.API
147147
, withPrivateTipForker
148148
, withTipForker
149149

150-
-- * Snapshots
151-
, SnapCounters (..)
152-
153150
-- * Streaming
154151
, StreamingBackend (..)
155152
, Yield
@@ -166,7 +163,6 @@ import Codec.CBOR.Decoding
166163
import Codec.CBOR.Read
167164
import Codec.Serialise
168165
import qualified Control.Monad as Monad
169-
import Control.Monad.Class.MonadTime.SI
170166
import Control.Monad.Except
171167
import Control.ResourceRegistry
172168
import Control.Tracer
@@ -271,18 +267,12 @@ data LedgerDB m l blk = LedgerDB
271267
-- * The set of previously applied points.
272268
, tryTakeSnapshot ::
273269
l ~ ExtLedgerState blk =>
274-
Maybe (Time, Time) ->
275-
Word64 ->
276-
m SnapCounters
270+
m ()
277271
-- ^ If the provided arguments indicate so (based on the SnapshotPolicy with
278272
-- which this LedgerDB was opened), take a snapshot and delete stale ones.
279273
--
280-
-- The arguments are:
281-
--
282-
-- - If a snapshot has been taken already, the time at which it was taken
283-
-- and the current time.
284-
--
285-
-- - How many blocks have been processed since the last snapshot.
274+
-- For V1, this must not be called concurrently with 'garbageCollect' and/or
275+
-- 'tryFlush'.
286276
, tryFlush :: m ()
287277
-- ^ Flush V1 in-memory LedgerDB state to disk, if possible. This is a no-op
288278
-- for implementations that do not need an explicit flush function.
@@ -429,18 +419,6 @@ getReadOnlyForker ::
429419
m (Either GetForkerError (ReadOnlyForker m l blk))
430420
getReadOnlyForker ldb rr pt = fmap readOnlyForker <$> getForkerAtTarget ldb rr pt
431421

432-
{-------------------------------------------------------------------------------
433-
Snapshots
434-
-------------------------------------------------------------------------------}
435-
436-
-- | Counters to keep track of when we made the last snapshot.
437-
data SnapCounters = SnapCounters
438-
{ prevSnapshotTime :: !(Maybe Time)
439-
-- ^ When was the last time we made a snapshot
440-
, ntBlocksSinceLastSnap :: !Word64
441-
-- ^ How many blocks have we processed since the last snapshot
442-
}
443-
444422
{-------------------------------------------------------------------------------
445423
Initialization
446424
-------------------------------------------------------------------------------}

0 commit comments

Comments
 (0)