diff --git a/execution_chain/config.nim b/execution_chain/config.nim
index 04b1e55bac..971311bd50 100644
--- a/execution_chain/config.nim
+++ b/execution_chain/config.nim
@@ -532,6 +532,25 @@ type
       defaultValue: false
       name: "debug-store-slot-hashes".}: bool
 
+    usePortal* {.
+      hidden
+      desc: "Use the Portal network instead of era files"
+      defaultValue: false
+      name: "debug-use-portal".}: bool
+
+    portalWorkers* {.
+      hidden
+      desc: "Number of Portal workers to use for downloading blocks"
+      defaultValue: 128
+      name: "debug-portal-workers".}: int
+
+    alpha* {.
+      hidden,
+      desc: "The Kademlia concurrency factor",
+      defaultValue: 3,
+      name: "debug-alpha"
+    .}: int
+
   of `import-rlp`:
     blocksFile* {.
       argument
diff --git a/execution_chain/nimbus_execution_client.nim b/execution_chain/nimbus_execution_client.nim
index 730c4ecd72..a2bf3ab05a 100644
--- a/execution_chain/nimbus_execution_client.nim
+++ b/execution_chain/nimbus_execution_client.nim
@@ -247,7 +247,7 @@ proc run(nimbus: NimbusNode, conf: NimbusConf) =
 
   case conf.cmd
   of NimbusCmd.`import`:
-    importBlocks(conf, com)
+    importBlocksPortal(conf, com)
   of NimbusCmd.`import-rlp`:
     importRlpBlocks(conf, com)
   else:
diff --git a/execution_chain/nimbus_import.nim b/execution_chain/nimbus_import.nim
index 8ab72ac028..bdc2f0d1e5 100644
--- a/execution_chain/nimbus_import.nim
+++ b/execution_chain/nimbus_import.nim
@@ -13,7 +13,8 @@ import
   chronicles,
   metrics,
   chronos/timer,
-  std/[strformat, strutils],
+  chronos,
+  std/[strformat, strutils, os],
   stew/io2,
   beacon_chain/era_db,
   beacon_chain/networking/network_metadata,
@@ -21,7 +22,16 @@ import
   ./common/common,
   ./core/chain,
   ./db/era1_db,
-  ./utils/era_helpers
+  ./utils/era_helpers,
+  eth/common/keys, # rng
+  eth/net/nat, # setupAddress
+  eth/p2p/discoveryv5/protocol as discv5_protocol,
+  eth/p2p/discoveryv5/routing_table,
+  eth/p2p/discoveryv5/enr,
+  ../fluffy/portal_node,
+  ../fluffy/common/common_utils, # getPersistentNetKey, getPersistentEnr
+  ../fluffy/network_metadata,
+  ../fluffy/version
 
 declareGauge nec_import_block_number, "Latest imported block number"
 
@@ -31,6 +41,11 @@ declareCounter nec_imported_transactions,
   "Transactions processed during import"
 declareCounter nec_imported_gas, "Gas processed during import"
 
+declareGauge nec_download_block_number, "Latest in-order downloaded block number"
+
+declareCounter nec_downloaded_blocks, "Blocks downloaded during import"
+
+
 var running {.volatile.} = true
 
 proc openCsv(name: string): File =
@@ -95,7 +110,212 @@ template boolFlag(flags, b): PersistBlockFlags =
   else:
     {}
 
-proc importBlocks*(conf: NimbusConf, com: CommonRef) =
+proc run(config: NimbusConf): PortalNode {.
+  raises: [CatchableError]
+.} =
+  let rng = newRng()
+
+  ## Network configuration
+  let
+    bindIp = config.listenAddress
+    udpPort = Port(config.udpPort)
+    # TODO: allow for no TCP port mapping!
+    (extIp, _, extUdpPort) =
+      try:
+        setupAddress(config.nat, config.listenAddress, udpPort, udpPort, "portal")
+      except CatchableError as exc:
+        raiseAssert exc.msg
+        # raise exc # TODO: Ideally we don't have the Exception here
+      except Exception as exc:
+        raiseAssert exc.msg
+    (netkey, newNetKey) =
+      # if config.netKey.isSome():
+      #   (config.netKey.get(), true)
+      # else:
+      getPersistentNetKey(rng[], config.dataDir / "netkey")
+
+    enrFilePath = config.dataDir / "nimbus_portal_node.enr"
+    previousEnr =
+      if not newNetKey:
+        getPersistentEnr(enrFilePath)
+      else:
+        Opt.none(enr.Record)
+
+  var bootstrapRecords: seq[Record]
+  # loadBootstrapFile(string config.bootstrapNodesFile, bootstrapRecords)
+  # bootstrapRecords.add(config.bootstrapNodes)
+
+  # case config.network
+  # of PortalNetwork.none:
+  #   discard # don't connect to any network bootstrap nodes
+  # of PortalNetwork.mainnet:
+  #   for enrURI in mainnetBootstrapNodes:
+  #     let res = enr.Record.fromURI(enrURI)
+  #     if res.isOk():
+  #       bootstrapRecords.add(res.value)
+  # of PortalNetwork.angelfood:
+  #   for enrURI in angelfoodBootstrapNodes:
+  #     let res = enr.Record.fromURI(enrURI)
+  #     if res.isOk():
+  #       bootstrapRecords.add(res.value)
+
+  # Only mainnet
+  for enrURI in mainnetBootstrapNodes:
+    let res = enr.Record.fromURI(enrURI)
+    if res.isOk():
+      bootstrapRecords.add(res.value)
+
+  ## Discovery v5 protocol setup
+  let
+    discoveryConfig =
+      DiscoveryConfig.init(DefaultTableIpLimit, DefaultBucketIpLimit, DefaultBitsPerHop)
+    d = newProtocol(
+      netkey,
+      extIp,
+      Opt.none(Port),
+      extUdpPort,
+      # Note: The addition of default clientInfo to the ENR is a temporary
+      # measure to easily identify & debug the clients used in the testnet.
+      # Might make this into a default-off CLI option.
+      localEnrFields = {"c": enrClientInfoShort},
+      bootstrapRecords = bootstrapRecords,
+      previousRecord = previousEnr,
+      bindIp = bindIp,
+      bindPort = udpPort,
+      enrAutoUpdate = true,
+      config = discoveryConfig,
+      rng = rng,
+    )
+
+  d.open()
+
+  ## Portal node setup
+  let
+    portalProtocolConfig = PortalProtocolConfig.init(
+      DefaultTableIpLimit, DefaultBucketIpLimit, DefaultBitsPerHop, config.alpha, RadiusConfig(kind: Static, logRadius: 249),
+      defaultDisablePoke, defaultMaxGossipNodes, defaultContentCacheSize,
+      defaultDisableContentCache, defaultMaxConcurrentOffers, defaultDisableBanNodes,
+    )
+
+    portalNodeConfig = PortalNodeConfig(
+      accumulatorFile: Opt.none(string),
+      disableStateRootValidation: true,
+      trustedBlockRoot: Opt.none(Digest),
+      portalConfig: portalProtocolConfig,
+      dataDir: string config.dataDir,
+      storageCapacity: 0,
+      contentRequestRetries: 1
+    )
+
+    node = PortalNode.new(
+      PortalNetwork.mainnet,
+      portalNodeConfig,
+      d,
+      {PortalSubnetwork.history},
+      bootstrapRecords = bootstrapRecords,
+      rng = rng,
+    )
+
+  let enrFile = config.dataDir / "nimbus_portal_node.enr"
+  if io2.writeFile(enrFile, d.localNode.record.toURI()).isErr:
+    fatal "Failed to write the enr file", file = enrFile
+    quit 1
+
+  ## Start the Portal node.
+  node.start()
+
+  node
+
+proc getBlockLoop(
+    node: PortalNode,
+    blockQueue: AsyncQueue[EthBlock],
+    startBlock: uint64,
+    portalWorkers: int,
+): Future[void] {.async.} =
+  const bufferSize = 8192 * 4
+
+  let
+    historyNetwork = node.historyNetwork.value()
+    blockNumberQueue = newAsyncQueue[(uint64, uint64)](portalWorkers * 2)
+
+  var
+    blocks: array[bufferSize, EthBlock]
+    # Note: Could make this a stuint bitmask
+    downloadFinished: array[bufferSize, bool]
+    # stats counters
+    totalDownloadCount = 0
+    totalFailureCount = 0
+
+  proc blockWorker(node: PortalNode): Future[void] {.async.} =
+    while true:
+      let (blockNumberOffset, i) = await blockNumberQueue.popFirst()
+      var blockFailureCount = 0
+      while true:
+        let blockNumber = blockNumberOffset + i
+        let (header, body) = (await historyNetwork.getBlock(blockNumber)).valueOr:
+          blockFailureCount.inc()
+          totalFailureCount.inc()
+          debug "Failed to get block", blockNumber, blockFailureCount
+          if blockFailureCount > 10:
+            fatal "Block download failed too many times", blockNumber, blockFailureCount
+            quit(QuitFailure)
+
+          continue
+
+        nec_downloaded_blocks.inc()
+        blocks[i] = init(EthBlock, header, body)
+        downloadFinished[i] = true
+        totalDownloadCount.inc()
+
+        break
+
+  var workers: seq[Future[void]] = @[]
+  for i in 0 ..< portalWorkers:
+    workers.add node.blockWorker()
+
+  info "Start downloading blocks", startBlock
+  var
+    blockNumberOffset = startBlock
+    nextReadIndex = 0
+    nextWriteIndex = 0
+
+  let t0 = Moment.now()
+
+  while true:
+    while downloadFinished[nextReadIndex]:
+      # TODO: Fix this counter, it's not accurate as blockNumberOffset updates
+      # differently than the block being passed around here
+      nec_download_block_number.set((blockNumberOffset + nextReadIndex.uint64).int64)
+      debug "Adding block to the processing queue",
+        blockNumber = blockNumberOffset + nextReadIndex.uint64
+      await blockQueue.addLast(blocks[nextReadIndex])
+      downloadFinished[nextReadIndex] = false
+      nextReadIndex = (nextReadIndex + 1) mod bufferSize
+      if nextReadIndex == 0:
+        let t1 = Moment.now()
+        let diff = (t1 - t0).nanoseconds().float / 1_000_000_000
+        let avgBps = totalDownloadCount.float / diff
+        info "Total blocks downloaded",
+          totalDownloadCount,
+          totalFailureCount,
+          avgBps,
+          failureRate = totalFailureCount.float / totalDownloadCount.float
+
+    if nextWriteIndex != (nextReadIndex + bufferSize - 1) mod bufferSize:
+      debug "Adding block to the download queue",
+        blockNumber = blockNumberOffset + nextWriteIndex.uint64
+      await blockNumberQueue.addLast((blockNumberOffset, nextWriteIndex.uint64))
+      nextWriteIndex = (nextWriteIndex + 1) mod bufferSize
+      if nextWriteIndex == 0:
+        blockNumberOffset += bufferSize.uint64
+    else:
+      debug "Waiting to add block downloads",
+        nextReadIndex,
+        nextWriteIndex,
+        blockNumber = blockNumberOffset + nextReadIndex.uint64
+      await sleepAsync(1.seconds)
+
+proc importBlocks*(conf: NimbusConf, com: CommonRef, node: PortalNode, blockQueue: AsyncQueue[EthBlock]) {.async.} =
   proc controlCHandler() {.noconv.} =
     when defined(windows):
       # workaround for https://github.com/nim-lang/Nim/issues/4057
@@ -126,7 +346,7 @@ proc importBlocks*(conf: NimbusConf, com: CommonRef) =
       boolFlag(NoPersistBodies, not conf.storeBodies) +
       boolFlag({PersistBlockFlag.NoPersistReceipts}, not conf.storeReceipts) +
       boolFlag({PersistBlockFlag.NoPersistSlotHashes}, not conf.storeSlotHashes)
-    blk: Block
+    blk: blocks.Block
     persister = Persister.init(com, flags)
     cstats: PersistStats # stats at start of chunk
 
@@ -299,11 +519,16 @@ proc importBlocks*(conf: NimbusConf, com: CommonRef) =
 
     while running and persister.stats.blocks.uint64 < conf.maxBlocks and
         blockNumber <= lastEra1Block:
-      if not loadEraBlock(blockNumber):
-        notice "No more `era1` blocks to import", blockNumber, slot
-        break
-      persistBlock()
-      checkpoint()
+      if not conf.usePortal:
+        if not loadEraBlock(blockNumber):
+          notice "No more `era1` blocks to import", blockNumber, slot
+          break
+        persistBlock()
+        checkpoint()
+      else:
+        blk = await blockQueue.popFirst()
+        persistBlock()
+        checkpoint()
 
   block era1Import:
     if blockNumber > lastEra1Block:
@@ -375,3 +600,22 @@ proc importBlocks*(conf: NimbusConf, com: CommonRef) =
     blocks = persister.stats.blocks,
     txs = persister.stats.txs,
     mgas = f(persister.stats.gas.float / 1000000)
+
+proc importBlocksPortal*(conf: NimbusConf, com: CommonRef) {.
+  raises: [CatchableError]
+.} =
+  let
+    portalNode = run(conf)
+    blockQueue = newAsyncQueue[EthBlock]()
+    start = com.db.baseTxFrame().getSavedStateBlockNumber() + 1
+
+  if conf.usePortal:
+    asyncSpawn portalNode.getBlockLoop(blockQueue, start, conf.portalWorkers)
+
+  asyncSpawn importBlocks(conf, com, portalNode, blockQueue)
+
+  while running:
+    try:
+      poll()
+    except CatchableError as e:
+      warn "Exception in poll()", exc = e.name, err = e.msg
diff --git a/fluffy/network/history/history_network.nim b/fluffy/network/history/history_network.nim
index 4315d8669a..71975dd0b1 100644
--- a/fluffy/network/history/history_network.nim
+++ b/fluffy/network/history/history_network.nim
@@ -201,7 +201,7 @@ proc getBlockBody*(
       n.portalProtocol.banNode(
         bodyContent.receivedFrom.id, NodeBanDurationContentLookupFailedValidation
       )
-      warn "Validation of block body failed",
+      debug "Validation of block body failed",
        error, node = bodyContent.receivedFrom.record.toURI()
       continue
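
For reference, the hand-off between getBlockLoop and importBlocks above relies on chronos's bounded AsyncQueue for back-pressure: addLast suspends when the queue is full and popFirst suspends when it is empty, so downloading and persisting overlap without unbounded buffering. The standalone sketch below illustrates only that pattern; it assumes nothing beyond the chronos package, and the producer/consumer proc names are illustrative, not part of the patch.

# Minimal sketch of the bounded producer/consumer hand-off (assumes only chronos;
# proc names are illustrative and do not appear in the patch).
import chronos

proc producer(q: AsyncQueue[uint64]) {.async.} =
  for blockNumber in 1'u64 .. 16'u64:
    # addLast suspends when the queue is full, throttling the producer
    await q.addLast(blockNumber)

proc consumer(q: AsyncQueue[uint64]) {.async.} =
  for _ in 1 .. 16:
    # popFirst suspends until an item is available
    let blockNumber = await q.popFirst()
    echo "persisting block ", blockNumber

proc main() {.async.} =
  let q = newAsyncQueue[uint64](4) # capacity 4 provides back-pressure
  let producerFut = producer(q)
  await consumer(q)
  await producerFut

waitFor main()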