Commit d870f14

PoC Syncing nimbus EL from Portal network
To be heavily cleaned up; a very quick version that makes the import stream blocks instead of processing them in batches of 8192 as the previous version did.
1 parent 407410d commit d870f14
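
The change replaces batch-oriented importing with a producer/consumer pipeline: Portal worker tasks download blocks and push them onto an AsyncQueue, while the persister pops them off as they arrive. A minimal sketch of that pattern, assuming only chronos; the names here are illustrative, not the commit's (in the commit the producer is getBlockLoop and the consumer is importBlocks):

import chronos

# Producer fills a bounded queue; the consumer drains it concurrently.
proc producer(q: AsyncQueue[int]) {.async.} =
  for n in 0 ..< 20:
    await q.addLast(n)            # suspends while the queue is full

proc consumer(q: AsyncQueue[int]) {.async.} =
  for _ in 0 ..< 20:
    let n = await q.popFirst()    # suspends while the queue is empty
    echo "persist block ", n

proc main() {.async.} =
  let q = newAsyncQueue[int](4)   # a small bound keeps downloads just ahead
  asyncSpawn producer(q)
  await consumer(q)

waitFor main()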

4 files changed: +243 −11 lines changed


execution_chain/config.nim

Lines changed: 13 additions & 0 deletions
@@ -540,12 +540,25 @@ type
         defaultValue: false
         name: "debug-store-slot-hashes".}: bool
 
+      usePortal* {.
+        hidden
+        desc: "Use portal network instead of era files"
+        defaultValue: false
+        name: "debug-use-portal".}: bool
+
+      portalWorkers* {.
+        hidden
+        desc: "Amount of Portal workers to use for downloading blocks"
+        defaultValue: 512
+        name: "debug-portal-workers".}: int
+
     of `import-rlp`:
       blocksFile* {.
         argument
         desc: "One or more RLP encoded block(s) files"
         name: "blocks-file" }: seq[InputFile]
 
+
   func parseCmdArg(T: type NetworkId, p: string): T
       {.gcsafe, raises: [ValueError].} =
     parseBiggestUInt(p).T
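
With these options in place, a hypothetical invocation of the import command might look as follows; the flag names come from the diff above, while the exact --name=value syntax and binary path are assumptions about the confutils-based CLI:

./build/nimbus_execution_client import \
  --debug-use-portal=true \
  --debug-portal-workers=256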

execution_chain/nimbus_execution_client.nim

Lines changed: 1 addition & 1 deletion
@@ -253,7 +253,7 @@ proc run(nimbus: NimbusNode, conf: NimbusConf) =
 
   case conf.cmd
   of NimbusCmd.`import`:
-    importBlocks(conf, com)
+    importBlocksPortal(conf, com)
   of NimbusCmd.`import-rlp`:
     importRlpBlocks(conf, com)
   else:

execution_chain/nimbus_import.nim

Lines changed: 228 additions & 9 deletions
@@ -13,15 +13,25 @@ import
   chronicles,
   metrics,
   chronos/timer,
-  std/[strformat, strutils],
+  chronos,
+  std/[strformat, strutils, os],
   stew/io2,
   beacon_chain/era_db,
   beacon_chain/networking/network_metadata,
   ./config,
   ./common/common,
   ./core/chain,
   ./db/era1_db,
-  ./utils/era_helpers
+  ./utils/era_helpers,
+  eth/common/keys, # rng
+  eth/net/nat, # setupAddress
+  eth/p2p/discoveryv5/protocol as discv5_protocol,
+  eth/p2p/discoveryv5/routing_table,
+  eth/p2p/discoveryv5/enr,
+  ../fluffy/portal_node,
+  ../fluffy/common/common_utils, # getPersistentNetKey, getPersistentEnr
+  ../fluffy/network_metadata,
+  ../fluffy/version
 
 declareGauge nec_import_block_number, "Latest imported block number"
 
@@ -87,7 +97,192 @@ template boolFlag(flags, b): PersistBlockFlags =
   else:
     {}
 
-proc importBlocks*(conf: NimbusConf, com: CommonRef) =
+proc run(config: NimbusConf): PortalNode {.
+    raises: [CatchableError]
+.} =
+  let rng = newRng()
+
+  ## Network configuration
+  let
+    bindIp = config.listenAddress
+    udpPort = Port(config.udpPort)
+    # TODO: allow for no TCP port mapping!
+    (extIp, _, extUdpPort) =
+      try:
+        setupAddress(config.nat, config.listenAddress, udpPort, udpPort, "portal")
+      except CatchableError as exc:
+        raiseAssert exc.msg
+        # raise exc # TODO: Ideally we don't have the Exception here
+      except Exception as exc:
+        raiseAssert exc.msg
+    (netkey, newNetKey) =
+      # if config.netKey.isSome():
+      #   (config.netKey.get(), true)
+      # else:
+      getPersistentNetKey(rng[], config.dataDir / "netkey")
+
+    enrFilePath = config.dataDir / "nimbus_portal_node.enr"
+    previousEnr =
+      if not newNetKey:
+        getPersistentEnr(enrFilePath)
+      else:
+        Opt.none(enr.Record)
+
+  var bootstrapRecords: seq[Record]
+  # loadBootstrapFile(string config.bootstrapNodesFile, bootstrapRecords)
+  # bootstrapRecords.add(config.bootstrapNodes)
+
+  # case config.network
+  # of PortalNetwork.none:
+  #   discard # don't connect to any network bootstrap nodes
+  # of PortalNetwork.mainnet:
+  #   for enrURI in mainnetBootstrapNodes:
+  #     let res = enr.Record.fromURI(enrURI)
+  #     if res.isOk():
+  #       bootstrapRecords.add(res.value)
+  # of PortalNetwork.angelfood:
+  #   for enrURI in angelfoodBootstrapNodes:
+  #     let res = enr.Record.fromURI(enrURI)
+  #     if res.isOk():
+  #       bootstrapRecords.add(res.value)
+
+  # Only mainnet
+  for enrURI in mainnetBootstrapNodes:
+    let res = enr.Record.fromURI(enrURI)
+    if res.isOk():
+      bootstrapRecords.add(res.value)
+
+  ## Discovery v5 protocol setup
+  let
+    discoveryConfig =
+      DiscoveryConfig.init(DefaultTableIpLimit, DefaultBucketIpLimit, DefaultBitsPerHop)
+    d = newProtocol(
+      netkey,
+      extIp,
+      Opt.none(Port),
+      extUdpPort,
+      # Note: The addition of default clientInfo to the ENR is a temporary
+      # measure to easily identify & debug the clients used in the testnet.
+      # Might make this into a, default off, cli option.
+      localEnrFields = {"c": enrClientInfoShort},
+      bootstrapRecords = bootstrapRecords,
+      previousRecord = previousEnr,
+      bindIp = bindIp,
+      bindPort = udpPort,
+      enrAutoUpdate = true,
+      config = discoveryConfig,
+      rng = rng,
+    )
+
+  d.open()
+
+  ## Portal node setup
+  let
+    portalProtocolConfig = PortalProtocolConfig.init(
+      DefaultTableIpLimit, DefaultBucketIpLimit, DefaultBitsPerHop, defaultAlpha, RadiusConfig(kind: Static, logRadius: 249),
+      defaultDisablePoke, defaultMaxGossipNodes, defaultContentCacheSize,
+      defaultDisableContentCache, defaultMaxConcurrentOffers, defaultDisableBanNodes,
+    )
+
+    portalNodeConfig = PortalNodeConfig(
+      accumulatorFile: Opt.none(string),
+      disableStateRootValidation: true,
+      trustedBlockRoot: Opt.none(Digest),
+      portalConfig: portalProtocolConfig,
+      dataDir: string config.dataDir,
+      storageCapacity: 0,
+      contentRequestRetries: 1
+    )
+
+    node = PortalNode.new(
+      PortalNetwork.mainnet,
+      portalNodeConfig,
+      d,
+      {PortalSubnetwork.history},
+      bootstrapRecords = bootstrapRecords,
+      rng = rng,
+    )
+
+  let enrFile = config.dataDir / "nimbus_portal_node.enr"
+  if io2.writeFile(enrFile, d.localNode.record.toURI()).isErr:
+    fatal "Failed to write the enr file", file = enrFile
+    quit 1
+
+  ## Start the Portal node.
+  node.start()
+
+  node
+
+proc getBlockLoop(node: PortalNode, blockQueue: AsyncQueue[EthBlock], startBlock: uint64, portalWorkers: int): Future[void] {.async.} =
+  const bufferSize = 8192
+
+  let historyNetwork = node.historyNetwork.value()
+  let blockNumberQueue = newAsyncQueue[(uint64, uint64)](2048)
+
+  var blockNumber = startBlock
+  var blocks: seq[EthBlock] = newSeq[EthBlock](bufferSize)
+  var count = 0
+  var failureCount = 0
+
+  # Note: Could make these stuint bitmasks
+  var downloadFinished: array[bufferSize, bool]
+  var downloadStarted: array[bufferSize, bool]
+
+  proc blockWorker(node: PortalNode): Future[void] {.async.} =
+    while true:
+      let (blockNumber, i) = await blockNumberQueue.popFirst()
+      var currentBlockFailures = 0
+      while true:
+        let (header, body) = (await historyNetwork.getBlock(blockNumber + i)).valueOr:
+          currentBlockFailures.inc()
+          if currentBlockFailures > 10:
+            fatal "Block download failed too many times", blockNumber = blockNumber + i, currentBlockFailures
+            quit(QuitFailure)
+
+          debug "Failed to get block", blockNumber = blockNumber + i, currentBlockFailures
+          failureCount.inc()
+          continue
+
+        blocks[i] = init(EthBlock, header, body)
+        downloadFinished[i] = true
+        count.inc()
+
+        break
+
+  var workers: seq[Future[void]] = @[]
+  for i in 0 ..< portalWorkers:
+    workers.add node.blockWorker()
+
+  info "Start downloading blocks", startBlock = blockNumber
+  var i = 0'u64
+  var nextDownloadedIndex = 0
+  let t0 = Moment.now()
+
+  while true:
+    while downloadFinished[nextDownloadedIndex]:
+      debug "Adding block to the processing queue", blockNumber = nextDownloadedIndex.uint64 + blockNumber
+      await blockQueue.addLast(blocks[nextDownloadedIndex])
+      downloadFinished[nextDownloadedIndex] = false
+      downloadStarted[nextDownloadedIndex] = false
+      nextDownloadedIndex = (nextDownloadedIndex + 1) mod bufferSize
+
+    # TODO: can use the read pointer nextDownloadedIndex instead and get rid of downloadStarted
+    if not downloadStarted[i]:
+      debug "Adding block to the download queue", blockNumber = blockNumber + i
+      await blockNumberQueue.addLast((blockNumber, i))
+      downloadStarted[i] = true
+      # TODO clean this up by directly using blocknumber with modulo calc
+      if i == bufferSize.uint64 - 1:
+        blockNumber += bufferSize.uint64
+        let t1 = Moment.now()
+        let diff = (t1 - t0).nanoseconds().float / 1000000000
+        let avgBps = count.float / diff
+        info "Total blocks downloaded", count = count, failureCount = failureCount, failureRate = failureCount.float / count.float, avgBps = avgBps
+      i = (i + 1'u64) mod bufferSize.uint64
+    else:
+      await sleepAsync(1.nanoseconds)
+
+proc importBlocks*(conf: NimbusConf, com: CommonRef, node: PortalNode, blockQueue: AsyncQueue[EthBlock]) {.async.} =
   proc controlCHandler() {.noconv.} =
     when defined(windows):
       # workaround for https://github.com/nim-lang/Nim/issues/4057
@@ -118,7 +313,7 @@ proc importBlocks*(conf: NimbusConf, com: CommonRef) =
       boolFlag(NoPersistBodies, not conf.storeBodies) +
       boolFlag({PersistBlockFlag.NoPersistReceipts}, not conf.storeReceipts) +
       boolFlag({PersistBlockFlag.NoPersistSlotHashes}, not conf.storeSlotHashes)
-    blk: Block
+    blk: blocks.Block
     persister = Persister.init(com, flags)
     cstats: PersistStats # stats at start of chunk
 
@@ -292,11 +487,16 @@ proc importBlocks*(conf: NimbusConf, com: CommonRef) =
 
   while running and persister.stats.blocks.uint64 < conf.maxBlocks and
       blockNumber <= lastEra1Block:
-    if not loadEraBlock(blockNumber):
-      notice "No more `era1` blocks to import", blockNumber, slot
-      break
-    persistBlock()
-    checkpoint()
+    if not conf.usePortal:
+      if not loadEraBlock(blockNumber):
+        notice "No more `era1` blocks to import", blockNumber, slot
+        break
+      persistBlock()
+      checkpoint()
+    else:
+      blk = await blockQueue.popFirst()
+      persistBlock()
+      checkpoint()
 
   block era1Import:
     if blockNumber > lastEra1Block:
@@ -368,3 +568,22 @@ proc importBlocks*(conf: NimbusConf, com: CommonRef) =
     blocks = persister.stats.blocks,
     txs = persister.stats.txs,
     mgas = f(persister.stats.gas.float / 1000000)
+
+proc importBlocksPortal*(conf: NimbusConf, com: CommonRef) {.
+    raises: [CatchableError]
+.} =
+  let
+    portalNode = run(conf)
+    blockQueue = newAsyncQueue[EthBlock]()
+    start = com.db.baseTxFrame().getSavedStateBlockNumber() + 1
+
+  if conf.usePortal:
+    asyncSpawn portalNode.getBlockLoop(blockQueue, start, conf.portalWorkers)
+
+  asyncSpawn importBlocks(conf, com, portalNode, blockQueue)
+
+  while running:
+    try:
+      poll()
+    except CatchableError as e:
+      warn "Exception in poll()", exc = e.name, err = e.msg

fluffy/network/history/history_network.nim

Lines changed: 1 addition & 1 deletion
@@ -201,7 +201,7 @@ proc getBlockBody*(
       n.portalProtocol.banNode(
         bodyContent.receivedFrom.id, NodeBanDurationContentLookupFailedValidation
       )
-      warn "Validation of block body failed",
+      debug "Validation of block body failed",
         error, node = bodyContent.receivedFrom.record.toURI()
       continue
