23
23
import org .hyperledger .besu .ethereum .trie .diffbased .common .trielog .TrieLogManager ;
24
24
import org .hyperledger .besu .plugin .services .trielogs .TrieLog ;
25
25
26
- import java .util .Comparator ;
26
+ import java .util .Collections ;
27
+ import java .util .Map ;
27
28
import java .util .Optional ;
29
+ import java .util .TreeMap ;
28
30
import java .util .concurrent .atomic .AtomicInteger ;
29
31
import java .util .function .Consumer ;
30
32
31
- import com .google .common .collect .ArrayListMultimap ;
32
- import com .google .common .collect .Multimap ;
33
- import com .google .common .collect .TreeMultimap ;
34
33
import org .apache .tuweni .bytes .Bytes ;
35
34
import org .slf4j .Logger ;
36
35
import org .slf4j .LoggerFactory ;
@@ -49,12 +48,12 @@ public class BonsaiArchiveFreezer implements BlockAddedObserver {
49
48
private final DiffBasedWorldStateKeyValueStorage rootWorldStateStorage ;
50
49
private final Blockchain blockchain ;
51
50
private final Consumer <Runnable > executeAsync ;
52
- private static final int PRELOAD_LIMIT = 1000 ;
51
+ private static final int CATCHUP_LIMIT = 1000 ;
53
52
private static final int DISTANCE_FROM_HEAD_BEFORE_FREEZING_OLD_STATE = 10 ;
54
53
private final TrieLogManager trieLogManager ;
55
54
56
- private final Multimap <Long , Hash > blocksToMoveToFreezer =
57
- TreeMultimap . create ( Comparator . reverseOrder (), Comparator . naturalOrder ());
55
+ private final Map <Long , Hash > pendingBlocksToArchive =
56
+ Collections . synchronizedMap ( new TreeMap <> ());
58
57
59
58
public BonsaiArchiveFreezer (
60
59
final DiffBasedWorldStateKeyValueStorage rootWorldStateStorage ,
@@ -67,9 +66,7 @@ public BonsaiArchiveFreezer(
67
66
this .trieLogManager = trieLogManager ;
68
67
}
69
68
70
- public void initialize () {
71
- // On startup there will be recent blocks whose state and storage hasn't been archived yet.
72
- // Pre-load them ready for freezing state once enough new blocks have been added to the chain.
69
+ private void preloadCatchupBlocks () {
73
70
Optional <Long > frozenBlocksHead = Optional .empty ();
74
71
75
72
Optional <Long > latestFrozenBlock = rootWorldStateStorage .getLatestArchiveFrozenBlock ();
@@ -86,7 +83,7 @@ public void initialize() {
86
83
if (frozenBlocksHead .isPresent ()) {
87
84
int preLoadedBlocks = 0 ;
88
85
Optional <Block > nextBlock = blockchain .getBlockByNumber (frozenBlocksHead .get ());
89
- for (int i = 0 ; i < PRELOAD_LIMIT ; i ++) {
86
+ for (int i = 0 ; i < CATCHUP_LIMIT ; i ++) {
90
87
if (nextBlock .isPresent ()) {
91
88
addToFreezerQueue (
92
89
nextBlock .get ().getHeader ().getNumber (), nextBlock .get ().getHeader ().getHash ());
@@ -97,13 +94,27 @@ public void initialize() {
97
94
}
98
95
}
99
96
LOG .atInfo ()
100
- .setMessage ("Preloaded {} blocks to move their state and storage to the archive freezer" )
97
+ .setMessage (
98
+ "Preloaded {} blocks from {} to move their state and storage to the archive freezer" )
101
99
.addArgument (preLoadedBlocks )
100
+ .addArgument (frozenBlocksHead .get ())
102
101
.log ();
103
102
}
103
+ }
104
+
105
+ public void initialize () {
106
+ // On startup there will be recent blocks whose state and storage hasn't been archived yet.
107
+ // Pre-load them ready for freezing state once enough new blocks have been added to the chain.
108
+ preloadCatchupBlocks ();
109
+
110
+ // Keep catching up until we move less to the freezer than the catchup limit
111
+ while (moveBlockStateToFreezer () == CATCHUP_LIMIT ) {
112
+ preloadCatchupBlocks ();
113
+ }
114
+ }
104
115
105
- // Start processing any backlog on startup - don't wait for a new block to be imported.
106
- moveBlockStateToFreezer ();
116
+ public int getPendingBlocksCount () {
117
+ return pendingBlocksToArchive . size ();
107
118
}
108
119
109
120
public synchronized void addToFreezerQueue (final long blockNumber , final Hash blockHash ) {
@@ -113,10 +124,17 @@ public synchronized void addToFreezerQueue(final long blockNumber, final Hash bl
113
124
.addArgument (blockNumber )
114
125
.addArgument (blockHash )
115
126
.log ();
116
- blocksToMoveToFreezer .put (blockNumber , blockHash );
127
+ pendingBlocksToArchive .put (blockNumber , blockHash );
128
+ }
129
+
130
+ private synchronized void removeArchivedFromQueue (final Map <Long , Hash > archivedBlocks ) {
131
+ archivedBlocks .keySet ().forEach (e -> pendingBlocksToArchive .remove (e ));
117
132
}
118
133
119
- public synchronized int moveBlockStateToFreezer () {
134
+ // Move state and storage entries from their primary DB segments to the freezer segments. This is
135
+ // intended to maintain good performance for new block imports by keeping the primary DB segments
136
+ // to live state only. Returns the number of state and storage entries moved.
137
+ public int moveBlockStateToFreezer () {
120
138
final long retainAboveThisBlock =
121
139
blockchain .getChainHeadBlockNumber () - DISTANCE_FROM_HEAD_BEFORE_FREEZING_OLD_STATE ;
122
140
@@ -135,100 +153,99 @@ public synchronized int moveBlockStateToFreezer() {
135
153
.addArgument (retainAboveThisBlock )
136
154
.log ();
137
155
138
- final var accountsToMove =
139
- blocksToMoveToFreezer .asMap ().entrySet ().stream ()
140
- .dropWhile ((e ) -> e .getKey () > retainAboveThisBlock );
156
+ // Typically we will move all storage and state for a single block i.e. when a new block is
157
+ // imported, move state for block-N. There are cases where we catch-up and move old state
158
+ // for a number of blocks so we may iterate over a number of blocks freezing their state,
159
+ // not just a single one.
141
160
142
- final Multimap <Long , Hash > accountStateFreezerActionsComplete = ArrayListMultimap .create ();
143
- final Multimap <Long , Hash > accountStorageFreezerActionsComplete = ArrayListMultimap .create ();
161
+ final Map <Long , Hash > blocksToFreeze = new TreeMap <>();
162
+ pendingBlocksToArchive .entrySet ().stream ()
163
+ .filter ((e ) -> e .getKey () <= retainAboveThisBlock )
164
+ .forEach (
165
+ (e ) -> {
166
+ blocksToFreeze .put (e .getKey (), e .getValue ());
167
+ });
144
168
145
169
// Determine which world state keys have changed in the last N blocks by looking at the
146
170
// trie logs for the blocks. Then move the old keys to the freezer segment (if and only if they
147
171
// have changed)
148
- accountsToMove
149
- .parallel ()
172
+ blocksToFreeze
173
+ .entrySet ()
150
174
.forEach (
151
175
(block ) -> {
152
- for (Hash blockHash : block .getValue ()) {
153
- Optional <TrieLog > trieLog = trieLogManager .getTrieLogLayer (blockHash );
154
- if (trieLog .isPresent ()) {
155
- trieLog
156
- .get ()
157
- .getAccountChanges ()
158
- .forEach (
159
- (address , change ) -> {
160
- // Move any previous state for this account
161
- frozenAccountStateCount .addAndGet (
162
- rootWorldStateStorage .freezePreviousAccountState (
163
- blockchain .getBlockHeader (
164
- blockchain .getBlockHeader (blockHash ).get ().getParentHash ()),
165
- address .addressHash ()));
166
- });
167
- }
168
- accountStateFreezerActionsComplete .put (block .getKey (), blockHash );
176
+ if (pendingBlocksToArchive .size () > 0 && pendingBlocksToArchive .size () % 100 == 0 ) {
177
+ // Log progress in case catching up causes there to be a large number of keys
178
+ // to move
179
+ LOG .atInfo ()
180
+ .setMessage ("state for blocks {} to {} archived" )
181
+ .addArgument (block .getKey ())
182
+ .addArgument (block .getKey () + pendingBlocksToArchive .size ())
183
+ .log ();
169
184
}
170
- });
171
-
172
- final var storageToMove =
173
- blocksToMoveToFreezer .asMap ().entrySet ().stream ()
174
- .dropWhile ((e ) -> e .getKey () > retainAboveThisBlock );
175
-
176
- storageToMove
177
- .parallel ()
178
- .forEach (
179
- (block ) -> {
180
- for (Hash blockHash : block .getValue ()) {
181
- Optional <TrieLog > trieLog = trieLogManager .getTrieLogLayer (blockHash );
182
- if (trieLog .isPresent ()) {
183
- trieLog
184
- .get ()
185
- .getStorageChanges ()
186
- .forEach (
187
- (address , storageSlotKey ) -> {
188
- storageSlotKey .forEach (
189
- (slotKey , slotValue ) -> {
190
- // Move any previous state for this account
191
- frozenAccountStorageCount .addAndGet (
192
- rootWorldStateStorage .freezePreviousStorageState (
193
- blockchain .getBlockHeader (
194
- blockchain
195
- .getBlockHeader (blockHash )
196
- .get ()
197
- .getParentHash ()),
198
- Bytes .concatenate (
199
- address .addressHash (), slotKey .getSlotHash ())));
200
- });
201
- });
202
- }
203
- accountStorageFreezerActionsComplete .put (block .getKey (), blockHash );
185
+ Hash blockHash = block .getValue ();
186
+ LOG .atDebug ()
187
+ .setMessage ("Freezing all account state for block {}" )
188
+ .addArgument (block .getKey ())
189
+ .log ();
190
+ Optional <TrieLog > trieLog = trieLogManager .getTrieLogLayer (blockHash );
191
+ if (trieLog .isPresent ()) {
192
+ trieLog
193
+ .get ()
194
+ .getAccountChanges ()
195
+ .forEach (
196
+ (address , change ) -> {
197
+ // Move any previous state for this account
198
+ frozenAccountStateCount .addAndGet (
199
+ rootWorldStateStorage .freezePreviousAccountState (
200
+ blockchain .getBlockHeader (
201
+ blockchain .getBlockHeader (blockHash ).get ().getParentHash ()),
202
+ address .addressHash ()));
203
+ });
204
+ LOG .atDebug ()
205
+ .setMessage ("Freezing all storage state for block {}" )
206
+ .addArgument (block .getKey ())
207
+ .log ();
208
+ trieLog
209
+ .get ()
210
+ .getStorageChanges ()
211
+ .forEach (
212
+ (address , storageSlotKey ) -> {
213
+ storageSlotKey .forEach (
214
+ (slotKey , slotValue ) -> {
215
+ // Move any previous state for this account
216
+ frozenAccountStorageCount .addAndGet (
217
+ rootWorldStateStorage .freezePreviousStorageState (
218
+ blockchain .getBlockHeader (
219
+ blockchain
220
+ .getBlockHeader (blockHash )
221
+ .get ()
222
+ .getParentHash ()),
223
+ Bytes .concatenate (
224
+ address .addressHash (), slotKey .getSlotHash ())));
225
+ });
226
+ });
204
227
}
228
+ LOG .atDebug ()
229
+ .setMessage ("All account state and storage frozen for block {}" )
230
+ .addArgument (block .getKey ())
231
+ .log ();
232
+ rootWorldStateStorage .setLatestArchiveFrozenBlock (block .getKey ());
205
233
});
206
234
207
- // For us to consider all state and storage changes for a block complete, it must have been
208
- // recorded in both accountState and accountStorage lists. If only one finished we need to try
209
- // freezing state/storage for that block again on the next loop
210
- AtomicInteger frozenBlocksCompleted = new AtomicInteger ();
211
- accountStateFreezerActionsComplete
212
- .keySet ()
213
- .forEach (
214
- (b ) -> {
215
- if (accountStorageFreezerActionsComplete .containsKey (b )) {
216
- frozenBlocksCompleted .getAndIncrement ();
217
- rootWorldStateStorage .setLatestArchiveFrozenBlock (b );
218
- blocksToMoveToFreezer .removeAll (b );
219
- }
220
- });
235
+ LOG .atDebug ()
236
+ .setMessage (
237
+ "finished moving cold state to freezer storage for range (chainHeadNumber: {} - numberOfBlocksToKeepInWarmStorage: {}) = {}. Froze {} account state entries, {} account storage entries from {} blocks" )
238
+ .addArgument (blockchain ::getChainHeadBlockNumber )
239
+ .addArgument (DISTANCE_FROM_HEAD_BEFORE_FREEZING_OLD_STATE )
240
+ .addArgument (retainAboveThisBlock )
241
+ .addArgument (frozenAccountStateCount .get ())
242
+ .addArgument (frozenAccountStorageCount .get ())
243
+ .addArgument (blocksToFreeze .size ())
244
+ .log ();
221
245
222
- if (frozenAccountStateCount .get () > 0 || frozenAccountStorageCount .get () > 0 ) {
223
- LOG .atDebug ()
224
- .setMessage ("Froze {} account state entries, {} account storage entries for {} blocks" )
225
- .addArgument (frozenAccountStateCount .get ())
226
- .addArgument (frozenAccountStorageCount .get ())
227
- .addArgument (frozenBlocksCompleted .get ())
228
- .log ();
229
- }
246
+ removeArchivedFromQueue (blocksToFreeze );
230
247
231
- return frozenBlocksCompleted .get ();
248
+ return frozenAccountStateCount . get () + frozenAccountStorageCount .get ();
232
249
}
233
250
234
251
@ Override
0 commit comments