diff --git a/beacon_chain/beacon_chain_file.nim b/beacon_chain/beacon_chain_file.nim
index f19d4cf84c..f8c4c2a45a 100644
--- a/beacon_chain/beacon_chain_file.nim
+++ b/beacon_chain/beacon_chain_file.nim
@@ -1,5 +1,5 @@
 # beacon_chain
-# Copyright (c) 2018-2024 Status Research & Development GmbH
+# Copyright (c) 2018-2025 Status Research & Development GmbH
 # Licensed and distributed under either of
 #   * MIT license (license terms in the root directory or at https://opensource.org/licenses/MIT).
 #   * Apache v2 license (license terms in the root directory or at https://www.apache.org/licenses/LICENSE-2.0).
@@ -79,6 +79,8 @@ const
     int(ConsensusFork.Phase0) .. int(high(ConsensusFork))
   BlobForkCodeRange =
     MaxForksCount .. (MaxForksCount + int(high(ConsensusFork)) - int(ConsensusFork.Deneb))
+  DataColumnForkCodeRange =
+    MaxForksCount * 2 .. (MaxForksCount * 2 + int(high(ConsensusFork)) - int(ConsensusFork.Fulu))

 func getBlockForkCode(fork: ConsensusFork): uint64 =
   uint64(fork)
@@ -94,6 +96,13 @@ func getBlobForkCode(fork: ConsensusFork): uint64 =
   of ConsensusFork.Phase0 .. ConsensusFork.Capella:
     raiseAssert "Blobs are not supported for the fork"

+func getDataColumnForkCode(fork: ConsensusFork): uint64 =
+  case fork
+  of ConsensusFork.Fulu:
+    uint64(MaxForksCount * 2)
+  of ConsensusFork.Phase0 .. ConsensusFork.Electra:
+    raiseAssert "Data columns are not supported for the fork"
+
 proc init(t: typedesc[ChainFileError], k: ChainFileErrorType,
           m: string): ChainFileError =
   ChainFileError(kind: k, message: m)
@@ -134,7 +143,8 @@ proc checkKind(kind: uint64): Result[void, string] =
       if res > uint64(high(int)):
         return err("Unsuppoted chunk kind value")
       int(res)
-  if (hkind in BlockForkCodeRange) or (hkind in BlobForkCodeRange):
+  if (hkind in BlockForkCodeRange) or (hkind in BlobForkCodeRange) or
+     (hkind in DataColumnForkCodeRange):
     ok()
   else:
     err("Unsuppoted chunk kind value")
@@ -260,6 +270,12 @@ template getBlobChunkKind(kind: ConsensusFork, last: bool): uint64 =
   else:
     getBlobForkCode(kind)

+template getDataColumnChunkKind(kind: ConsensusFork, last: bool): uint64 =
+  if last:
+    maskKind(getDataColumnForkCode(kind))
+  else:
+    getDataColumnForkCode(kind)
+
 proc getBlockConsensusFork(header: ChainFileHeader): ConsensusFork =
   let hkind = unmaskKind(header.kind)
   if int(hkind) in BlockForkCodeRange:
@@ -275,6 +291,10 @@ template isBlob(h: ChainFileHeader | ChainFileFooter): bool =
   let hkind = unmaskKind(h.kind)
   int(hkind) in BlobForkCodeRange

+template isDataColumn(h: ChainFileHeader | ChainFileFooter): bool =
+  let hkind = unmaskKind(h.kind)
+  int(hkind) in DataColumnForkCodeRange
+
 template isLast(h: ChainFileHeader | ChainFileFooter): bool =
   h.kind.isLast()

@@ -291,7 +311,8 @@ proc setTail*(chandle: var ChainFileHandle, bdata: BlockData) =
   chandle.data.tail = Opt.some(bdata)

 proc store*(chandle: ChainFileHandle, signedBlock: ForkedSignedBeaconBlock,
-            blobs: Opt[BlobSidecars]): Result[void, string] =
+            blobs: Opt[BlobSidecars], dataColumns: Opt[DataColumnSidecars]):
+            Result[void, string] =
   let origOffset =
     updateFilePos(chandle.handle, 0'i64, SeekPosition.SeekEnd).valueOr:
       return err(ioErrorMsg(error))
@@ -342,6 +363,36 @@ proc store*(chandle: ChainFileHandle, signedBlock: ForkedSignedBeaconBlock,
         discard fsync(chandle.handle)
         return err(IncompleteWriteError)

+  if dataColumns.isSome():
+    let dataColumnSidecars =
+      dataColumns.get
+    for index, dataColumn in dataColumnSidecars.pairs():
+      let
+        kind =
+          getDataColumnChunkKind(signedBlock.kind, (index + 1) ==
+                                 len(dataColumnSidecars))
+        (data, plainSize) =
block: + let res = SSZ.encode(dataColumn[]) + (snappy.encode(res), len(res)) + slot = dataColumn[].signed_block_header.message.slot + buffer = Chunk.init(kind, uint64(slot), uint32(plainSize), data) + + setFilePos(chandle.handle, 0'i64, SeekPosition.SeekEnd).isOkOr: + discard truncate(chandle.handle, origOffset) + discard fsync(chandle.handle) + return err(ioErrorMsg(error)) + + let + wrote = writeFile(chandle.handle, buffer).valueOr: + discard truncate(chandle.handle, origOffset) + discard fsync(chandle.handle) + return err(ioErrorMsg(error)) + if wrote != uint(len(buffer)): + discard truncate(chandle.handle, origOffset) + discard fsync(chandle.handle) + return err(IncompleteWriteError) + fsync(chandle.handle).isOkOr: discard truncate(chandle.handle, origOffset) return err(ioErrorMsg(error)) @@ -550,6 +601,22 @@ proc decodeBlob( return err("Incorrect blob format") ok(blob) +proc decodeDataColumn( + header: ChainFileHeader, + data: openArray[byte], +): Result[DataColumnSidecar, string] = + if header.plainSize > uint32(MaxChunkSize): + return err("Size of data column is enormously big") + + let + decompressed = snappy.decode(data, uint32(header.plainSize)) + dataColumn = + try: + SSZ.decode(decompressed, DataColumnSidecar) + except SerializationError: + return err("Incorrect data column format") + ok(dataColumn) + proc getChainFileTail*(handle: IoHandle): Result[Opt[BlockData], string] = var sidecars: BlobSidecars while true: diff --git a/beacon_chain/consensus_object_pools/block_pools_types.nim b/beacon_chain/consensus_object_pools/block_pools_types.nim index 6235ea2bf3..47346b1754 100644 --- a/beacon_chain/consensus_object_pools/block_pools_types.nim +++ b/beacon_chain/consensus_object_pools/block_pools_types.nim @@ -1,5 +1,5 @@ # beacon_chain -# Copyright (c) 2018-2024 Status Research & Development GmbH +# Copyright (c) 2018-2025 Status Research & Development GmbH # Licensed and distributed under either of # * MIT license (license terms in the root directory or at https://opensource.org/licenses/MIT). # * Apache v2 license (license terms in the root directory or at https://www.apache.org/licenses/LICENSE-2.0). @@ -288,6 +288,7 @@ type BlockData* = object blck*: ForkedSignedBeaconBlock blob*: Opt[BlobSidecars] + dataColumn*: Opt[DataColumnSidecars] OnBlockAdded*[T: ForkyTrustedSignedBeaconBlock] = proc( blckRef: BlockRef, blck: T, epochRef: EpochRef, diff --git a/beacon_chain/consensus_object_pools/blockchain_list.nim b/beacon_chain/consensus_object_pools/blockchain_list.nim index b7677f8b0b..e4c5e59b5b 100644 --- a/beacon_chain/consensus_object_pools/blockchain_list.nim +++ b/beacon_chain/consensus_object_pools/blockchain_list.nim @@ -1,5 +1,5 @@ # beacon_chain -# Copyright (c) 2018-2024 Status Research & Development GmbH +# Copyright (c) 2018-2025 Status Research & Development GmbH # Licensed and distributed under either of # * MIT license (license terms in the root directory or at https://opensource.org/licenses/MIT). # * Apache v2 license (license terms in the root directory or at https://www.apache.org/licenses/LICENSE-2.0). 
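# ---------------------------------------------------------------------------
# Illustrative sketch (not part of the patch): beacon_chain_file.nim tags
# every chunk with a fork-derived code. Block chunks use the fork ordinal
# directly, blob chunks are offset by MaxForksCount, and the new data-column
# chunks are offset by MaxForksCount * 2, which keeps the three code ranges
# disjoint. All constants below are stand-in values for the sketch, not the
# ones defined in beacon_chain_file.nim.
const
  SketchMaxForksCount = 16384
  SketchDenebOrdinal = 4            # assumed ordinal of ConsensusFork.Deneb
  SketchFuluOrdinal = 6             # assumed ordinal of ConsensusFork.Fulu
  SketchHighFork = 6                # assumed ordinal of high(ConsensusFork)
  SketchBlockRange = 0 .. SketchHighFork
  SketchBlobRange =
    SketchMaxForksCount .. (SketchMaxForksCount + SketchHighFork - SketchDenebOrdinal)
  SketchColumnRange =
    SketchMaxForksCount * 2 .. (SketchMaxForksCount * 2 + SketchHighFork - SketchFuluOrdinal)

func sketchDataColumnForkCode(forkOrdinal: int): int =
  # Fulu maps onto the start of the data-column range; later forks follow it.
  SketchMaxForksCount * 2 + (forkOrdinal - SketchFuluOrdinal)

when isMainModule:
  doAssert sketchDataColumnForkCode(SketchFuluOrdinal) in SketchColumnRange
  doAssert sketchDataColumnForkCode(SketchFuluOrdinal) notin SketchBlobRange
  doAssert sketchDataColumnForkCode(SketchFuluOrdinal) notin SketchBlockRange
  echo "block/blob/data-column chunk codes stay in disjoint ranges"
# ---------------------------------------------------------------------------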
@@ -9,6 +9,7 @@ import std/sequtils, stew/io2, chronicles, chronos, metrics, ../spec/forks, + ../spec/peerdas_helpers, ../[beacon_chain_file, beacon_clock], ../sszdump @@ -128,16 +129,17 @@ proc setTail*(clist: ChainListRef, bdata: BlockData) = clist.handle = Opt.some(handle) proc store*(clist: ChainListRef, signedBlock: ForkedSignedBeaconBlock, - blobs: Opt[BlobSidecars]): Result[void, string] = + blobs: Opt[BlobSidecars], dataColumns: Opt[DataColumnSidecars]): + Result[void, string] = if clist.handle.isNone(): let filename = clist.path.chainFilePath() flags = {ChainFileFlag.Repair, ChainFileFlag.OpenAlways} handle = ? ChainFileHandle.init(filename, flags) clist.handle = Opt.some(handle) - store(handle, signedBlock, blobs) + store(handle, signedBlock, blobs, dataColumns) else: - store(clist.handle.get(), signedBlock, blobs) + store(clist.handle.get(), signedBlock, blobs, dataColumns) proc checkBlobs(signedBlock: ForkedSignedBeaconBlock, blobsOpt: Opt[BlobSidecars]): Result[void, VerifierError] = @@ -167,9 +169,31 @@ proc checkBlobs(signedBlock: ForkedSignedBeaconBlock, return err(VerifierError.Invalid) ok() +proc checkDataColumns*(signedBlock: ForkedSignedBeaconBlock, + dataColumnsOpt: Opt[DataColumnSidecars]): + Result[void, VerifierError] = + withBlck(signedBlock): + when consensusFork >= ConsensusFork.Fulu: + if dataColumnsOpt.isSome: + let dataColumns = dataColumnsOpt.get() + if dataColumns.len > 0: + for i in 0..= ConsensusFork.Fulu: + var malformed_cols: seq[int] + if dataColumnsOpt.isSome: + let columns = dataColumnsOpt.get() + let kzgCommits = signedBlock.message.body.blob_kzg_commitments.asSeq + if columns.len > 0 and kzgCommits.len > 0: + for i in 0..= (NUMBER_OF_COLUMNS div 2): + let + recovered_cps = + recover_cells_and_proofs(columns.mapIt(it[])) + recovered_columns = + signedBlock.get_data_column_sidecars(recovered_cps.get) + + for mc in malformed_cols: + # copy the healed columns only into the + # sidecar spaces + columns[mc][] = recovered_columns[mc] + columnsOk = true + + if not columnsOk: + return err(VerifierError.Invalid) + # Establish blob viability before calling addbackfillBlock to avoid # writing the block in case of blob error. 
var blobsOk = true - when typeof(signedBlock).kind >= ConsensusFork.Deneb: + when typeof(signedBlock).kind >= ConsensusFork.Deneb and + typeof(signedBlock).kind < ConsensusFork.Fulu: if blobsOpt.isSome: let blobs = blobsOpt.get() let kzgCommits = signedBlock.message.body.blob_kzg_commitments.asSeq @@ -224,6 +278,11 @@ proc storeBackfillBlock( for b in blobs: self.consensusManager.dag.db.putBlobSidecar(b[]) + # Only store data columns after successfully establishing block validity + let columns = dataColumnsOpt.valueOr: DataColumnSidecars @[] + for c in columns: + self.consensusManager.dag.db.putDataColumnSidecar(c[]) + res from web3/engine_api_types import @@ -370,19 +429,19 @@ proc getExecutionValidity( blck = shortLog(blck) return NewPayloadStatus.noResponse -proc checkBloblessSignature( +proc checkBlobOrColumnlessSignature( self: BlockProcessor, signed_beacon_block: deneb.SignedBeaconBlock | electra.SignedBeaconBlock | fulu.SignedBeaconBlock): Result[void, cstring] = let dag = self.consensusManager.dag let parent = dag.getBlockRef(signed_beacon_block.message.parent_root).valueOr: - return err("checkBloblessSignature called with orphan block") + return err("checkBlobOrColumnlessSignature called with orphan block") let proposer = getProposer( dag, parent, signed_beacon_block.message.slot).valueOr: - return err("checkBloblessSignature: Cannot compute proposer") + return err("checkBlobOrColumnlessSignature: Cannot compute proposer") if distinctBase(proposer) != signed_beacon_block.message.proposer_index: - return err("checkBloblessSignature: Incorrect proposer") + return err("checkBlobOrColumnlessSignature: Incorrect proposer") if not verify_block_signature( dag.forkAtEpoch(signed_beacon_block.message.slot.epoch), getStateField(dag.headState, genesis_validators_root), @@ -390,12 +449,12 @@ proc checkBloblessSignature( signed_beacon_block.root, dag.validatorKey(proposer).get(), signed_beacon_block.signature): - return err("checkBloblessSignature: Invalid proposer signature") + return err("checkBlobOrColumnlessSignature: Invalid proposer signature") ok() proc enqueueBlock*( self: var BlockProcessor, src: MsgSource, blck: ForkedSignedBeaconBlock, - blobs: Opt[BlobSidecars], + blobs: Opt[BlobSidecars], data_columns: Opt[DataColumnSidecars], resfut: Future[Result[void, VerifierError]].Raising([CancelledError]) = nil, maybeFinalized = false, validationDur = Duration()) = @@ -403,7 +462,7 @@ proc enqueueBlock*( if forkyBlck.message.slot <= self.consensusManager.dag.finalizedHead.slot: # let backfill blocks skip the queue - these are always "fast" to process # because there are no state rewinds to deal with - let res = self.storeBackfillBlock(forkyBlck, blobs) + let res = self.storeBackfillBlock(forkyBlck, blobs, data_columns) resfut.complete(res) return @@ -411,6 +470,7 @@ proc enqueueBlock*( self.blockQueue.addLastNoWait(BlockEntry( blck: blck, blobs: blobs, + columns: data_columns, maybeFinalized: maybeFinalized, resfut: resfut, queueTick: Moment.now(), validationDur: validationDur, @@ -438,6 +498,7 @@ proc storeBlock( self: ref BlockProcessor, src: MsgSource, wallTime: BeaconTime, signedBlock: ForkySignedBeaconBlock, blobsOpt: Opt[BlobSidecars], + dataColumnsOpt: Opt[DataColumnSidecars], maybeFinalized = false, queueTick: Moment = Moment.now(), validationDur = Duration()): Future[Result[BlockRef, (VerifierError, ProcessingStatus)]] {.async: (raises: [CancelledError]).} = @@ -495,6 +556,9 @@ proc storeBlock( if blobsOpt.isSome: for blobSidecar in blobsOpt.get: 
self.blobQuarantine[].put(blobSidecar) + if dataColumnsOpt.isSome: + for dataColumnSidecar in dataColumnsOpt.get: + self.dataColumnQuarantine[].put(dataColumnSidecar) debug "Block quarantined", blockRoot = shortLog(signedBlock.root), blck = shortLog(signedBlock.message), @@ -535,10 +599,26 @@ proc storeBlock( parent_root = signedBlock.message.parent_root parentBlck = dag.getForkedBlock(parent_root) if parentBlck.isSome(): + var columnsOk = true + let columns = + withBlck(parentBlck.get()): + when consensusFork >= ConsensusFork.Fulu: + var data_column_sidecars: DataColumnSidecars + for i in self.dataColumnQuarantine[].custody_columns: + let data_column = DataColumnSidecar.new() + if not dag.db.getDataColumnSidecar(parent_root, i.ColumnIndex, data_column[]): + columnsOk = false + break + data_column_sidecars.add data_column + Opt.some data_column_sidecars + else: + Opt.none DataColumnSidecars + var blobsOk = true let blobs = withBlck(parentBlck.get()): - when consensusFork >= ConsensusFork.Deneb: + when consensusFork >= ConsensusFork.Deneb and + consensusFork < ConsensusFork.Fulu: var blob_sidecars: BlobSidecars for i in 0 ..< forkyBlck.message.body.blob_kzg_commitments.len: let blob = BlobSidecar.new() @@ -549,10 +629,27 @@ proc storeBlock( Opt.some blob_sidecars else: Opt.none BlobSidecars - if blobsOk: + # Blobs and columns can never co-exist in the same block + # Block has neither blob sidecar nor data column sidecar + if blobs.isNone and columns.isNone: + debug "Loaded parent block from storage", parent_root + self[].enqueueBlock( + MsgSource.gossip, parentBlck.unsafeGet().asSigned(), Opt.none(BlobSidecars), + Opt.none(DataColumnSidecars)) + # Block has blob sidecars associated and NO data column sidecars + # as they cannot co-exist. + if blobsOk and blobs.isSome: + debug "Loaded parent block from storage", parent_root + self[].enqueueBlock( + MsgSource.gossip, parentBlck.unsafeGet().asSigned(), blobs, + Opt.none(DataColumnSidecars)) + # Block has data column sidecars associated and NO blob sidecars + # as they cannot co-exist. + if columnsOk and columns.isSome: debug "Loaded parent block from storage", parent_root self[].enqueueBlock( - MsgSource.gossip, parentBlck.unsafeGet().asSigned(), blobs) + MsgSource.gossip, parentBlck.unsafeGet().asSigned(), Opt.none(BlobSidecars), + columns) return handleVerifierError(parent.error()) @@ -622,10 +719,26 @@ proc storeBlock( let newPayloadTick = Moment.now() + when typeof(signedBlock).kind >= ConsensusFork.Fulu: + if dataColumnsOpt.isSome: + let columns = dataColumnsOpt.get() + let kzgCommits = signedBlock.message.body.blob_kzg_commitments.asSeq + if columns.len > 0 and kzgCommits.len > 0: + for i in 0..= ConsensusFork.Deneb: + elif typeof(signedBlock).kind >= ConsensusFork.Deneb: if blobsOpt.isSome: let blobs = blobsOpt.get() let kzgCommits = signedBlock.message.body.blob_kzg_commitments.asSeq @@ -688,6 +801,11 @@ proc storeBlock( for b in blobs: self.consensusManager.dag.db.putBlobSidecar(b[]) + # write data columns now that block has been written + let data_columns = dataColumnsOpt.valueOr: DataColumnSidecars @[] + for col in data_columns: + self.consensusManager.dag.db.putDataColumnSidecar(col[]) + let addHeadBlockTick = Moment.now() # Eagerly update head: the incoming block "should" get selected. 
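# ---------------------------------------------------------------------------
# Illustrative sketch (not part of the patch): storeBlock now threads two
# optional sidecar sets through the same code path, blob sidecars for
# Deneb..Electra blocks and data-column sidecars from Fulu onwards, and the
# parent-block re-enqueue above dispatches on whichever of the two is
# populated, since a block never carries both. Minimal stand-alone model of
# that dispatch; the types and names are invented for the sketch.
import std/options

type SketchSidecarKind = enum
  sketchNoSidecars, sketchBlobSidecars, sketchColumnSidecars

func sketchClassify(blobs, columns: Option[seq[int]]): SketchSidecarKind =
  # Mirrors the "blobs and columns can never co-exist in the same block"
  # assumption: at most one of the two options may be populated.
  doAssert not (blobs.isSome and columns.isSome),
    "a block cannot have both blob and data-column sidecars"
  if blobs.isSome: sketchBlobSidecars
  elif columns.isSome: sketchColumnSidecars
  else: sketchNoSidecars

when isMainModule:
  doAssert sketchClassify(some(@[1, 2, 3]), none(seq[int])) == sketchBlobSidecars
  doAssert sketchClassify(none(seq[int]), some(@[0, 7])) == sketchColumnSidecars
  doAssert sketchClassify(none(seq[int]), none(seq[int])) == sketchNoSidecars
# ---------------------------------------------------------------------------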
@@ -830,25 +948,53 @@ proc storeBlock( withBlck(quarantined): when typeof(forkyBlck).kind < ConsensusFork.Deneb: self[].enqueueBlock( - MsgSource.gossip, quarantined, Opt.none(BlobSidecars)) - else: + MsgSource.gossip, quarantined, Opt.none(BlobSidecars), + Opt.none(DataColumnSidecars)) + elif typeof(forkyBlck).kind >= ConsensusFork.Fulu: if len(forkyBlck.message.body.blob_kzg_commitments) == 0: self[].enqueueBlock( - MsgSource.gossip, quarantined, Opt.some(BlobSidecars @[])) + MsgSource.gossip, quarantined, Opt.some(BlobSidecars @[]), + Opt.some(DataColumnSidecars @[])) else: - if (let res = checkBloblessSignature(self[], forkyBlck); res.isErr): + if (let res = checkBlobOrColumnlessSignature(self[], + forkyBlck); + res.isErr): warn "Failed to verify signature of unorphaned blobless block", blck = shortLog(forkyBlck), error = res.error() continue + if self.dataColumnQuarantine[].hasMissingDataColumns(forkyBlck): + let columns = self.dataColumnQuarantine[].popDataColumns( + forkyBlck.root, forkyBlck) + self[].enqueueBlock(MsgSource.gossip, quarantined, Opt.none(BlobSidecars), + Opt.some(columns)) + else: + discard self.consensusManager.quarantine[].addColumnless( + dag.finalizedHead.slot, forkyBlck) + elif typeof(forkyBlck).kind >= ConsensusFork.Deneb and + typeof(forkyBlck).kind < ConsensusFork.Fulu: + if len(forkyBlck.message.body.blob_kzg_commitments) == 0: + self[].enqueueBlock( + MsgSource.gossip, quarantined, Opt.some(BlobSidecars @[]), + Opt.some(DataColumnSidecars @[])) + else: + if (let res = checkBlobOrColumnlessSignature(self[], + forkyBlck); + res.isErr): + warn "Failed to verify signature of unorphaned columnless block", + blck = shortLog(forkyBlck), + error = res.error() + continue if self.blobQuarantine[].hasBlobs(forkyBlck): let blobs = self.blobQuarantine[].popBlobs( forkyBlck.root, forkyBlck) - self[].enqueueBlock(MsgSource.gossip, quarantined, Opt.some(blobs)) + self[].enqueueBlock(MsgSource.gossip, quarantined, Opt.some(blobs), + Opt.none(DataColumnSidecars)) else: discard self.consensusManager.quarantine[].addBlobless( dag.finalizedHead.slot, forkyBlck) + ok blck.value() # Enqueue @@ -856,7 +1002,7 @@ proc storeBlock( proc addBlock*( self: var BlockProcessor, src: MsgSource, blck: ForkedSignedBeaconBlock, - blobs: Opt[BlobSidecars], maybeFinalized = false, + blobs: Opt[BlobSidecars], dataColumns: Opt[DataColumnSidecars], maybeFinalized = false, validationDur = Duration()): Future[Result[void, VerifierError]] {.async: (raises: [CancelledError], raw: true).} = ## Enqueue a Gossip-validated block for consensus verification # Backpressure: @@ -868,7 +1014,7 @@ proc addBlock*( # - RequestManager (missing ancestor blocks) # - API let resfut = newFuture[Result[void, VerifierError]]("BlockProcessor.addBlock") - enqueueBlock(self, src, blck, blobs, resfut, maybeFinalized, validationDur) + enqueueBlock(self, src, blck, blobs, dataColumns, resfut, maybeFinalized, validationDur) resfut # Event Loop @@ -889,8 +1035,8 @@ proc processBlock( let res = withBlck(entry.blck): await self.storeBlock( - entry.src, wallTime, forkyBlck, entry.blobs, entry.maybeFinalized, - entry.queueTick, entry.validationDur) + entry.src, wallTime, forkyBlck, entry.blobs, entry.columns, + entry.maybeFinalized, entry.queueTick, entry.validationDur) if res.isErr and res.error[1] == ProcessingStatus.notCompleted: # When an execution engine returns an error or fails to respond to a @@ -901,7 +1047,7 @@ proc processBlock( # 
https://github.com/ethereum/consensus-specs/blob/v1.5.0-alpha.9/sync/optimistic.md#execution-engine-errors await sleepAsync(chronos.seconds(1)) self[].enqueueBlock( - entry.src, entry.blck, entry.blobs, entry.resfut, entry.maybeFinalized, + entry.src, entry.blck, entry.blobs, entry.columns, entry.resfut, entry.maybeFinalized, entry.validationDur) # To ensure backpressure on the sync manager, do not complete these futures. return diff --git a/beacon_chain/gossip_processing/eth2_processor.nim b/beacon_chain/gossip_processing/eth2_processor.nim index 276d7e7c3c..bcd326d36b 100644 --- a/beacon_chain/gossip_processing/eth2_processor.nim +++ b/beacon_chain/gossip_processing/eth2_processor.nim @@ -1,5 +1,5 @@ # beacon_chain -# Copyright (c) 2018-2024 Status Research & Development GmbH +# Copyright (c) 2018-2025 Status Research & Development GmbH # Licensed and distributed under either of # * MIT license (license terms in the root directory or at https://opensource.org/licenses/MIT). # * Apache v2 license (license terms in the root directory or at https://www.apache.org/licenses/LICENSE-2.0). @@ -8,14 +8,14 @@ {.push raises: [].} import - std/tables, + std/[tables, sequtils], chronicles, chronos, metrics, taskpools, - ../spec/[helpers, forks], + ../spec/[helpers, forks, peerdas_helpers], ../consensus_object_pools/[ blob_quarantine, block_clearance, block_quarantine, blockchain_dag, - attestation_pool, light_client_pool, sync_committee_msg_pool, - validator_change_pool], + data_column_quarantine, attestation_pool, light_client_pool, + sync_committee_msg_pool, validator_change_pool], ../validators/validator_pool, ../beacon_clock, "."/[gossip_validation, block_processor, batch_validation], @@ -46,6 +46,10 @@ declareCounter blob_sidecars_received, "Number of valid blobs processed by this node" declareCounter blob_sidecars_dropped, "Number of invalid blobs dropped by this node", labels = ["reason"] +declareCounter data_column_sidecars_received, + "Number of valid data columns processed by this node" +declareCounter data_column_sidecars_dropped, + "Number of invalid data columns dropped by this node", labels = ["reason"] declareCounter beacon_attester_slashings_received, "Number of valid attester slashings processed by this node" declareCounter beacon_attester_slashings_dropped, @@ -89,6 +93,10 @@ declareHistogram beacon_block_delay, declareHistogram blob_sidecar_delay, "Time(s) between slot start and blob sidecar reception", buckets = delayBuckets +declareHistogram data_column_sidecar_delay, + "Time(s) betweeen slot start and data column sidecar reception", + buckets = delayBuckets + type DoppelgangerProtection = object broadcastStartEpoch*: Epoch ##\ @@ -144,6 +152,8 @@ type blobQuarantine*: ref BlobQuarantine + dataColumnQuarantine*: ref DataColumnQuarantine + # Application-provided current time provider (to facilitate testing) getCurrentBeaconTime*: GetBeaconTimeFn @@ -167,6 +177,7 @@ proc new*(T: type Eth2Processor, lightClientPool: ref LightClientPool, quarantine: ref Quarantine, blobQuarantine: ref BlobQuarantine, + dataColumnQuarantine: ref DataColumnQuarantine, rng: ref HmacDrbgContext, getBeaconTime: GetBeaconTimeFn, taskpool: Taskpool @@ -185,6 +196,7 @@ proc new*(T: type Eth2Processor, lightClientPool: lightClientPool, quarantine: quarantine, blobQuarantine: blobQuarantine, + dataColumnQuarantine: dataColumnQuarantine, getCurrentBeaconTime: getBeaconTime, batchCrypto: BatchCrypto.new( rng = rng, @@ -237,7 +249,8 @@ proc processSignedBeaconBlock*( trace "Block validated" let blobs = - when 
typeof(signedBlock).kind >= ConsensusFork.Deneb: + when typeof(signedBlock).kind >= ConsensusFork.Deneb and + typeof(signedBlock).kind < ConsensusFork.Fulu: if self.blobQuarantine[].hasBlobs(signedBlock): Opt.some(self.blobQuarantine[].popBlobs(signedBlock.root, signedBlock)) else: @@ -247,9 +260,22 @@ proc processSignedBeaconBlock*( else: Opt.none(BlobSidecars) + let columns = + when typeof(signedBlock).kind >= ConsensusFork.Fulu: + if self.dataColumnQuarantine[].hasMissingDataColumns(signedBlock): + Opt.some(self.dataColumnQuarantine[].popDataColumns(signedBlock.root, + signedBlock)) + else: + discard self.quarantine[].addColumnless(self.dag.finalizedHead.slot, + signedBlock) + return v + else: + Opt.none(DataColumnSidecars) + self.blockProcessor[].enqueueBlock( src, ForkedSignedBeaconBlock.init(signedBlock), blobs, + columns, maybeFinalized = maybeFinalized, validationDur = nanoseconds( (self.getCurrentBeaconTime() - wallTime).nanoseconds)) @@ -299,11 +325,13 @@ proc processBlobSidecar*( if (let o = self.quarantine[].popBlobless(block_root); o.isSome): let blobless = o.unsafeGet() withBlck(blobless): - when consensusFork >= ConsensusFork.Deneb: + when consensusFork >= ConsensusFork.Deneb and + consensusFork < ConsensusFork.Fulu: if self.blobQuarantine[].hasBlobs(forkyBlck): self.blockProcessor[].enqueueBlock( MsgSource.gossip, blobless, - Opt.some(self.blobQuarantine[].popBlobs(block_root, forkyBlck))) + Opt.some(self.blobQuarantine[].popBlobs(block_root, forkyBlck)), + Opt.none(DataColumnSidecars)) else: discard self.quarantine[].addBlobless( self.dag.finalizedHead.slot, forkyBlck) @@ -315,6 +343,79 @@ proc processBlobSidecar*( v +proc processDataColumnSidecar*( + self: var Eth2Processor, src: MsgSource, + dataColumnSidecar: DataColumnSidecar, subnet_id: uint64): ValidationRes = + template block_header: untyped = dataColumnSidecar.signed_block_header.message + + let + wallTime = self.getCurrentBeaconTime() + (_, wallSlot) = wallTime.toSlot() + + logScope: + dcs = shortLog(dataColumnSidecar) + wallSlot + + # Potential under/overflows are fine; would just create odd metrics and logs + let delay = wallTime - block_header.slot.start_beacon_time + debug "Data column received", delay + + let v = + self.dag.validateDataColumnSidecar(self.quarantine, self.dataColumnQuarantine, + dataColumnSidecar, wallTime, subnet_id) + + if v.isErr(): + debug "Dropping data column", error = v.error() + data_column_sidecars_dropped.inc(1, [$v.error[0]]) + return v + + debug "Data column validated, putting data column in quarantine" + self.dataColumnQuarantine[].put(newClone(dataColumnSidecar)) + self.dag.db.putDataColumnSidecar(dataColumnSidecar) + + let block_root = hash_tree_root(block_header) + if (let o = self.quarantine[].popColumnless(block_root); o.isSome): + let columnless = o.unsafeGet() + withBlck(columnless): + when consensusFork >= ConsensusFork.Fulu: + if not self.dataColumnQuarantine[].supernode: + if self.dataColumnQuarantine[].hasMissingDataColumns(forkyBlck): + let gathered_columns = + self.dataColumnQuarantine[].gatherDataColumns(forkyBlck.root) + for gdc in gathered_columns: + self.dataColumnQuarantine[].put(newClone(gdc)) + self.blockProcessor[].enqueueBlock( + MsgSource.gossip, columnless, + Opt.none(BlobSidecars), + Opt.some(self.dataColumnQuarantine[].popDataColumns(block_root, + forkyBlck))) + elif self.dataColumnQuarantine[].hasEnoughDataColumns(forkyBlck): + let + columns = self.dataColumnQuarantine[].gatherDataColumns(forkyBlck.root) + if columns.len >= (NUMBER_OF_COLUMNS div 2) and 
+ self.dataColumnQuarantine[].supernode: + let + recovered_cps = recover_cells_and_proofs(columns.mapIt(it[])) + reconstructed_columns = + get_data_column_sidecars(forkyBlck, recovered_cps.get) + for rc in reconstructed_columns: + if rc notin columns.mapIt(it[]): + self.dataColumnQuarantine[].put(newClone(rc)) + self.blockProcessor[].enqueueBlock( + MsgSource.gossip, columnless, + Opt.none(BlobSidecars), + Opt.some(self.dataColumnQuarantine[].popDataColumns(block_root, forkyBlck))) + else: + discard self.quarantine[].addColumnless( + self.dag.finalizedHead.slot, forkyBlck) + else: + raiseAssert "Could not have been added as columnless" + + data_column_sidecars_received.inc() + data_column_sidecar_delay.observe(delay.toFloatSeconds()) + + v + proc setupDoppelgangerDetection*(self: var Eth2Processor, slot: Slot) = # When another client's already running, this is very likely to detect # potential duplicate validators, which can trigger slashing. diff --git a/beacon_chain/gossip_processing/gossip_validation.nim b/beacon_chain/gossip_processing/gossip_validation.nim index 2b15ef75f0..5e1a2baca5 100644 --- a/beacon_chain/gossip_processing/gossip_validation.nim +++ b/beacon_chain/gossip_processing/gossip_validation.nim @@ -559,8 +559,6 @@ proc validateDataColumnSidecar*( # (block_header.slot, block_header.proposer_index, data_column_sidecar.index) # with valid header signature, sidecar inclusion proof, and kzg proof. let block_root = hash_tree_root(block_header) - if dag.getBlockRef(block_root).isSome(): - return errIgnore("DataColumnSidecar: already have block") if dataColumnQuarantine[].hasDataColumn( block_header.slot, block_header.proposer_index, data_column_sidecar.index): return errIgnore("DataColumnSidecar: already have valid data column from same proposer") diff --git a/beacon_chain/networking/eth2_network.nim b/beacon_chain/networking/eth2_network.nim index f521b5d40e..85c7199ce8 100644 --- a/beacon_chain/networking/eth2_network.nim +++ b/beacon_chain/networking/eth2_network.nim @@ -86,8 +86,8 @@ type validTopics: HashSet[string] peerPingerHeartbeatFut: Future[void].Raising([CancelledError]) peerTrimmerHeartbeatFut: Future[void].Raising([CancelledError]) - cfg: RuntimeConfig - getBeaconTime: GetBeaconTimeFn + cfg*: RuntimeConfig + getBeaconTime*: GetBeaconTimeFn quota: TokenBucket ## Global quota mainly for high-bandwidth stuff @@ -853,7 +853,7 @@ template gossipMaxSize(T: untyped): uint32 = fixedPortionSize(T).uint32 elif T is bellatrix.SignedBeaconBlock or T is capella.SignedBeaconBlock or T is deneb.SignedBeaconBlock or T is electra.SignedBeaconBlock or - T is fulu.SignedBeaconBlock: + T is fulu.SignedBeaconBlock or T is fulu.DataColumnSidecar: GOSSIP_MAX_SIZE # TODO https://github.com/status-im/nim-ssz-serialization/issues/20 for # Attestation, AttesterSlashing, and SignedAggregateAndProof, which all @@ -2774,6 +2774,15 @@ proc broadcastBlobSidecar*( node.forkDigestAtEpoch(contextEpoch), subnet_id) node.broadcast(topic, blob) +proc broadcastDataColumnSidecar*( + node: Eth2Node, subnet_id: uint64, data_column: DataColumnSidecar): + Future[SendResult] {.async: (raises: [CancelledError], raw: true).} = + let + contextEpoch = data_column.signed_block_header.message.slot.epoch + topic = getDataColumnSidecarTopic( + node.forkDigestAtEpoch(contextEpoch), subnet_id) + node.broadcast(topic, data_column) + proc broadcastSyncCommitteeMessage*( node: Eth2Node, msg: SyncCommitteeMessage, subcommitteeIdx: SyncSubcommitteeIndex): diff --git a/beacon_chain/nimbus_beacon_node.nim 
b/beacon_chain/nimbus_beacon_node.nim index c6a97c6d24..37e36c9bc1 100644 --- a/beacon_chain/nimbus_beacon_node.nim +++ b/beacon_chain/nimbus_beacon_node.nim @@ -429,10 +429,18 @@ proc initFullNode( NUMBER_OF_CUSTODY_GROUPS.uint64 else: CUSTODY_REQUIREMENT.uint64 - custody_columns_set = - node.network.nodeId.resolve_column_sets_from_custody_groups( - max(SAMPLES_PER_SLOT.uint64, + dataColumnQuarantine[].supernode = supernode + dataColumnQuarantine[].custody_columns = + node.network.nodeId.resolve_columns_from_custody_groups( + max(SAMPLES_PER_SLOT.uint64, localCustodyGroups)) + + let + custody_columns_set = + dataColumnQuarantine[].custody_columns.toHashSet() + custody_columns_list = + List[ColumnIndex, NUMBER_OF_COLUMNS].init( + dataColumnQuarantine[].custody_columns) consensusManager = ConsensusManager.new( dag, attestationPool, quarantine, node.elManager, ActionTracker.init(node.network.nodeId, config.subscribeAllSubnets), @@ -442,42 +450,70 @@ proc initFullNode( blockProcessor = BlockProcessor.new( config.dumpEnabled, config.dumpDirInvalid, config.dumpDirIncoming, batchVerifier, consensusManager, node.validatorMonitor, - blobQuarantine, getBeaconTime) + blobQuarantine, dataColumnQuarantine, getBeaconTime) blockVerifier = proc(signedBlock: ForkedSignedBeaconBlock, - blobs: Opt[BlobSidecars], maybeFinalized: bool): + blobs: Opt[BlobSidecars], data_columns: Opt[DataColumnSidecars], + maybeFinalized: bool): Future[Result[void, VerifierError]] {.async: (raises: [CancelledError], raw: true).} = # The design with a callback for block verification is unusual compared # to the rest of the application, but fits with the general approach # taken in the sync/request managers - this is an architectural compromise # that should probably be reimagined more holistically in the future. blockProcessor[].addBlock( - MsgSource.gossip, signedBlock, blobs, maybeFinalized = maybeFinalized) + MsgSource.gossip, signedBlock, blobs, data_columns, + maybeFinalized = maybeFinalized) untrustedBlockVerifier = proc(signedBlock: ForkedSignedBeaconBlock, blobs: Opt[BlobSidecars], - maybeFinalized: bool): Future[Result[void, VerifierError]] {. + data_columns: Opt[DataColumnSidecars], maybeFinalized: bool): + Future[Result[void, VerifierError]] {. 
async: (raises: [CancelledError], raw: true).} = - clist.untrustedBackfillVerifier(signedBlock, blobs, maybeFinalized) + clist.untrustedBackfillVerifier(signedBlock, blobs, data_columns, maybeFinalized) rmanBlockVerifier = proc(signedBlock: ForkedSignedBeaconBlock, maybeFinalized: bool): Future[Result[void, VerifierError]] {.async: (raises: [CancelledError]).} = withBlck(signedBlock): - when consensusFork >= ConsensusFork.Deneb: + # Keeping Fulu first else >= Deneb means Fulu case never hits + when consensusFork >= ConsensusFork.Fulu: + let + accumulatedDataColumns = dataColumnQuarantine[].gatherDataColumns(forkyBlck.root) + + if accumulatedDataColumns.len == 0: + # no data columns were sent for this post Fulu block, yet + return await blockProcessor[].addBlock(MsgSource.gossip, signedBlock, + Opt.none(BlobSidecars), Opt.none(DataColumnSidecars), + maybeFinalized = maybeFinalized) + elif dataColumnQuarantine[].supernode and + accumulatedDataColumns.len >= (dataColumnQuarantine[].custody_columns.len div 2): + # We have seen 50%+ data columns, we can attempt to add this block + let dataColumns = dataColumnQuarantine[].popDataColumns(forkyBlck.root, forkyBlck) + return await blockProcessor[].addBlock(MsgSource.gossip, signedBlock, + Opt.none(BlobSidecars), Opt.some(dataColumns), + maybeFinalized = maybeFinalized) + + else: + let dataColumns = dataColumnQuarantine[].popDataColumns(forkyBlck.root, forkyBlck) + return await blockProcessor[].addBlock(MsgSource.gossip, signedBlock, + Opt.none(BlobSidecars), Opt.some(dataColumns), + maybeFinalized = maybeFinalized) + elif consensusFork >= ConsensusFork.Deneb and + consensusFork < ConsensusFork.Fulu: if not blobQuarantine[].hasBlobs(forkyBlck): # We don't have all the blobs for this block, so we have # to put it in blobless quarantine. 
if not quarantine[].addBlobless(dag.finalizedHead.slot, forkyBlck): - err(VerifierError.UnviableFork) + return err(VerifierError.UnviableFork) else: - err(VerifierError.MissingParent) + return err(VerifierError.MissingParent) else: let blobs = blobQuarantine[].popBlobs(forkyBlck.root, forkyBlck) await blockProcessor[].addBlock(MsgSource.gossip, signedBlock, - Opt.some(blobs), + Opt.some(blobs), Opt.none(DataColumnSidecars), maybeFinalized = maybeFinalized) + else: await blockProcessor[].addBlock(MsgSource.gossip, signedBlock, - Opt.none(BlobSidecars), + Opt.none(BlobSidecars), Opt.none(DataColumnSidecars), maybeFinalized = maybeFinalized) rmanBlockLoader = proc( blockRoot: Eth2Digest): Opt[ForkedTrustedSignedBeaconBlock] = @@ -501,15 +537,17 @@ proc initFullNode( config.doppelgangerDetection, blockProcessor, node.validatorMonitor, dag, attestationPool, validatorChangePool, node.attachedValidators, syncCommitteeMsgPool, - lightClientPool, quarantine, blobQuarantine, rng, getBeaconTime, taskpool) + lightClientPool, quarantine, blobQuarantine, dataColumnQuarantine, + rng, getBeaconTime, taskpool) syncManagerFlags = if node.config.longRangeSync != LongRangeSyncMode.Lenient: {SyncManagerFlag.NoGenesisSync} else: {} syncManager = newSyncManager[Peer, PeerId]( - node.network.peerPool, - dag.cfg.DENEB_FORK_EPOCH, dag.cfg.MIN_EPOCHS_FOR_BLOB_SIDECARS_REQUESTS, + node.network.peerPool, supernode, custody_columns_set, + custody_columns_list, dag.cfg.DENEB_FORK_EPOCH, + dag.cfg.FULU_FORK_EPOCH, dag.cfg.MIN_EPOCHS_FOR_BLOB_SIDECARS_REQUESTS, SyncQueueKind.Forward, getLocalHeadSlot, getLocalWallSlot, getFirstSlotAtFinalizedEpoch, getBackfillSlot, getFrontfillSlot, isWithinWeakSubjectivityPeriod, @@ -517,9 +555,10 @@ proc initFullNode( shutdownEvent = node.shutdownEvent, flags = syncManagerFlags) backfiller = newSyncManager[Peer, PeerId]( - node.network.peerPool, - dag.cfg.DENEB_FORK_EPOCH, dag.cfg.MIN_EPOCHS_FOR_BLOB_SIDECARS_REQUESTS, - SyncQueueKind.Backward, getLocalHeadSlot, + node.network.peerPool, supernode, custody_columns_set, + custody_columns_list, dag.cfg.DENEB_FORK_EPOCH, + dag.cfg.FULU_FORK_EPOCH, dag.cfg.MIN_EPOCHS_FOR_BLOB_SIDECARS_REQUESTS, + SyncQueueKind.Forward, getLocalHeadSlot, getLocalWallSlot, getFirstSlotAtFinalizedEpoch, getBackfillSlot, getFrontfillSlot, isWithinWeakSubjectivityPeriod, dag.backfill.slot, blockVerifier, maxHeadAge = 0, @@ -531,9 +570,10 @@ proc initFullNode( else: getLocalWallSlot() untrustedManager = newSyncManager[Peer, PeerId]( - node.network.peerPool, - dag.cfg.DENEB_FORK_EPOCH, dag.cfg.MIN_EPOCHS_FOR_BLOB_SIDECARS_REQUESTS, - SyncQueueKind.Backward, getLocalHeadSlot, + node.network.peerPool, supernode, custody_columns_set, + custody_columns_list, dag.cfg.DENEB_FORK_EPOCH, + dag.cfg.FULU_FORK_EPOCH, dag.cfg.MIN_EPOCHS_FOR_BLOB_SIDECARS_REQUESTS, + SyncQueueKind.Forward, getLocalHeadSlot, getLocalWallSlot, getFirstSlotAtFinalizedEpoch, getUntrustedBackfillSlot, getUntrustedFrontfillSlot, isWithinWeakSubjectivityPeriod, clistPivotSlot, untrustedBlockVerifier, maxHeadAge = 0, @@ -567,12 +607,6 @@ proc initFullNode( # during peer selection, sync with columns, and so on. That is why, # the rationale of populating it at boot and using it gloabally. 
- dataColumnQuarantine[].supernode = supernode - dataColumnQuarantine[].custody_columns = - node.network.nodeId.resolve_columns_from_custody_groups( - max(SAMPLES_PER_SLOT.uint64, - localCustodyGroups)) - if node.config.peerdasSupernode: node.network.loadCgcnetMetadataAndEnr(NUMBER_OF_CUSTODY_GROUPS.uint8) else: @@ -601,6 +635,7 @@ proc initFullNode( node.dag = dag node.list = clist node.blobQuarantine = blobQuarantine + node.dataColumnQuarantine = dataColumnQuarantine node.quarantine = quarantine node.attestationPool = attestationPool node.syncCommitteeMsgPool = syncCommitteeMsgPool @@ -1274,6 +1309,12 @@ func getSyncCommitteeSubnets(node: BeaconNode, epoch: Epoch): SyncnetBits = subnets + node.getNextSyncCommitteeSubnets(epoch) +func readCustodyGroupSubnets(node: BeaconNode): uint64 = + if node.config.peerdasSupernode: + NUMBER_OF_CUSTODY_GROUPS.uint64 + else: + CUSTODY_REQUIREMENT.uint64 + proc addAltairMessageHandlers( node: BeaconNode, forkDigest: ForkDigest, slot: Slot) = node.addPhase0MessageHandlers(forkDigest, slot) @@ -1309,7 +1350,15 @@ proc addElectraMessageHandlers( proc addFuluMessageHandlers( node: BeaconNode, forkDigest: ForkDigest, slot: Slot) = - node.addElectraMessageHandlers(forkDigest, slot) + node.addCapellaMessageHandlers(forkDigest, slot) + let + targetSubnets = node.readCustodyGroupSubnets() + custody = node.network.nodeId.get_custody_groups(max(SAMPLES_PER_SLOT.uint64, + targetSubnets.uint64)) + + for i in custody: + let topic = getDataColumnSidecarTopic(forkDigest, i) + node.network.subscribe(topic, basicParams) proc removeAltairMessageHandlers(node: BeaconNode, forkDigest: ForkDigest) = node.removePhase0MessageHandlers(forkDigest) @@ -1335,7 +1384,15 @@ proc removeElectraMessageHandlers(node: BeaconNode, forkDigest: ForkDigest) = node.removeDenebMessageHandlers(forkDigest) proc removeFuluMessageHandlers(node: BeaconNode, forkDigest: ForkDigest) = - node.removeElectraMessageHandlers(forkDigest) + node.removeCapellaMessageHandlers(forkDigest) + let + targetSubnets = node.readCustodyGroupSubnets() + custody = node.network.nodeId.get_custody_groups(max(SAMPLES_PER_SLOT.uint64, + targetSubnets.uint64)) + + for i in custody: + let topic = getDataColumnSidecarTopic(forkDigest, i) + node.network.unsubscribe(topic) proc updateSyncCommitteeTopics(node: BeaconNode, slot: Slot) = template lastSyncUpdate: untyped = @@ -1611,6 +1668,24 @@ proc pruneBlobs(node: BeaconNode, slot: Slot) = count = count + 1 debug "pruned blobs", count, blobPruneEpoch +proc pruneDataColumns(node: BeaconNode, slot: Slot) = + let dataColumnPruneEpoch = (slot.epoch - + node.dag.cfg.MIN_EPOCHS_FOR_BLOB_SIDECARS_REQUESTS - 1) + if slot.is_epoch() and dataColumnPruneEpoch >= node.dag.cfg.FULU_FORK_EPOCH: + var blocks: array[SLOTS_PER_EPOCH.int, BlockId] + var count = 0 + let startIndex = node.dag.getBlockRange( + dataColumnPruneEpoch.start_slot, blocks.toOpenArray(0, SLOTS_PER_EPOCH - 1)) + for i in startIndex..= ConsensusFork.Fulu: + # data_column_sidecar_{subnet_id} + for it in 0'u64..= ConsensusFork.Deneb: # blob_sidecar_{subnet_id} # https://github.com/ethereum/consensus-specs/blob/v1.4.0-beta.5/specs/deneb/p2p-interface.md#blob_sidecar_subnet_id @@ -2135,6 +2225,9 @@ proc run(node: BeaconNode) {.raises: [CatchableError].} = node.startLightClient() node.requestManager.start() + if node.network.getBeaconTime().slotOrZero.epoch >= + node.network.cfg.FULU_FORK_EPOCH: + node.requestManager.switchToColumnLoop() node.syncOverseer.start() waitFor node.updateGossipStatus(wallSlot) diff --git 
a/beacon_chain/spec/beaconstate.nim b/beacon_chain/spec/beaconstate.nim index 06aa862bc0..f28f9408a7 100644 --- a/beacon_chain/spec/beaconstate.nim +++ b/beacon_chain/spec/beaconstate.nim @@ -2286,18 +2286,6 @@ func upgrade_to_fulu*( blob_gas_used: pre.latest_execution_payload_header.blob_gas_used, excess_blob_gas: pre.latest_execution_payload_header.excess_blob_gas) - var max_exit_epoch = FAR_FUTURE_EPOCH - for v in pre.validators: - if v.exit_epoch != FAR_FUTURE_EPOCH: - max_exit_epoch = - if max_exit_epoch == FAR_FUTURE_EPOCH: - v.exit_epoch - else: - max(max_exit_epoch, v.exit_epoch) - if max_exit_epoch == FAR_FUTURE_EPOCH: - max_exit_epoch = get_current_epoch(pre) - let earliest_exit_epoch = max_exit_epoch + 1 - let post = (ref fulu.BeaconState)( # Versioning genesis_time: pre.genesis_time, @@ -2358,54 +2346,19 @@ func upgrade_to_fulu*( historical_summaries: pre.historical_summaries, # [New in Electra:EIP6110] - deposit_requests_start_index: UNSET_DEPOSIT_REQUESTS_START_INDEX, + deposit_requests_start_index: pre.deposit_requests_start_index, # [New in Electra:EIP7251] - deposit_balance_to_consume: 0.Gwei, - exit_balance_to_consume: 0.Gwei, - earliest_exit_epoch: earliest_exit_epoch, - consolidation_balance_to_consume: 0.Gwei, - earliest_consolidation_epoch: - compute_activation_exit_epoch(get_current_epoch(pre)) - - # pending_balance_deposits, pending_partial_withdrawals, and - # pending_consolidations are default empty lists + deposit_balance_to_consume: pre.deposit_balance_to_consume, + exit_balance_to_consume: pre.exit_balance_to_consume, + earliest_exit_epoch: pre.earliest_exit_epoch, + consolidation_balance_to_consume: pre.consolidation_balance_to_consume, + earliest_consolidation_epoch: pre.earliest_consolidation_epoch, + pending_deposits: pre.pending_deposits, + pending_partial_withdrawals: pre.pending_partial_withdrawals, + pending_consolidations: pre.pending_consolidations ) - post.exit_balance_to_consume = - get_activation_exit_churn_limit(cfg, post[], cache) - post.consolidation_balance_to_consume = - get_consolidation_churn_limit(cfg, post[], cache) - - # [New in Electra:EIP7251] - # add validators that are not yet active to pending balance deposits - var pre_activation: seq[(Epoch, uint64)] - for index, validator in post.validators: - if validator.activation_epoch == FAR_FUTURE_EPOCH: - pre_activation.add((validator.activation_eligibility_epoch, index.uint64)) - sort(pre_activation) - - for (_, index) in pre_activation: - let balance = post.balances.item(index) - post.balances[index] = 0.Gwei - let validator = addr post.validators.mitem(index) - validator[].effective_balance = 0.Gwei - validator[].activation_eligibility_epoch = FAR_FUTURE_EPOCH - # Use bls.G2_POINT_AT_INFINITY as a signature field placeholder and - # GENESIS_SLOT to distinguish from a pending deposit request - discard post.pending_deposits.add PendingDeposit( - pubkey: validator[].pubkey, - withdrawal_credentials: validator[].withdrawal_credentials, - amount: balance, - signature: ValidatorSig.infinity, - slot: GENESIS_SLOT) - - # Ensure early adopters of compounding credentials go through the activation - # churn - for index, validator in post.validators: - if has_compounding_withdrawal_credential(validator): - queue_excess_active_balance(post[], index.uint64) - post func latest_block_root*(state: ForkyBeaconState, state_root: Eth2Digest): diff --git a/beacon_chain/spec/peerdas_helpers.nim b/beacon_chain/spec/peerdas_helpers.nim index e47cf32f64..d338077a1c 100644 --- 
a/beacon_chain/spec/peerdas_helpers.nim +++ b/beacon_chain/spec/peerdas_helpers.nim @@ -10,6 +10,7 @@ # Uncategorized helper functions from the spec import std/[algorithm, sequtils], + chronicles, results, eth/p2p/discoveryv5/[node], kzg4844/[kzg], @@ -85,11 +86,20 @@ func resolve_columns_from_custody_groups*(node_id: NodeId, flattened func resolve_column_sets_from_custody_groups*(node_id: NodeId, - custody_group_count: CustodyIndex): - HashSet[ColumnIndex] = + custody_group_count: CustodyIndex): + HashSet[ColumnIndex] = node_id.resolve_columns_from_custody_groups(custody_group_count).toHashSet() +func resolve_column_list_from_custody_groups*(node_id: NodeId, + custody_group_count: CustodyIndex): + List[ColumnIndex, NUMBER_OF_COLUMNS] = + + let list = + List[ColumnIndex, NUMBER_OF_COLUMNS].init( + node_id.resolve_columns_from_custody_groups(custody_group_count)) + list + # https://github.com/ethereum/consensus-specs/blob/v1.5.0-alpha.10/specs/fulu/das-core.md#compute_matrix proc compute_matrix*(blobs: seq[KzgBlob]): Result[seq[MatrixEntry], cstring] = ## `compute_matrix` helper demonstrates the relationship @@ -146,8 +156,57 @@ proc recover_matrix*(partial_matrix: seq[MatrixEntry], ok(extended_matrix) +proc recover_cells_and_proofs*( + data_columns: seq[DataColumnSidecar]): + Result[seq[CellsAndProofs], cstring] = + + # This helper recovers blobs from the data column sidecars + if not (data_columns.len != 0): + return err("DataColumnSidecar: Length should not be 0") + + let start = Moment.now() + + let + columnCount = data_columns.len + blobCount = data_columns[0].column.len + + for data_column in data_columns: + if not (blobCount == data_column.column.len): + return err ("DataColumns do not have the same length") + + var + recovered_cps = + newSeq[CellsAndProofs](blobCount) + + for blobIdx in 0..= wallEpoch - man.MIN_EPOCHS_FOR_BLOB_SIDECARS_REQUESTS) +proc checkPeerCustody(man: SyncManager, + peer: Peer): + bool = + # Returns TRUE if the peer custodies atleast, + # ONE of the common custody columns, straight + # away return TRUE if the peer is a supernode. 
+ if man.supernode: + # For a supernode, it is always best/optimistic + # to filter other supernodes, rather than filter + # too many full nodes that have a subset of the + # custody columns + if peer.lookupCgcFromPeer() == + NUMBER_OF_CUSTODY_GROUPS.uint64: + return true + + else: + if peer.lookupCgcFromPeer() == + NUMBER_OF_CUSTODY_GROUPS.uint64: + return true + + elif peer.lookupCgcFromPeer() == + CUSTODY_REQUIREMENT.uint64: + + # Fetch the remote custody count + let remoteCustodyGroupCount = + peer.lookupCgcFromPeer() + + # Extract remote peer's nodeID from peerID + # Fetch custody groups from remote peer + let + remoteNodeId = fetchNodeIdFromPeerId(peer) + remoteCustodyColumns = + remoteNodeId.resolve_column_sets_from_custody_groups( + max(SAMPLES_PER_SLOT.uint64, + remoteCustodyGroupCount)) + + return disjoint(man.custody_columns_set, + remoteCustodyColumns) + + else: + return false + proc shouldGetBlobs[A, B](man: SyncManager[A, B], r: SyncRequest[A]): bool = man.shouldGetBlobs(r.slot) or man.shouldGetBlobs(r.slot + (r.count - 1)) +proc shouldGetDataColumns[A, B](man: SyncManager[A,B], s: Slot): bool = + let + wallEpoch = man.getLocalWallSlot().epoch + epoch = s.epoch() + (epoch >= man.FULU_FORK_EPOCH) and + (wallEpoch < man.MIN_EPOCHS_FOR_BLOB_SIDECARS_REQUESTS or + epoch >= wallEpoch - man.MIN_EPOCHS_FOR_BLOB_SIDECARS_REQUESTS) + +proc shouldGetDataColumns[A, B](man: SyncManager[A, B], r: SyncRequest[A]): bool = + man.shouldGetDataColumns(r.slot) or man.shouldGetDataColumns(r.slot + (r.count - 1)) + proc getBlobSidecars[A, B](man: SyncManager[A, B], peer: A, req: SyncRequest[A]): Future[BlobSidecarsRes] {.async: (raises: [CancelledError], raw: true).} = @@ -222,6 +292,24 @@ proc getBlobSidecars[A, B](man: SyncManager[A, B], peer: A, debug "Requesting blobs sidecars from peer", request = req blobSidecarsByRange(peer, req.slot, req.count) +proc getDataColumnSidecars[A, B](man: SyncManager[A, B], + peer: A, + req: SyncRequest): + Future[DataColumnSidecarsRes] + {.async: (raises: [CancelledError], raw: true).} = + mixin getScore, `==` + + logScope: + peer_score = peer.getScore() + peer_speed = peer.netKbps() + sync_ident = man.direction + topics = "syncman" + + doAssert(not(req.isEmpty()), "Request must not be empty!") + debug "Requesting data column sidecars from peer", request = req + dataColumnSidecarsByRange(peer, req.slot, req.count, man.custody_columns_list) + + proc remainingSlots(man: SyncManager): uint64 = let first = man.getFirstSlot() @@ -281,6 +369,43 @@ func checkBlobs(blobs: seq[BlobSidecars]): Result[void, string] = ? blob_sidecar[].verify_blob_sidecar_inclusion_proof() ok() +func groupDataColumns*( + blocks: seq[ref ForkedSignedBeaconBlock], + data_columns: seq[ref DataColumnSidecar] +): Result[seq[DataColumnSidecars], string] = + var + grouped = newSeq[DataColumnSidecars](len(blocks)) + column_cursor = 0 + for block_idx, blck in blocks: + withBlck(blck[]): + when consensusFork >= ConsensusFork.Fulu: + template kzgs: untyped = forkyBlck.message.body.blob_kzg_commitments + if kzgs.len == 0: + continue + # Clients MUST include all data column sidecars of each block from which they include data column sidecars. + # The following data column sidecars, where they exist, MUST be sent in consecutive (slot, index) order. 
+ # https://github.com/ethereum/consensus-specs/blob/v1.5.0-beta.1/specs/fulu/p2p-interface.md#datacolumnsidecarsbyrange-v1 + let header = forkyBlck.toSignedBeaconBlockHeader() + for column_idx in 0..= ConsensusFork.Fulu: + if forkyBlck.message.body.blob_kzg_commitments.len > 0: + hasDataColumns = true + break + hasDataColumns + let blobData = if shouldGetBlobs: let blobs = await man.getBlobSidecars(peer, req) @@ -578,6 +716,54 @@ proc syncStep[A, B]( else: Opt.none(seq[BlobSidecars]) + let dataColumnData = + if shouldGetDataColumns and man.checkPeerCustody(peer): + let data_columns = await man.getDataColumnSidecars(peer, req) + if data_columns.isErr(): + peer.updateScore(PeerScoreNoValues) + man.queue.push(req) + debug "Failed to receive data_columns on request", + request = req, err = data_columns.error + return + let dataColumnData = data_columns.get().asSeq() + debug "Received data columns on request", + data_columns_count = len(dataColumnData), + data_columns_map = getShortMap(req, dataColumnData), + request = req + + if len(dataColumnData) > 0: + let slots = + mapIt(dataColumnData, it[].signed_block_header.message.slot) + checkDataColumnsResponse(req, slots).isOkOr: + peer.updateScore(PeerScoreBadResponse) + man.queue.push(req) + warn "Incorrect data column sequence received", + data_columns_count = len(dataColumnData), + data_columns_map = getShortMap(req, dataColumnData), + request = req, + reason = error + return + let groupedDataColumns = groupDataColumns(blockData, dataColumnData).valueOr: + peer.updateScore(PeerScoreNoValues) + man.queue.push(req) + info "Received data columns sequence is inconsistent", + data_columns_map = getShortMap(req, dataColumnData), + request = req, msg = error + return + + groupedDataColumns.checkDataColumns().isOkOr: + peer.updateScore(PeerScoreBadResponse) + man.queue.push(req) + warn "Received data columns verification failed", + data_columns_count = len(dataColumnData), + data_columns_map = getShortMap(req, dataColumnData), + request = req, + reason = error + return + Opt.some(groupedDataColumns) + else: + Opt.none(seq[DataColumnSidecars]) + if len(blockData) == 0 and man.direction == SyncQueueKind.Backward and req.contains(man.getSafeSlot()): # The sync protocol does not distinguish between: @@ -602,7 +788,9 @@ proc syncStep[A, B]( # TODO descore peers that lie maybeFinalized = lastSlot < peerFinalized - await man.queue.push(req, blockData, blobData, maybeFinalized, proc() = + await man.queue.push( + req, blockData, blobData, + dataColumnData, maybeFinalized, proc() = man.workers[index].status = SyncWorkerStatus.Processing) proc syncWorker[A, B]( diff --git a/beacon_chain/sync/sync_overseer.nim b/beacon_chain/sync/sync_overseer.nim index cbbab92849..23a7f07299 100644 --- a/beacon_chain/sync/sync_overseer.nim +++ b/beacon_chain/sync/sync_overseer.nim @@ -394,13 +394,16 @@ proc initUntrustedSync(overseer: SyncOverseerRef): Future[void] {. 
let blck = await overseer.getBlock(blockHeader.slot, blockHeader) blobsCount = if blck.blob.isNone(): 0 else: len(blck.blob.get()) + dataColumnsCount = + if blck.dataColumn.isNone(): 0 else: len(blck.dataColumn.get()) notice "Received beacon block", blck = shortLog(blck.blck), - blobs_count = blobsCount + blobs_count = blobsCount, + data_columns_count = dataColumnsCount overseer.statusMsg = Opt.some("storing block") - let res = overseer.clist.addBackfillBlockData(blck.blck, blck.blob) + let res = overseer.clist.addBackfillBlockData(blck.blck, blck.blob, blck.dataColumn) if res.isErr(): warn "Unable to store initial block", reason = res.error return @@ -408,7 +411,8 @@ proc initUntrustedSync(overseer: SyncOverseerRef): Future[void] {. overseer.statusMsg = Opt.none(string) notice "Initial block being stored", - blck = shortLog(blck.blck), blobs_count = blobsCount + blck = shortLog(blck.blck), blobs_count = blobsCount, + data_columns_count = dataColumnsCount proc startBackfillTask(overseer: SyncOverseerRef): Future[void] {. async: (raises: []).} = diff --git a/beacon_chain/sync/sync_queue.nim b/beacon_chain/sync/sync_queue.nim index 75840c4bfe..36be5d40a3 100644 --- a/beacon_chain/sync/sync_queue.nim +++ b/beacon_chain/sync/sync_queue.nim @@ -27,7 +27,9 @@ type GetBoolCallback* = proc(): bool {.gcsafe, raises: [].} ProcessingCallback* = proc() {.gcsafe, raises: [].} BlockVerifier* = proc(signedBlock: ForkedSignedBeaconBlock, - blobs: Opt[BlobSidecars], maybeFinalized: bool): + blobs: Opt[BlobSidecars], + data_columns: Opt[DataColumnSidecars], + maybeFinalized: bool): Future[Result[void, VerifierError]] {.async: (raises: [CancelledError]).} SyncQueueKind* {.pure.} = enum @@ -44,6 +46,7 @@ type request*: SyncRequest[T] data*: seq[ref ForkedSignedBeaconBlock] blobs*: Opt[seq[BlobSidecars]] + dataColumns*: Opt[seq[DataColumnSidecars]] GapItem*[T] = object start*: Slot @@ -132,6 +135,27 @@ proc getShortMap*[T](req: SyncRequest[T], res.add('|') res +proc getShortMap*[T](req: SyncRequest[T], + data: openArray[ref DataColumnSidecar]): string = + ## Returns all slot numbers in ``data`` as placement map. + var res = newStringOfCap(req.count * MAX_BLOBS_PER_BLOCK) + var cur : uint64 = 0 + for slot in req.slot..= lenu64(data): + res.add('|') + continue + if slot == data[cur].signed_block_header.message.slot: + for k in cur..= lenu64(data) or slot != data[k].signed_block_header.message.slot: + res.add('|') + break + else: + inc(cur) + res.add('x') + else: + res.add('|') + res + proc contains*[T](req: SyncRequest[T], slot: Slot): bool {.inline.} = slot >= req.slot and slot < req.slot + req.count @@ -200,6 +224,36 @@ proc checkBlobsResponse*[T](req: SyncRequest[T], ok() +proc checkDataColumnsResponse*[T](req: SyncRequest[T], + data: openArray[Slot]): + Result[void, cstring] = + if data.len == 0: + # Impossible to verify empty response + return ok() + + if lenu64(data) > (req.count * NUMBER_OF_COLUMNS): + # Number of data columns in response should be less or equal to + # number of requested (blocks * MAX_BLOCKS_PER_BLOCK_ELECTRA). 
+ return err("Too many data columns received") + + var + pSlot = data[0] + counter = 0'u64 + for slot in data: + if (slot < req.slot) or (slot >= req.slot + req.count): + return err("Some of the data columns are not in requested range") + if slot < pSlot: + return err("incorrect order") + if slot == pSlot: + inc counter + if counter > MAX_BLOBS_PER_BLOCK_ELECTRA: + return err("Number of data columns in the block exceeds the limit") + else: + counter = 1'u64 + pSlot = slot + + ok() + proc init[T](t1: typedesc[SyncRequest], kind: SyncQueueKind, start: Slot, finish: Slot, t2: typedesc[T]): SyncRequest[T] = let count = finish - start + 1'u64 @@ -579,15 +633,24 @@ func getOpt(blobs: Opt[seq[BlobSidecars]], i: int): Opt[BlobSidecars] = else: Opt.none(BlobSidecars) +# This belongs inside the blocks iterator below, but can't be there due to +# https://github.com/nim-lang/Nim/issues/21242 +func getOpt(data_columns: Opt[seq[DataColumnSidecars]], i: int): Opt[DataColumnSidecars] = + if data_columns.isSome: + Opt.some(data_columns.get()[i]) + else: + Opt.none DataColumnSidecars + iterator blocks[T](sq: SyncQueue[T], - sr: SyncResult[T]): (ref ForkedSignedBeaconBlock, Opt[BlobSidecars]) = + sr: SyncResult[T]): (ref ForkedSignedBeaconBlock, Opt[BlobSidecars], + Opt[DataColumnSidecars]) = case sq.kind of SyncQueueKind.Forward: for i in countup(0, len(sr.data) - 1): - yield (sr.data[i], sr.blobs.getOpt(i)) + yield (sr.data[i], sr.blobs.getOpt(i), sr.dataColumns.getOpt(i)) of SyncQueueKind.Backward: for i in countdown(len(sr.data) - 1, 0): - yield (sr.data[i], sr.blobs.getOpt(i)) + yield (sr.data[i], sr.blobs.getOpt(i), sr.dataColumns.getOpt(i)) proc advanceOutput*[T](sq: SyncQueue[T], number: uint64) = case sq.kind @@ -642,6 +705,7 @@ func numAlreadyKnownSlots[T](sq: SyncQueue[T], sr: SyncRequest[T]): uint64 = proc push*[T](sq: SyncQueue[T], sr: SyncRequest[T], data: seq[ref ForkedSignedBeaconBlock], blobs: Opt[seq[BlobSidecars]], + dataColumns: Opt[seq[DataColumnSidecars]], maybeFinalized: bool = false, processingCb: ProcessingCallback = nil) {.async: (raises: [CancelledError]).} = logScope: @@ -669,7 +733,8 @@ proc push*[T](sq: SyncQueue[T], sr: SyncRequest[T], # SyncQueue reset happens. We are exiting to wake up sync-worker. return else: - let syncres = SyncResult[T](request: sr, data: data, blobs: blobs) + let syncres = SyncResult[T](request: sr, data: data, blobs: blobs, + dataColumns: dataColumns) sq.readyQueue.push(syncres) break @@ -719,8 +784,8 @@ proc push*[T](sq: SyncQueue[T], sr: SyncRequest[T], res: Result[void, VerifierError] var i=0 - for blk, blb in sq.blocks(item): - res = await sq.blockVerifier(blk[], blb, maybeFinalized) + for blk, blb, cols in sq.blocks(item): + res = await sq.blockVerifier(blk[], blb, cols, maybeFinalized) inc(i) if res.isOk(): diff --git a/beacon_chain/validators/message_router.nim b/beacon_chain/validators/message_router.nim index 0965453b9f..c1d1ddcfd1 100644 --- a/beacon_chain/validators/message_router.nim +++ b/beacon_chain/validators/message_router.nim @@ -1,5 +1,5 @@ # beacon_chain -# Copyright (c) 2018-2024 Status Research & Development GmbH +# Copyright (c) 2018-2025 Status Research & Development GmbH # Licensed and distributed under either of # * MIT license (license terms in the root directory or at https://opensource.org/licenses/MIT). # * Apache v2 license (license terms in the root directory or at https://www.apache.org/licenses/LICENSE-2.0). 
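# ---------------------------------------------------------------------------
# Illustrative sketch (not part of the patch): checkDataColumnsResponse above
# enforces three properties on the slots carried by a DataColumnSidecarsByRange
# response: every slot lies inside the requested window, slots never decrease,
# and no slot repeats more often than the per-block column limit. Stand-alone
# version of that check; SketchColumnsPerBlock is a placeholder, not the
# spec's NUMBER_OF_COLUMNS.
const SketchColumnsPerBlock = 128'u64

func sketchCheckColumnSlots(reqSlot, reqCount: uint64,
                            slots: openArray[uint64]): (bool, string) =
  if slots.len == 0:
    return (true, "")                # an empty response cannot be verified
  if uint64(slots.len) > reqCount * SketchColumnsPerBlock:
    return (false, "too many data columns received")
  var
    prev = slots[0]
    perSlot = 0'u64
  for slot in slots:
    if slot < reqSlot or slot >= reqSlot + reqCount:
      return (false, "slot outside the requested range")
    if slot < prev:
      return (false, "slots are not in ascending order")
    if slot == prev:
      inc perSlot
      if perSlot > SketchColumnsPerBlock:
        return (false, "per-block column limit exceeded")
    else:
      perSlot = 1'u64
      prev = slot
  (true, "")

when isMainModule:
  doAssert sketchCheckColumnSlots(100, 4, [100'u64, 100, 101, 103])[0]
  doAssert not sketchCheckColumnSlots(100, 4, [99'u64])[0]          # below range
  doAssert not sketchCheckColumnSlots(100, 4, [101'u64, 100])[0]    # out of order
# ---------------------------------------------------------------------------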
@@ -12,11 +12,13 @@ import chronicles, metrics, ../spec/network, + ../spec/peerdas_helpers, ../consensus_object_pools/spec_cache, ../gossip_processing/eth2_processor, ../networking/eth2_network, ./activity_metrics, - ../spec/datatypes/deneb + ../spec/datatypes/[deneb, fulu] + from ../spec/state_transition_block import validate_blobs export eth2_processor, eth2_network @@ -89,7 +91,8 @@ proc routeSignedBeaconBlock*( ## Validate and broadcast beacon block, then add it to the block database ## Returns the new Head when block is added successfully to dag, none when ## block passes validation but is not added, and error otherwise - let wallTime = router[].getCurrentBeaconTime() + let + wallTime = router[].getCurrentBeaconTime() block: let vindex = ValidatorIndex(blck.message.proposer_index) @@ -112,6 +115,30 @@ proc routeSignedBeaconBlock*( signature = shortLog(blck.signature), error = res.error() return err($(res.error()[1])) + # # May not be required as we are already + # # KZG verifying the blobs once + # when typeof(blck).kind >= ConsensusFork.Fulu: + # if blobsOpt.isSome: + # let + # dataColumns = + # newClone get_data_column_sidecars(blck, + # blobsOpt.get.mapIt( + # KzgBlob(bytes: it.blob))) + # let kzgCommits = + # blck.message.body.blob_kzg_commitments.asSeq + # if dataColumns[].get().len > 0 and kzgCommits.len > 0: + # for i in 0..<dataColumns[].get().len: when typeof(blck).kind >= ConsensusFork.Deneb: if blobsOpt.isSome: let blobs = blobsOpt.get() @@ -153,25 +180,71 @@ proc routeSignedBeaconBlock*( signature = shortLog(blck.signature), error = res.error() var blobRefs = Opt.none(BlobSidecars) - if blobsOpt.isSome(): - let blobs = blobsOpt.get() - var workers = newSeq[Future[SendResult]](blobs.len) - for i in 0..<blobs.len: + when typeof(blck).kind >= ConsensusFork.Fulu: + let blobs = blobsOpt.get + if blobsOpt.isSome() and blobs.len != 0: + let dataColumnsRes = + newClone get_data_column_sidecars(blck, blobs.mapIt(KzgBlob(bytes: it.blob))) + if not dataColumnsRes[].isOk: + debug "Issue with extracting data columns from blob bundle" + let dataColumns = dataColumnsRes[].get() + var das_workers = + newSeq[Future[SendResult]](len(dataColumnsRes[].get())) + for i in 0..<dataColumns.len: + elif typeof(blck).kind >= ConsensusFork.Deneb and + typeof(blck).kind < ConsensusFork.Fulu: + if blobsOpt.isSome(): + let blobs = blobsOpt.get() + var workers = newSeq[Future[SendResult]](blobs.len) + for i in 0..<blobs.len: + elif consensusFork >= ConsensusFork.Fulu: + template kzgs: untyped = forkyBlck.message.body.blob_kzg_commitments + for i, slot in slots: + if slot == forkyBlck.message.slot: + doAssert kzgs.add default(KzgCommitment) + if kzgs.len > 0: + forkyBlck.root = hash_tree_root(forkyBlck.message) + var + kzg_proofs: KzgProofs + blobs: Blobs + for _ in kzgs: + doAssert kzg_proofs.add default(KzgProof) + doAssert blobs.add default(Blob) + let bsidecars = forkyBlck.create_blob_sidecars(kzg_proofs, blobs) + let dcsidecars = + forkyBlck.get_data_column_sidecars(bsidecars.mapIt(KzgBlob(bytes: it.blob))) + var sidecarIdx = 0 + for i, slot in slots: + if slot == forkyBlck.message.slot: + res[i] = newClone dcsidecars.get[sidecarIdx] + inc sidecarIdx + res + func getSlice(chain: openArray[ref ForkedSignedBeaconBlock], startSlot: Slot, request: SyncRequest[SomeTPeer]): seq[ref ForkedSignedBeaconBlock] = let @@ -354,9 +396,9 @@ suite "SyncManager test suite": if request.isEmpty(): break await queue.push(request, getSlice(chain, start, request), - Opt.none(seq[BlobSidecars])) + Opt.none(seq[BlobSidecars]), + Opt.none(seq[DataColumnSidecars])) await validatorFut.cancelAndWait() - waitFor runSmokeTest() case kkind of SyncQueueKind.Forward: @@ -429,7 +471,8 @@ suite
"SyncManager test suite": var r13 = queue.pop(finishSlot, p3) var f13 = queue.push(r13, chain.getSlice(startSlot, r13), - Opt.none(seq[BlobSidecars])) + Opt.none(seq[BlobSidecars]), + Opt.none(seq[DataColumnSidecars])) await sleepAsync(100.milliseconds) check: f13.finished == false @@ -438,7 +481,8 @@ suite "SyncManager test suite": of SyncQueueKind.Backward: counter == int(finishSlot) var f11 = queue.push(r11, chain.getSlice(startSlot, r11), - Opt.none(seq[BlobSidecars])) + Opt.none(seq[BlobSidecars]), + Opt.none(seq[DataColumnSidecars])) await sleepAsync(100.milliseconds) check: case kkind @@ -448,7 +492,8 @@ suite "SyncManager test suite": f13.finished == false var f12 = queue.push(r12, chain.getSlice(startSlot, r12), - Opt.none(seq[BlobSidecars])) + Opt.none(seq[BlobSidecars]), + Opt.none(seq[DataColumnSidecars])) await allFutures(f11, f12, f13) check: f12.finished == true and f12.failed == false @@ -551,7 +596,8 @@ suite "SyncManager test suite": check response[0][].slot >= getFowardSafeSlotCb() else: check response[^1][].slot <= getBackwardSafeSlotCb() - await queue.push(request, response, Opt.none(seq[BlobSidecars])) + await queue.push(request, response, Opt.none(seq[BlobSidecars]), + Opt.none(seq[DataColumnSidecars])) await validatorFut.cancelAndWait() waitFor runTest() @@ -634,7 +680,8 @@ suite "SyncManager test suite": # Handle request 1. Should be re-enqueued as it simulates `Invalid`. let response1 = getSlice(chain, start, request1) - await queue.push(request1, response1, Opt.none(seq[BlobSidecars])) + await queue.push(request1, response1, Opt.none(seq[BlobSidecars]), + Opt.none(seq[DataColumnSidecars])) check debtLen(queue) == request2.count + request1.count # Request 1 should be discarded as it is no longer relevant. @@ -646,7 +693,8 @@ suite "SyncManager test suite": # Handle request 3. Should be re-enqueued as it simulates `Invalid`. let response3 = getSlice(chain, start, request3) - await queue.push(request3, response3, Opt.none(seq[BlobSidecars])) + await queue.push(request3, response3, Opt.none(seq[BlobSidecars]), + Opt.none(seq[DataColumnSidecars])) check debtLen(queue) == request3.count # Request 2 should be re-issued. @@ -660,7 +708,8 @@ suite "SyncManager test suite": # Handle request 4. Should be re-enqueued as it simulates `Invalid`. let response4 = getSlice(chain, start, request4) - await queue.push(request4, response4, Opt.none(seq[BlobSidecars])) + await queue.push(request4, response4, Opt.none(seq[BlobSidecars]), + Opt.none(seq[DataColumnSidecars])) check debtLen(queue) == request4.count # Advance `safeSlot` out of band. 
@@ -777,14 +826,16 @@ suite "SyncManager test suite": var r14 = queue.pop(finishSlot, p4) var f14 = queue.push(r14, chain.getSlice(startSlot, r14), - Opt.none(seq[BlobSidecars])) + Opt.none(seq[BlobSidecars]), + Opt.none(seq[DataColumnSidecars])) await sleepAsync(100.milliseconds) check: f14.finished == false counter == int(startSlot) var f12 = queue.push(r12, chain.getSlice(startSlot, r12), - Opt.none(seq[BlobSidecars])) + Opt.none(seq[BlobSidecars]), + Opt.none(seq[DataColumnSidecars])) await sleepAsync(100.milliseconds) check: counter == int(startSlot) @@ -792,7 +843,8 @@ suite "SyncManager test suite": f14.finished == false var f11 = queue.push(r11, chain.getSlice(startSlot, r11), - Opt.none(seq[BlobSidecars])) + Opt.none(seq[BlobSidecars]), + Opt.none(seq[DataColumnSidecars])) await allFutures(f11, f12) check: counter == int(startSlot + chunkSize + chunkSize) @@ -804,7 +856,8 @@ suite "SyncManager test suite": withBlck(missingSlice[0][]): forkyBlck.message.proposer_index = 0xDEADBEAF'u64 var f13 = queue.push(r13, missingSlice, - Opt.none(seq[BlobSidecars])) + Opt.none(seq[BlobSidecars]), + Opt.none(seq[DataColumnSidecars])) await allFutures(f13, f14) check: f11.finished == true and f11.failed == false @@ -826,17 +879,20 @@ suite "SyncManager test suite": check r18.isEmpty() == true var f17 = queue.push(r17, chain.getSlice(startSlot, r17), - Opt.none(seq[BlobSidecars])) + Opt.none(seq[BlobSidecars]), + Opt.none(seq[DataColumnSidecars])) await sleepAsync(100.milliseconds) check f17.finished == false var f16 = queue.push(r16, chain.getSlice(startSlot, r16), - Opt.none(seq[BlobSidecars])) + Opt.none(seq[BlobSidecars]), + Opt.none(seq[DataColumnSidecars])) await sleepAsync(100.milliseconds) check f16.finished == false var f15 = queue.push(r15, chain.getSlice(startSlot, r15), - Opt.none(seq[BlobSidecars])) + Opt.none(seq[BlobSidecars]), + Opt.none(seq[DataColumnSidecars])) await allFutures(f15, f16, f17) check: f15.finished == true and f15.failed == false @@ -883,7 +939,8 @@ suite "SyncManager test suite": # Push a single request that will fail with all blocks being unviable var f11 = queue.push(r11, chain.getSlice(startSlot, r11), - Opt.none(seq[BlobSidecars])) + Opt.none(seq[BlobSidecars]), + Opt.none(seq[DataColumnSidecars])) discard await f11.withTimeout(1.seconds) check: @@ -949,14 +1006,16 @@ suite "SyncManager test suite": var r14 = queue.pop(finishSlot, p4) var f14 = queue.push(r14, chain.getSlice(startSlot, r14), - Opt.none(seq[BlobSidecars])) + Opt.none(seq[BlobSidecars]), + Opt.none(seq[DataColumnSidecars])) await sleepAsync(100.milliseconds) check: f14.finished == false counter == int(finishSlot) var f12 = queue.push(r12, chain.getSlice(startSlot, r12), - Opt.none(seq[BlobSidecars])) + Opt.none(seq[BlobSidecars]), + Opt.none(seq[DataColumnSidecars])) await sleepAsync(100.milliseconds) check: counter == int(finishSlot) @@ -964,7 +1023,8 @@ suite "SyncManager test suite": f14.finished == false var f11 = queue.push(r11, chain.getSlice(startSlot, r11), - Opt.none(seq[BlobSidecars])) + Opt.none(seq[BlobSidecars]), + Opt.none(seq[DataColumnSidecars])) await allFutures(f11, f12) check: counter == int(finishSlot - chunkSize - chunkSize) @@ -975,7 +1035,8 @@ suite "SyncManager test suite": var missingSlice = chain.getSlice(startSlot, r13) withBlck(missingSlice[0][]): forkyBlck.message.proposer_index = 0xDEADBEAF'u64 - var f13 = queue.push(r13, missingSlice, Opt.none(seq[BlobSidecars])) + var f13 = queue.push(r13, missingSlice, Opt.none(seq[BlobSidecars]), + 
Opt.none(seq[DataColumnSidecars])) await allFutures(f13, f14) check: f11.finished == true and f11.failed == false @@ -993,12 +1054,14 @@ suite "SyncManager test suite": check r17.isEmpty() == true var f16 = queue.push(r16, chain.getSlice(startSlot, r16), - Opt.none(seq[BlobSidecars])) + Opt.none(seq[BlobSidecars]), + Opt.none(seq[DataColumnSidecars])) await sleepAsync(100.milliseconds) check f16.finished == false var f15 = queue.push(r15, chain.getSlice(startSlot, r15), - Opt.none(seq[BlobSidecars])) + Opt.none(seq[BlobSidecars]), + Opt.none(seq[DataColumnSidecars])) await allFutures(f15, f16) check: f15.finished == true and f15.failed == false
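The test-suite churn above is mechanical: `push` and the `BlockVerifier` callback now thread one extra `Opt[DataColumnSidecars]` argument, with `Opt.none(seq[DataColumnSidecars])` at call sites that have no columns. Below is a reduced standalone sketch of that threading; the stub types and `pushStub` are illustrative stand-ins, not the helpers used by test_sync_manager.nim.

import std/options

type
  StubBlock = object
    slot: uint64
  StubBlobs = seq[int]
  StubColumns = seq[int]
  # The verifier now receives blobs *and* data columns; either may be absent.
  StubVerifier = proc(blck: StubBlock,
                      blobs: Option[StubBlobs],
                      columns: Option[StubColumns]): bool {.gcsafe.}

proc pushStub(verifier: StubVerifier, blocks: openArray[StubBlock],
              blobs: Option[seq[StubBlobs]],
              columns: Option[seq[StubColumns]]): int =
  ## Feeds each block to the verifier together with its (optional) sidecars,
  ## mirroring how the widened push/blockVerifier signatures are used.
  for i, blck in blocks:
    let
      b = if blobs.isSome: some(blobs.get[i]) else: none(StubBlobs)
      c = if columns.isSome: some(columns.get[i]) else: none(StubColumns)
    if verifier(blck, b, c):
      inc result

when isMainModule:
  let verifier: StubVerifier =
    proc(blck: StubBlock, blobs: Option[StubBlobs],
         columns: Option[StubColumns]): bool = true
  doAssert pushStub(verifier, [StubBlock(slot: 1), StubBlock(slot: 2)],
                    none(seq[StubBlobs]), none(seq[StubColumns])) == 2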