From 7d4a524ed7f2099649b19b99690592accb1b8a3d Mon Sep 17 00:00:00 2001 From: Matthew Whitehead Date: Fri, 16 Aug 2024 16:19:08 +0100 Subject: [PATCH] New ACCOUNT_FREEZER_STATE DB segment. New BonsaiArchiveFreezer to listen for blocks and move account state to new DB segment Signed-off-by: Matthew Whitehead --- .../storage/TrieLogSubCommand.java | 1 + .../controller/BesuControllerBuilder.java | 31 ++++ .../flat/ArchiveCodeStorageStrategy.java | 2 +- .../storage/flat/ArchiveFlatDbStrategy.java | 33 +++- .../worldview/BonsaiArchiveFreezer.java | 161 ++++++++++++++++++ .../DiffBasedWorldStateKeyValueStorage.java | 71 ++++++++ .../storage/flat/FlatDbStrategyProvider.java | 8 +- 7 files changed, 299 insertions(+), 8 deletions(-) create mode 100644 ethereum/core/src/main/java/org/hyperledger/besu/ethereum/trie/diffbased/bonsai/worldview/BonsaiArchiveFreezer.java diff --git a/besu/src/main/java/org/hyperledger/besu/cli/subcommands/storage/TrieLogSubCommand.java b/besu/src/main/java/org/hyperledger/besu/cli/subcommands/storage/TrieLogSubCommand.java index 0a65dcb91498..7ab3eff81654 100644 --- a/besu/src/main/java/org/hyperledger/besu/cli/subcommands/storage/TrieLogSubCommand.java +++ b/besu/src/main/java/org/hyperledger/besu/cli/subcommands/storage/TrieLogSubCommand.java @@ -39,6 +39,7 @@ import org.apache.logging.log4j.Level; import org.apache.logging.log4j.core.config.Configurator; +import org.hyperledger.besu.ethereum.worldstate.ImmutableDataStorageConfiguration; import org.rocksdb.RocksDBException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; diff --git a/besu/src/main/java/org/hyperledger/besu/controller/BesuControllerBuilder.java b/besu/src/main/java/org/hyperledger/besu/controller/BesuControllerBuilder.java index 4185f9b7be8b..2b05f4b7ae40 100644 --- a/besu/src/main/java/org/hyperledger/besu/controller/BesuControllerBuilder.java +++ b/besu/src/main/java/org/hyperledger/besu/controller/BesuControllerBuilder.java @@ -85,6 +85,7 @@ import org.hyperledger.besu.ethereum.trie.diffbased.bonsai.BonsaiWorldStateProvider; import org.hyperledger.besu.ethereum.trie.diffbased.bonsai.cache.BonsaiCachedMerkleTrieLoader; import org.hyperledger.besu.ethereum.trie.diffbased.bonsai.storage.BonsaiWorldStateKeyValueStorage; +import org.hyperledger.besu.ethereum.trie.diffbased.bonsai.worldview.BonsaiArchiveFreezer; import org.hyperledger.besu.ethereum.trie.diffbased.common.trielog.TrieLogManager; import org.hyperledger.besu.ethereum.trie.diffbased.common.trielog.TrieLogPruner; import org.hyperledger.besu.ethereum.trie.forest.ForestWorldStateArchive; @@ -752,6 +753,19 @@ public BesuController build() { trieLogManager.subscribe(trieLogPruner); } + // TODO - do we want a flag to turn this on and off? + if (DataStorageFormat.BONSAI_ARCHIVE.equals(dataStorageConfiguration.getDataStorageFormat())) { + final BonsaiWorldStateKeyValueStorage worldStateKeyValueStorage = + worldStateStorageCoordinator.getStrategy(BonsaiWorldStateKeyValueStorage.class); + final BonsaiArchiveFreezer archiveFreezer = + createBonsaiArchiveFreezer( + worldStateKeyValueStorage, + blockchain, + scheduler, + ((BonsaiWorldStateProvider) worldStateArchive).getTrieLogManager()); + blockchain.observeBlockAdded(archiveFreezer); + } + final List closeables = new ArrayList<>(); closeables.add(protocolContext.getWorldStateArchive()); closeables.add(storageProvider); @@ -818,6 +832,23 @@ private TrieLogPruner createTrieLogPruner( return trieLogPruner; } + private BonsaiArchiveFreezer createBonsaiArchiveFreezer( + final WorldStateKeyValueStorage worldStateStorage, + final Blockchain blockchain, + final EthScheduler scheduler, + final TrieLogManager trieLogManager) { + final BonsaiArchiveFreezer archiveFreezer = + new BonsaiArchiveFreezer( + (BonsaiWorldStateKeyValueStorage) worldStateStorage, + blockchain, + scheduler::executeServiceTask, + 10, + trieLogManager); + archiveFreezer.initialize(); + + return archiveFreezer; + } + /** * Create synchronizer synchronizer. * diff --git a/ethereum/core/src/main/java/org/hyperledger/besu/ethereum/trie/diffbased/bonsai/storage/flat/ArchiveCodeStorageStrategy.java b/ethereum/core/src/main/java/org/hyperledger/besu/ethereum/trie/diffbased/bonsai/storage/flat/ArchiveCodeStorageStrategy.java index a34e7bcf3d61..ad4219faae72 100644 --- a/ethereum/core/src/main/java/org/hyperledger/besu/ethereum/trie/diffbased/bonsai/storage/flat/ArchiveCodeStorageStrategy.java +++ b/ethereum/core/src/main/java/org/hyperledger/besu/ethereum/trie/diffbased/bonsai/storage/flat/ArchiveCodeStorageStrategy.java @@ -55,7 +55,7 @@ public Optional getFlatCode( // use getNearest() with an account key that is suffixed by the block context final Optional codeFound = storage - .getNearestTo(CODE_STORAGE, keyNearest) + .getNearestBefore(CODE_STORAGE, keyNearest) // return empty when we find a "deleted value key" .filter( found -> diff --git a/ethereum/core/src/main/java/org/hyperledger/besu/ethereum/trie/diffbased/bonsai/storage/flat/ArchiveFlatDbStrategy.java b/ethereum/core/src/main/java/org/hyperledger/besu/ethereum/trie/diffbased/bonsai/storage/flat/ArchiveFlatDbStrategy.java index 1f0587feea2a..76b6aff90d18 100644 --- a/ethereum/core/src/main/java/org/hyperledger/besu/ethereum/trie/diffbased/bonsai/storage/flat/ArchiveFlatDbStrategy.java +++ b/ethereum/core/src/main/java/org/hyperledger/besu/ethereum/trie/diffbased/bonsai/storage/flat/ArchiveFlatDbStrategy.java @@ -14,6 +14,7 @@ */ package org.hyperledger.besu.ethereum.trie.diffbased.bonsai.storage.flat; +import static org.hyperledger.besu.ethereum.storage.keyvalue.KeyValueSegmentIdentifier.ACCOUNT_FREEZER_STATE; import static org.hyperledger.besu.ethereum.storage.keyvalue.KeyValueSegmentIdentifier.ACCOUNT_INFO_STATE; import static org.hyperledger.besu.ethereum.storage.keyvalue.KeyValueSegmentIdentifier.ACCOUNT_STORAGE_STORAGE; @@ -50,7 +51,7 @@ public ArchiveFlatDbStrategy( static final byte[] MAX_BLOCK_SUFFIX = Bytes.ofUnsignedLong(Long.MAX_VALUE).toArrayUnsafe(); static final byte[] MIN_BLOCK_SUFFIX = Bytes.ofUnsignedLong(0L).toArrayUnsafe(); - static final byte[] DELETED_ACCOUNT_VALUE = new byte[0]; + public static final byte[] DELETED_ACCOUNT_VALUE = new byte[0]; public static final byte[] DELETED_CODE_VALUE = new byte[0]; static final byte[] DELETED_STORAGE_VALUE = new byte[0]; @@ -60,15 +61,15 @@ public Optional getFlatAccount( final NodeLoader nodeLoader, final Hash accountHash, final SegmentedKeyValueStorage storage) { - getAccountCounter.inc(); + getAccountCounter.inc(); // keyNearest, use MAX_BLOCK_SUFFIX in the absence of a block context: Bytes keyNearest = calculateArchiveKeyWithMaxSuffix(context, accountHash.toArrayUnsafe()); // use getNearest() with an account key that is suffixed by the block context final Optional accountFound = storage - .getNearestTo(ACCOUNT_INFO_STATE, keyNearest) + .getNearestBefore(ACCOUNT_INFO_STATE, keyNearest) // return empty when we find a "deleted value key" .filter( found -> @@ -80,10 +81,30 @@ public Optional getFlatAccount( if (accountFound.isPresent()) { getAccountFoundInFlatDatabaseCounter.inc(); + return accountFound; } else { - getAccountNotFoundInFlatDatabaseCounter.inc(); + // Check the frozen state as old state is moved out of the primary DB segment + final Optional frozenAccountFound = + storage + .getNearestBefore(ACCOUNT_FREEZER_STATE, keyNearest) + // return empty when we find a "deleted value key" + .filter( + found -> + !Arrays.areEqual( + DELETED_ACCOUNT_VALUE, found.value().orElse(DELETED_ACCOUNT_VALUE))) + // don't return accounts that do not have a matching account hash + .filter(found -> accountHash.commonPrefixLength(found.key()) >= accountHash.size()) + .flatMap(SegmentedKeyValueStorage.NearestKeyValue::wrapBytes); + + if (frozenAccountFound.isPresent()) { + // TODO - different metric for frozen lookups? + getAccountFoundInFlatDatabaseCounter.inc(); + } else { + getAccountNotFoundInFlatDatabaseCounter.inc(); + } + + return frozenAccountFound; } - return accountFound; } /* @@ -132,7 +153,7 @@ public Optional getFlatStorageValueByStorageSlotKey( // use getNearest() with a key that is suffixed by the block context final Optional storageFound = storage - .getNearestTo(ACCOUNT_STORAGE_STORAGE, keyNearest) + .getNearestBefore(ACCOUNT_STORAGE_STORAGE, keyNearest) // return empty when we find a "deleted value key" .filter( found -> diff --git a/ethereum/core/src/main/java/org/hyperledger/besu/ethereum/trie/diffbased/bonsai/worldview/BonsaiArchiveFreezer.java b/ethereum/core/src/main/java/org/hyperledger/besu/ethereum/trie/diffbased/bonsai/worldview/BonsaiArchiveFreezer.java new file mode 100644 index 000000000000..bbd0d4c4a0b3 --- /dev/null +++ b/ethereum/core/src/main/java/org/hyperledger/besu/ethereum/trie/diffbased/bonsai/worldview/BonsaiArchiveFreezer.java @@ -0,0 +1,161 @@ +/* + * Copyright contributors to Hyperledger Besu. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + * + * SPDX-License-Identifier: Apache-2.0 + */ +package org.hyperledger.besu.ethereum.trie.diffbased.bonsai.worldview; + +import org.hyperledger.besu.datatypes.Hash; +import org.hyperledger.besu.ethereum.chain.BlockAddedEvent; +import org.hyperledger.besu.ethereum.chain.BlockAddedObserver; +import org.hyperledger.besu.ethereum.chain.Blockchain; +import org.hyperledger.besu.ethereum.trie.diffbased.common.storage.DiffBasedWorldStateKeyValueStorage; +import org.hyperledger.besu.ethereum.trie.diffbased.common.trielog.TrieLogManager; +import org.hyperledger.besu.plugin.services.trielogs.TrieLog; + +import java.util.Comparator; +import java.util.Optional; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.function.Consumer; + +import com.google.common.collect.ArrayListMultimap; +import com.google.common.collect.Multimap; +import com.google.common.collect.TreeMultimap; +import org.apache.tuweni.bytes.Bytes; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * This class manages the "freezing" of historic state that is still needed to satisfy queries but + * doesn't need to be in the main DB segment for. Doing so would degrade block-import performance + * over time so we move state beyond a certain age (in blocks) to other DB segments, assuming there + * is a more recent (i.e. changed) version of the state. If state is created once and never changed + * it will remain in the primary DB segment(s). + */ +public class BonsaiArchiveFreezer implements BlockAddedObserver { + + private static final Logger LOG = LoggerFactory.getLogger(BonsaiArchiveFreezer.class); + + private final DiffBasedWorldStateKeyValueStorage rootWorldStateStorage; + private final Blockchain blockchain; + private final Consumer executeAsync; + private final long numberOfBlocksToKeepInWarmStorage; + private final TrieLogManager trieLogManager; + + private final Multimap blocksToMoveToFreezer = + TreeMultimap.create(Comparator.reverseOrder(), Comparator.naturalOrder()); + + public BonsaiArchiveFreezer( + final DiffBasedWorldStateKeyValueStorage rootWorldStateStorage, + final Blockchain blockchain, + final Consumer executeAsync, + final long numberOfBlocksToKeepInWarmStorage, + final TrieLogManager trieLogManager) { + this.rootWorldStateStorage = rootWorldStateStorage; + this.blockchain = blockchain; + this.executeAsync = executeAsync; + this.numberOfBlocksToKeepInWarmStorage = numberOfBlocksToKeepInWarmStorage; + this.trieLogManager = trieLogManager; + } + + public int initialize() { + // TODO Probably need to freeze old blocks that haven't been frozen already? + return 0; + } + + public synchronized void addToFreezerQueue(final long blockNumber, final Hash blockHash) { + LOG.atDebug() + .setMessage( + "adding block to archive freezer queue for moving to cold storage, blockNumber {}; blockHash {}") + .addArgument(blockNumber) + .addArgument(blockHash) + .log(); + blocksToMoveToFreezer.put(blockNumber, blockHash); + } + + public synchronized int moveBlockStateToFreezer() { + final long retainAboveThisBlock = + blockchain.getChainHeadBlockNumber() - numberOfBlocksToKeepInWarmStorage; + if (rootWorldStateStorage.getFlatDbMode().getVersion() == Bytes.EMPTY) { + throw new IllegalStateException("DB mode version not set"); + } + + AtomicInteger frozenStateCount = new AtomicInteger(); + + LOG.atDebug() + .setMessage( + "Moving cold state to freezer storage (chainHeadNumber: {} - numberOfBlocksToKeepInWarmStorage: {}) = {}") + .addArgument(blockchain::getChainHeadBlockNumber) + .addArgument(numberOfBlocksToKeepInWarmStorage) + .addArgument(retainAboveThisBlock) + .log(); + + final var blocksToMove = + blocksToMoveToFreezer.asMap().entrySet().stream() + .dropWhile((e) -> e.getKey() > retainAboveThisBlock); + // TODO - limit to a configurable number of blocks to move per loop + + final Multimap movedToFreezer = ArrayListMultimap.create(); + + // Determine which world state keys have changed in the last N blocks by looking at the + // trie logs for the blocks. Then move the old keys to the freezer segment (if and only if they + // have changed) + blocksToMove.forEach( + (block) -> { + for (Hash blockHash : block.getValue()) { + Optional trieLog = trieLogManager.getTrieLogLayer(blockHash); + if (trieLog.isPresent()) { + trieLog + .get() + .getAccountChanges() + .forEach( + (address, change) -> { + // Move any previous state for this account + frozenStateCount.addAndGet( + rootWorldStateStorage.freezePreviousAccountState( + blockchain.getBlockHeader(blockHash), + blockchain.getBlockHeader(block.getKey() - 1), + address.addressHash())); + // TODO - block number - 1 is a hack until getNearestBefore() is pulled in + }); + } + movedToFreezer.put(block.getKey(), blockHash); + } + }); + + movedToFreezer.keySet().forEach(blocksToMoveToFreezer::removeAll); + + if (frozenStateCount.get() > 0) { + LOG.atInfo() + .setMessage("froze {} state entries for {} blocks") + .addArgument(frozenStateCount.get()) + .addArgument(movedToFreezer::size) + .log(); + } + + return movedToFreezer.size(); + } + + @Override + public void onBlockAdded(final BlockAddedEvent addedBlockContext) { + final Hash blockHash = addedBlockContext.getBlock().getHeader().getBlockHash(); + final Optional blockNumber = + Optional.of(addedBlockContext.getBlock().getHeader().getNumber()); + blockNumber.ifPresent( + blockNum -> + executeAsync.accept( + () -> { + addToFreezerQueue(blockNum, blockHash); + moveBlockStateToFreezer(); + })); + } +} diff --git a/ethereum/core/src/main/java/org/hyperledger/besu/ethereum/trie/diffbased/common/storage/DiffBasedWorldStateKeyValueStorage.java b/ethereum/core/src/main/java/org/hyperledger/besu/ethereum/trie/diffbased/common/storage/DiffBasedWorldStateKeyValueStorage.java index 50adf7b34fea..6edf2ea763ab 100644 --- a/ethereum/core/src/main/java/org/hyperledger/besu/ethereum/trie/diffbased/common/storage/DiffBasedWorldStateKeyValueStorage.java +++ b/ethereum/core/src/main/java/org/hyperledger/besu/ethereum/trie/diffbased/common/storage/DiffBasedWorldStateKeyValueStorage.java @@ -14,14 +14,19 @@ */ package org.hyperledger.besu.ethereum.trie.diffbased.common.storage; +import static org.hyperledger.besu.ethereum.storage.keyvalue.KeyValueSegmentIdentifier.ACCOUNT_FREEZER_STATE; import static org.hyperledger.besu.ethereum.storage.keyvalue.KeyValueSegmentIdentifier.ACCOUNT_INFO_STATE; import static org.hyperledger.besu.ethereum.storage.keyvalue.KeyValueSegmentIdentifier.ACCOUNT_STORAGE_STORAGE; import static org.hyperledger.besu.ethereum.storage.keyvalue.KeyValueSegmentIdentifier.CODE_STORAGE; import static org.hyperledger.besu.ethereum.storage.keyvalue.KeyValueSegmentIdentifier.TRIE_BRANCH_STORAGE; +import static org.hyperledger.besu.ethereum.trie.diffbased.bonsai.storage.flat.ArchiveFlatDbStrategy.DELETED_ACCOUNT_VALUE; import org.hyperledger.besu.datatypes.Hash; +import org.hyperledger.besu.ethereum.bonsai.BonsaiContext; +import org.hyperledger.besu.ethereum.core.BlockHeader; import org.hyperledger.besu.ethereum.storage.StorageProvider; import org.hyperledger.besu.ethereum.storage.keyvalue.KeyValueSegmentIdentifier; +import org.hyperledger.besu.ethereum.trie.diffbased.bonsai.storage.flat.ArchiveFlatDbStrategy; import org.hyperledger.besu.ethereum.trie.diffbased.common.StorageSubscriber; import org.hyperledger.besu.ethereum.trie.diffbased.common.storage.flat.FlatDbStrategy; import org.hyperledger.besu.ethereum.worldstate.FlatDbMode; @@ -38,12 +43,14 @@ import java.util.NavigableMap; import java.util.Optional; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; import java.util.function.Predicate; import java.util.stream.Stream; import kotlin.Pair; import org.apache.tuweni.bytes.Bytes; import org.apache.tuweni.bytes.Bytes32; +import org.bouncycastle.util.Arrays; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -192,6 +199,70 @@ public boolean pruneTrieLog(final Hash blockHash) { } } + /** + * Move old state from the primary DB segments to "cold" segments that will only be used for + * historic state queries. This prevents performance degradation over time for writes to the + * primary DB segments. + * + * @param currentBlockHeader TODO - should not be needed + * @param previousBlockHeader the block header for the previous block, used to get the "nearest + * before" state + * @param accountHash the account to freeze old state for + * @return the number of account states that were moved to frozen storage + */ + public int freezePreviousAccountState( + final Optional currentBlockHeader, + final Optional previousBlockHeader, + final Hash accountHash) { + AtomicInteger frozenStateCount = new AtomicInteger(); + if (previousBlockHeader.isPresent()) { + try { + // Get the key for this block + final BonsaiContext theContext = new BonsaiContext(); + theContext.setBlockHeader(currentBlockHeader.get()); + + // Get the key for the previous block + final BonsaiContext previousContext = new BonsaiContext(); + previousContext.setBlockHeader(previousBlockHeader.get()); + final Bytes previousKey = + ArchiveFlatDbStrategy.calculateArchiveKeyWithMaxSuffix( + previousContext, accountHash.toArrayUnsafe()); + + composedWorldStateStorage + .getNearestBefore(ACCOUNT_INFO_STATE, previousKey) + .filter( + // Ignore deleted entries + found -> + !Arrays.areEqual( + DELETED_ACCOUNT_VALUE, found.value().orElse(DELETED_ACCOUNT_VALUE))) + // Skip "nearest" entries that are for a different account + .filter(found -> accountHash.commonPrefixLength(found.key()) >= accountHash.size()) + .stream() + .forEach( + (nearestKey) -> { + SegmentedKeyValueStorageTransaction tx = + composedWorldStateStorage.startTransaction(); + tx.remove(ACCOUNT_INFO_STATE, nearestKey.key().toArrayUnsafe()); + tx.put( + ACCOUNT_FREEZER_STATE, + nearestKey.key().toArrayUnsafe(), + nearestKey.value().get()); + tx.commit(); + frozenStateCount.getAndIncrement(); + }); + + LOG.atDebug() + .setMessage("no previous state for account {} found to move to cold storage") + .addArgument(accountHash) + .log(); + } catch (Exception e) { + LOG.error("Error moving account state for account {} to cold storage", accountHash, e); + } + } + + return frozenStateCount.get(); + } + @Override public synchronized void close() throws Exception { // when the storage clears, close diff --git a/ethereum/core/src/main/java/org/hyperledger/besu/ethereum/trie/diffbased/common/storage/flat/FlatDbStrategyProvider.java b/ethereum/core/src/main/java/org/hyperledger/besu/ethereum/trie/diffbased/common/storage/flat/FlatDbStrategyProvider.java index 731d78fba988..3e08f8eb6fd4 100644 --- a/ethereum/core/src/main/java/org/hyperledger/besu/ethereum/trie/diffbased/common/storage/flat/FlatDbStrategyProvider.java +++ b/ethereum/core/src/main/java/org/hyperledger/besu/ethereum/trie/diffbased/common/storage/flat/FlatDbStrategyProvider.java @@ -25,6 +25,7 @@ import org.hyperledger.besu.ethereum.worldstate.DataStorageConfiguration; import org.hyperledger.besu.ethereum.worldstate.FlatDbMode; import org.hyperledger.besu.plugin.services.MetricsSystem; +import org.hyperledger.besu.plugin.services.storage.DataStorageFormat; import org.hyperledger.besu.plugin.services.storage.SegmentedKeyValueStorage; import org.hyperledger.besu.plugin.services.storage.SegmentedKeyValueStorageTransaction; @@ -99,7 +100,12 @@ FlatDbMode deriveFlatDbStrategy(final SegmentedKeyValueStorage composedWorldStat // and default to the storage config otherwise // TODO: temporarily hard code ARCHIVE mode for testing - var flatDbModeVal = FlatDbMode.ARCHIVE.getVersion(); + var flatDbModeVal = + dataStorageConfiguration + .getDataStorageFormat() + .equals(DataStorageFormat.BONSAI_ARCHIVE) + ? FlatDbMode.ARCHIVE.getVersion() + : FlatDbMode.FULL.getVersion(); // existingTrieData // ? FlatDbMode.ARCHIVE.getVersion() // : requestedFlatDbMode.getVersion();