From 0b77cd7d7829c4e3f237056c5c6b83debf5d5464 Mon Sep 17 00:00:00 2001 From: kdeme <7857583+kdeme@users.noreply.github.com> Date: Mon, 17 Feb 2025 18:47:05 +0100 Subject: [PATCH 1/3] PoC Syncing nimbus EL from Portal network To be heavily cleaned up, very quick version to make a streaming version work instead of the previous version which would do batches of 8192. --- execution_chain/config.nim | 12 + execution_chain/nimbus_execution_client.nim | 2 +- execution_chain/nimbus_import.nim | 237 +++++++++++++++++++- fluffy/network/history/history_network.nim | 2 +- 4 files changed, 242 insertions(+), 11 deletions(-) diff --git a/execution_chain/config.nim b/execution_chain/config.nim index 04b1e55bac..bbebf65124 100644 --- a/execution_chain/config.nim +++ b/execution_chain/config.nim @@ -532,6 +532,18 @@ type defaultValue: false name: "debug-store-slot-hashes".}: bool + usePortal* {. + hidden + desc: "Use portal network instead of era files" + defaultValue: false + name: "debug-use-portal".}: bool + + portalWorkers* {. + hidden + desc: "Amount of Portal workers to use for downloading blocks" + defaultValue: 512 + name: "debug-portal-workers".}: int + of `import-rlp`: blocksFile* {. argument diff --git a/execution_chain/nimbus_execution_client.nim b/execution_chain/nimbus_execution_client.nim index 730c4ecd72..a2bf3ab05a 100644 --- a/execution_chain/nimbus_execution_client.nim +++ b/execution_chain/nimbus_execution_client.nim @@ -247,7 +247,7 @@ proc run(nimbus: NimbusNode, conf: NimbusConf) = case conf.cmd of NimbusCmd.`import`: - importBlocks(conf, com) + importBlocksPortal(conf, com) of NimbusCmd.`import-rlp`: importRlpBlocks(conf, com) else: diff --git a/execution_chain/nimbus_import.nim b/execution_chain/nimbus_import.nim index 8ab72ac028..6e8a655602 100644 --- a/execution_chain/nimbus_import.nim +++ b/execution_chain/nimbus_import.nim @@ -13,7 +13,8 @@ import chronicles, metrics, chronos/timer, - std/[strformat, strutils], + chronos, + std/[strformat, strutils, os], stew/io2, beacon_chain/era_db, beacon_chain/networking/network_metadata, @@ -21,7 +22,16 @@ import ./common/common, ./core/chain, ./db/era1_db, - ./utils/era_helpers + ./utils/era_helpers, + eth/common/keys, # rng + eth/net/nat, # setupAddress + eth/p2p/discoveryv5/protocol as discv5_protocol, + eth/p2p/discoveryv5/routing_table, + eth/p2p/discoveryv5/enr, + ../fluffy/portal_node, + ../fluffy/common/common_utils, # getPersistentNetKey, getPersistentEnr + ../fluffy/network_metadata, + ../fluffy/version declareGauge nec_import_block_number, "Latest imported block number" @@ -95,7 +105,192 @@ template boolFlag(flags, b): PersistBlockFlags = else: {} -proc importBlocks*(conf: NimbusConf, com: CommonRef) = +proc run(config: NimbusConf): PortalNode {. + raises: [CatchableError] +.} = + let rng = newRng() + + ## Network configuration + let + bindIp = config.listenAddress + udpPort = Port(config.udpPort) + # TODO: allow for no TCP port mapping! 
+ (extIp, _, extUdpPort) = + try: + setupAddress(config.nat, config.listenAddress, udpPort, udpPort, "portal") + except CatchableError as exc: + raiseAssert exc.msg + # raise exc # TODO: Ideally we don't have the Exception here + except Exception as exc: + raiseAssert exc.msg + (netkey, newNetKey) = + # if config.netKey.isSome(): + # (config.netKey.get(), true) + # else: + getPersistentNetKey(rng[], config.dataDir / "netkey") + + enrFilePath = config.dataDir / "nimbus_portal_node.enr" + previousEnr = + if not newNetKey: + getPersistentEnr(enrFilePath) + else: + Opt.none(enr.Record) + + var bootstrapRecords: seq[Record] + # loadBootstrapFile(string config.bootstrapNodesFile, bootstrapRecords) + # bootstrapRecords.add(config.bootstrapNodes) + + # case config.network + # of PortalNetwork.none: + # discard # don't connect to any network bootstrap nodes + # of PortalNetwork.mainnet: + # for enrURI in mainnetBootstrapNodes: + # let res = enr.Record.fromURI(enrURI) + # if res.isOk(): + # bootstrapRecords.add(res.value) + # of PortalNetwork.angelfood: + # for enrURI in angelfoodBootstrapNodes: + # let res = enr.Record.fromURI(enrURI) + # if res.isOk(): + # bootstrapRecords.add(res.value) + + # Only mainnet + for enrURI in mainnetBootstrapNodes: + let res = enr.Record.fromURI(enrURI) + if res.isOk(): + bootstrapRecords.add(res.value) + + ## Discovery v5 protocol setup + let + discoveryConfig = + DiscoveryConfig.init(DefaultTableIpLimit, DefaultBucketIpLimit, DefaultBitsPerHop) + d = newProtocol( + netkey, + extIp, + Opt.none(Port), + extUdpPort, + # Note: The addition of default clientInfo to the ENR is a temporary + # measure to easily identify & debug the clients used in the testnet. + # Might make this into a, default off, cli option. + localEnrFields = {"c": enrClientInfoShort}, + bootstrapRecords = bootstrapRecords, + previousRecord = previousEnr, + bindIp = bindIp, + bindPort = udpPort, + enrAutoUpdate = true, + config = discoveryConfig, + rng = rng, + ) + + d.open() + + ## Portal node setup + let + portalProtocolConfig = PortalProtocolConfig.init( + DefaultTableIpLimit, DefaultBucketIpLimit, DefaultBitsPerHop, defaultAlpha, RadiusConfig(kind: Static, logRadius: 249), + defaultDisablePoke, defaultMaxGossipNodes, defaultContentCacheSize, + defaultDisableContentCache, defaultMaxConcurrentOffers, defaultDisableBanNodes, + ) + + portalNodeConfig = PortalNodeConfig( + accumulatorFile: Opt.none(string), + disableStateRootValidation: true, + trustedBlockRoot: Opt.none(Digest), + portalConfig: portalProtocolConfig, + dataDir: string config.dataDir, + storageCapacity: 0, + contentRequestRetries: 1 + ) + + node = PortalNode.new( + PortalNetwork.mainnet, + portalNodeConfig, + d, + {PortalSubnetwork.history}, + bootstrapRecords = bootstrapRecords, + rng = rng, + ) + + let enrFile = config.dataDir / "nimbus_portal_node.enr" + if io2.writeFile(enrFile, d.localNode.record.toURI()).isErr: + fatal "Failed to write the enr file", file = enrFile + quit 1 + + ## Start the Portal node. 
+ node.start() + + node + +proc getBlockLoop(node: PortalNode, blockQueue: AsyncQueue[EthBlock], startBlock: uint64, portalWorkers: int): Future[void] {.async.} = + const bufferSize = 8192 + + let historyNetwork = node.historyNetwork.value() + let blockNumberQueue = newAsyncQueue[(uint64, uint64)](2048) + + var blockNumber = startBlock + var blocks: seq[EthBlock] = newSeq[EthBlock](bufferSize) + var count = 0 + var failureCount = 0 + + # Note: Could make these stuint bitmasks + var downloadFinished: array[bufferSize, bool] + var downloadStarted: array[bufferSize, bool] + + proc blockWorker(node: PortalNode): Future[void] {.async.} = + while true: + let (blockNumber, i) = await blockNumberQueue.popFirst() + var currentBlockFailures = 0 + while true: + let (header, body) = (await historyNetwork.getBlock(blockNumber + i)).valueOr: + currentBlockFailures.inc() + if currentBlockFailures > 10: + fatal "Block download failed too many times", blockNumber = blockNumber + i, currentBlockFailures + quit(QuitFailure) + + debug "Failed to get block", blockNumber = blockNumber + i, currentBlockFailures + failureCount.inc() + continue + + blocks[i] = init(EthBlock, header, body) + downloadFinished[i] = true + count.inc() + + break + + var workers: seq[Future[void]] = @[] + for i in 0 ..< portalWorkers: + workers.add node.blockWorker() + + info "Start downloading blocks", startBlock = blockNumber + var i = 0'u64 + var nextDownloadedIndex = 0 + let t0 = Moment.now() + + while true: + while downloadFinished[nextDownloadedIndex]: + debug "Adding block to the processing queue", blockNumber = nextDownloadedIndex.uint64 + blockNumber + await blockQueue.addLast(blocks[nextDownloadedIndex]) + downloadFinished[nextDownloadedIndex] = false + downloadStarted[nextDownloadedIndex] = false + nextDownloadedIndex = (nextDownloadedIndex + 1) mod bufferSize + + # TODO: can use the read pointer nextDownloadedIndex instead and get rid of downloadStarted + if not downloadStarted[i]: + debug "Adding block to the download queue", blockNumber = blockNumber + i + await blockNumberQueue.addLast((blockNumber, i)) + downloadStarted[i] = true + # TODO clean this up by directly using blocknumber with modulo calc + if i == bufferSize.uint64 - 1: + blockNumber += bufferSize.uint64 + let t1 = Moment.now() + let diff = (t1 - t0).nanoseconds().float / 1000000000 + let avgBps = count.float / diff + info "Total blocks downloaded", count = count, failureCount = failureCount, failureRate = failureCount.float / count.float, avgBps = avgBps + i = (i + 1'u64) mod bufferSize.uint64 + else: + await sleepAsync(1.nanoseconds) + +proc importBlocks*(conf: NimbusConf, com: CommonRef, node: PortalNode, blockQueue: AsyncQueue[EthBlock]) {.async.} = proc controlCHandler() {.noconv.} = when defined(windows): # workaround for https://github.com/nim-lang/Nim/issues/4057 @@ -126,7 +321,7 @@ proc importBlocks*(conf: NimbusConf, com: CommonRef) = boolFlag(NoPersistBodies, not conf.storeBodies) + boolFlag({PersistBlockFlag.NoPersistReceipts}, not conf.storeReceipts) + boolFlag({PersistBlockFlag.NoPersistSlotHashes}, not conf.storeSlotHashes) - blk: Block + blk: blocks.Block persister = Persister.init(com, flags) cstats: PersistStats # stats at start of chunk @@ -299,11 +494,16 @@ proc importBlocks*(conf: NimbusConf, com: CommonRef) = while running and persister.stats.blocks.uint64 < conf.maxBlocks and blockNumber <= lastEra1Block: - if not loadEraBlock(blockNumber): - notice "No more `era1` blocks to import", blockNumber, slot - break - persistBlock() - 
checkpoint() + if not conf.usePortal: + if not loadEraBlock(blockNumber): + notice "No more `era1` blocks to import", blockNumber, slot + break + persistBlock() + checkpoint() + else: + blk = await blockQueue.popFirst() + persistBlock() + checkpoint() block era1Import: if blockNumber > lastEra1Block: @@ -375,3 +575,22 @@ proc importBlocks*(conf: NimbusConf, com: CommonRef) = blocks = persister.stats.blocks, txs = persister.stats.txs, mgas = f(persister.stats.gas.float / 1000000) + +proc importBlocksPortal*(conf: NimbusConf, com: CommonRef) {. + raises: [CatchableError] +.} = + let + portalNode = run(conf) + blockQueue = newAsyncQueue[EthBlock]() + start = com.db.baseTxFrame().getSavedStateBlockNumber() + 1 + + if conf.usePortal: + asyncSpawn portalNode.getBlockLoop(blockQueue, start, conf.portalWorkers) + + asyncSpawn importBlocks(conf, com, portalNode, blockQueue) + + while running: + try: + poll() + except CatchableError as e: + warn "Exception in poll()", exc = e.name, err = e.msg diff --git a/fluffy/network/history/history_network.nim b/fluffy/network/history/history_network.nim index 4315d8669a..71975dd0b1 100644 --- a/fluffy/network/history/history_network.nim +++ b/fluffy/network/history/history_network.nim @@ -201,7 +201,7 @@ proc getBlockBody*( n.portalProtocol.banNode( bodyContent.receivedFrom.id, NodeBanDurationContentLookupFailedValidation ) - warn "Validation of block body failed", + debug "Validation of block body failed", error, node = bodyContent.receivedFrom.record.toURI() continue From 2b255358dffd908d8c584a42f2d8bb83f1b577ae Mon Sep 17 00:00:00 2001 From: kdeme <7857583+kdeme@users.noreply.github.com> Date: Thu, 6 Mar 2025 11:46:28 +0100 Subject: [PATCH 2/3] Bit of cleanup --- execution_chain/config.nim | 2 +- execution_chain/nimbus_import.nim | 100 +++++++++++++++++------------- 2 files changed, 59 insertions(+), 43 deletions(-) diff --git a/execution_chain/config.nim b/execution_chain/config.nim index bbebf65124..f6acaba08d 100644 --- a/execution_chain/config.nim +++ b/execution_chain/config.nim @@ -541,7 +541,7 @@ type portalWorkers* {. hidden desc: "Amount of Portal workers to use for downloading blocks" - defaultValue: 512 + defaultValue: 128 name: "debug-portal-workers".}: int of `import-rlp`: diff --git a/execution_chain/nimbus_import.nim b/execution_chain/nimbus_import.nim index 6e8a655602..548f9f5865 100644 --- a/execution_chain/nimbus_import.nim +++ b/execution_chain/nimbus_import.nim @@ -221,39 +221,45 @@ proc run(config: NimbusConf): PortalNode {. 
node -proc getBlockLoop(node: PortalNode, blockQueue: AsyncQueue[EthBlock], startBlock: uint64, portalWorkers: int): Future[void] {.async.} = +proc getBlockLoop( + node: PortalNode, + blockQueue: AsyncQueue[EthBlock], + startBlock: uint64, + portalWorkers: int, +): Future[void] {.async.} = const bufferSize = 8192 - let historyNetwork = node.historyNetwork.value() - let blockNumberQueue = newAsyncQueue[(uint64, uint64)](2048) - - var blockNumber = startBlock - var blocks: seq[EthBlock] = newSeq[EthBlock](bufferSize) - var count = 0 - var failureCount = 0 + let + historyNetwork = node.historyNetwork.value() + blockNumberQueue = newAsyncQueue[(uint64, uint64)](portalWorkers * 2) - # Note: Could make these stuint bitmasks - var downloadFinished: array[bufferSize, bool] - var downloadStarted: array[bufferSize, bool] + var + blocks: array[bufferSize, EthBlock] + # Note: Could make this stuint bitmask + downloadFinished: array[bufferSize, bool] + # stats counters + totalDownloadCount = 0 + totalFailureCount = 0 proc blockWorker(node: PortalNode): Future[void] {.async.} = while true: - let (blockNumber, i) = await blockNumberQueue.popFirst() - var currentBlockFailures = 0 + let (blockNumberOffset, i) = await blockNumberQueue.popFirst() + var blockFailureCount = 0 while true: - let (header, body) = (await historyNetwork.getBlock(blockNumber + i)).valueOr: - currentBlockFailures.inc() - if currentBlockFailures > 10: - fatal "Block download failed too many times", blockNumber = blockNumber + i, currentBlockFailures + let blockNumber = blockNumberOffset + i + let (header, body) = (await historyNetwork.getBlock(blockNumber)).valueOr: + blockFailureCount.inc() + totalFailureCount.inc() + debug "Failed to get block", blockNumber, blockFailureCount + if blockFailureCount > 10: + fatal "Block download failed too many times", blockNumber, blockFailureCount quit(QuitFailure) - debug "Failed to get block", blockNumber = blockNumber + i, currentBlockFailures - failureCount.inc() continue blocks[i] = init(EthBlock, header, body) downloadFinished[i] = true - count.inc() + totalDownloadCount.inc() break @@ -261,34 +267,44 @@ proc getBlockLoop(node: PortalNode, blockQueue: AsyncQueue[EthBlock], startBlock for i in 0 ..< portalWorkers: workers.add node.blockWorker() - info "Start downloading blocks", startBlock = blockNumber - var i = 0'u64 - var nextDownloadedIndex = 0 + info "Start downloading blocks", startBlock + var + blockNumberOffset = startBlock + nextReadIndex = 0 + nextWriteIndex = 0 + let t0 = Moment.now() while true: - while downloadFinished[nextDownloadedIndex]: - debug "Adding block to the processing queue", blockNumber = nextDownloadedIndex.uint64 + blockNumber - await blockQueue.addLast(blocks[nextDownloadedIndex]) - downloadFinished[nextDownloadedIndex] = false - downloadStarted[nextDownloadedIndex] = false - nextDownloadedIndex = (nextDownloadedIndex + 1) mod bufferSize - - # TODO: can use the read pointer nextDownloadedIndex instead and get rid of downloadStarted - if not downloadStarted[i]: - debug "Adding block to the download queue", blockNumber = blockNumber + i - await blockNumberQueue.addLast((blockNumber, i)) - downloadStarted[i] = true - # TODO clean this up by directly using blocknumber with modulo calc - if i == bufferSize.uint64 - 1: - blockNumber += bufferSize.uint64 + while downloadFinished[nextReadIndex]: + debug "Adding block to the processing queue", + blockNumber = blockNumberOffset + nextReadIndex.uint64 + await blockQueue.addLast(blocks[nextReadIndex]) + 
downloadFinished[nextReadIndex] = false + nextReadIndex = (nextReadIndex + 1) mod bufferSize + if nextReadIndex == 0: let t1 = Moment.now() let diff = (t1 - t0).nanoseconds().float / 1000000000 - let avgBps = count.float / diff - info "Total blocks downloaded", count = count, failureCount = failureCount, failureRate = failureCount.float / count.float, avgBps = avgBps - i = (i + 1'u64) mod bufferSize.uint64 + let avgBps = totalDownloadCount.float / diff + info "Total blocks downloaded", + totalDownloadCount, + totalFailureCount, + avgBps, + failureRate = totalFailureCount.float / totalDownloadCount.float + + if nextWriteIndex != (nextReadIndex + bufferSize - 1) mod bufferSize: + debug "Adding block to the download queue", + blockNumber = blockNumberOffset + nextWriteIndex.uint64 + await blockNumberQueue.addLast((blockNumberOffset, nextWriteIndex.uint64)) + nextWriteIndex = (nextWriteIndex + 1) mod bufferSize + if nextWriteIndex == 0: + blockNumberOffset += bufferSize.uint64 else: - await sleepAsync(1.nanoseconds) + debug "Waiting to add block downloads", + nextReadIndex, + nextWriteIndex, + blockNumber = blockNumberOffset + nextReadIndex.uint64 + await sleepAsync(1.seconds) proc importBlocks*(conf: NimbusConf, com: CommonRef, node: PortalNode, blockQueue: AsyncQueue[EthBlock]) {.async.} = proc controlCHandler() {.noconv.} = From ca10478d4ee5aeede434ea3314a5b349a379eb4a Mon Sep 17 00:00:00 2001 From: kdeme <7857583+kdeme@users.noreply.github.com> Date: Wed, 19 Mar 2025 15:04:57 +0100 Subject: [PATCH 3/3] Add metrics for block downloads --- execution_chain/config.nim | 7 +++++++ execution_chain/nimbus_import.nim | 13 +++++++++++-- 2 files changed, 18 insertions(+), 2 deletions(-) diff --git a/execution_chain/config.nim b/execution_chain/config.nim index f6acaba08d..971311bd50 100644 --- a/execution_chain/config.nim +++ b/execution_chain/config.nim @@ -544,6 +544,13 @@ type defaultValue: 128 name: "debug-portal-workers".}: int + alpha* {. + hidden, + desc: "The Kademlia concurrency factor", + defaultValue: 3, + name: "debug-alpha" + .}: int + of `import-rlp`: blocksFile* {. argument diff --git a/execution_chain/nimbus_import.nim b/execution_chain/nimbus_import.nim index 548f9f5865..bdc2f0d1e5 100644 --- a/execution_chain/nimbus_import.nim +++ b/execution_chain/nimbus_import.nim @@ -41,6 +41,11 @@ declareCounter nec_imported_transactions, "Transactions processed during import" declareCounter nec_imported_gas, "Gas processed during import" +declareGauge nec_download_block_number, "Latest in order downloaded block number" + +declareCounter nec_downloaded_blocks, "Blocks downloaded during import" + + var running {.volatile.} = true proc openCsv(name: string): File = @@ -187,7 +192,7 @@ proc run(config: NimbusConf): PortalNode {. 
## Portal node setup let portalProtocolConfig = PortalProtocolConfig.init( - DefaultTableIpLimit, DefaultBucketIpLimit, DefaultBitsPerHop, defaultAlpha, RadiusConfig(kind: Static, logRadius: 249), + DefaultTableIpLimit, DefaultBucketIpLimit, DefaultBitsPerHop, config.alpha, RadiusConfig(kind: Static, logRadius: 249), defaultDisablePoke, defaultMaxGossipNodes, defaultContentCacheSize, defaultDisableContentCache, defaultMaxConcurrentOffers, defaultDisableBanNodes, ) @@ -227,7 +232,7 @@ proc getBlockLoop( startBlock: uint64, portalWorkers: int, ): Future[void] {.async.} = - const bufferSize = 8192 + const bufferSize = 8192*4 let historyNetwork = node.historyNetwork.value() @@ -257,6 +262,7 @@ proc getBlockLoop( continue + nec_downloaded_blocks.inc() blocks[i] = init(EthBlock, header, body) downloadFinished[i] = true totalDownloadCount.inc() @@ -277,6 +283,9 @@ proc getBlockLoop( while true: while downloadFinished[nextReadIndex]: + # TODO: Fix this counter, it's not accurate as blockNumberOffset updates + # differently than the block being passed around here + nec_download_block_number.set((blockNumberOffset + nextReadIndex.uint64).int64) debug "Adding block to the processing queue", blockNumber = blockNumberOffset + nextReadIndex.uint64 await blockQueue.addLast(blocks[nextReadIndex])
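
Note on the scheduling in getBlockLoop (added in patch 1, reworked in patches 2 and 3): it is a ring buffer of bufferSize slots shared between the scheduler loop and the download workers. Block numbers are mapped onto slots as blockNumberOffset + slot, workers fill slots out of order, and the main loop hands finished slots to the importer strictly in order, advancing blockNumberOffset by bufferSize each time the write index wraps. The fragment below is a minimal, synchronous Nim sketch of just that bookkeeping; it is illustrative only and not code from the patch: the async queues, the Portal block lookups and the metrics are left out, bufferSize is shrunk from 8192 (later 8192*4) to 8, and a plain uint64 stands in for EthBlock.

  # Standalone sketch of the circular-buffer indexing used by getBlockLoop.
  const bufferSize = 8   # the patch uses 8192, later 8192*4

  var
    blocks: array[bufferSize, uint64]          # downloaded payload per slot (stand-in for EthBlock)
    downloadFinished: array[bufferSize, bool]  # slot ready to be handed to the importer?
    blockNumberOffset = 0'u64                  # block number that slot 0 maps to on the current lap
    nextReadIndex = 0                          # next slot to hand to the importer
    nextWriteIndex = 0                         # next slot to schedule for download

  proc scheduleDownload(blockNumber: uint64, slot: int) =
    # Stand-in for blockNumberQueue.addLast + a blockWorker; here the
    # "download" completes immediately.
    blocks[slot] = blockNumber
    downloadFinished[slot] = true

  for _ in 0 ..< 3 * bufferSize:
    # Hand over, in order, every slot whose download has finished.
    while downloadFinished[nextReadIndex]:
      echo "import block ", blocks[nextReadIndex]   # stand-in for blockQueue.addLast
      downloadFinished[nextReadIndex] = false
      nextReadIndex = (nextReadIndex + 1) mod bufferSize

    # Schedule a new download unless the writer would land on the slot just
    # behind the reader; keeping that one slot unused is what lets a single
    # `!=` test distinguish a full ring from an empty one.
    if nextWriteIndex != (nextReadIndex + bufferSize - 1) mod bufferSize:
      scheduleDownload(blockNumberOffset + nextWriteIndex.uint64, nextWriteIndex)
      nextWriteIndex = (nextWriteIndex + 1) mod bufferSize
      if nextWriteIndex == 0:
        # One full lap scheduled; on the next lap slot 0 maps to offset + bufferSize.
        blockNumberOffset += bufferSize.uint64

In the real loop the same index arithmetic runs under chronos, with blockNumberQueue feeding portalWorkers concurrent blockWorker instances and a one-second sleepAsync when no new download can be scheduled; the --debug-use-portal, --debug-portal-workers and --debug-alpha options added in config.nim switch the import command onto this path instead of the era1 files.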