Unified writer interface for Data Availability providers
ganeshvanahalli committed Feb 23, 2024
1 parent 58e4b50 commit 3d2d427
Showing 46 changed files with 512 additions and 451 deletions.
30 changes: 11 additions & 19 deletions arbnode/batch_poster.go
@@ -37,10 +37,10 @@ import (
"github.com/offchainlabs/nitro/arbnode/redislock"
"github.com/offchainlabs/nitro/arbos/arbostypes"
"github.com/offchainlabs/nitro/arbstate"
"github.com/offchainlabs/nitro/arbstate/daprovider"
"github.com/offchainlabs/nitro/arbutil"
"github.com/offchainlabs/nitro/cmd/chaininfo"
"github.com/offchainlabs/nitro/cmd/genericconf"
"github.com/offchainlabs/nitro/das"
"github.com/offchainlabs/nitro/execution"
"github.com/offchainlabs/nitro/solgen/go/bridgegen"
"github.com/offchainlabs/nitro/util"
@@ -87,7 +87,7 @@ type BatchPoster struct {
bridgeAddr common.Address
gasRefunderAddr common.Address
building *buildingBatch
- daWriter das.DataAvailabilityServiceWriter
+ dapWriter daprovider.Writer
dataPoster *dataposter.DataPoster
redisLock *redislock.Simple
messagesPerBatch *arbmath.MovingAverage[uint64]
@@ -117,7 +117,7 @@ const (

type BatchPosterConfig struct {
Enable bool `koanf:"enable"`
- DisableDasFallbackStoreDataOnChain bool `koanf:"disable-das-fallback-store-data-on-chain" reload:"hot"`
+ DisableDapFallbackStoreDataOnChain bool `koanf:"disable-dap-fallback-store-data-on-chain" reload:"hot"`
// Max batch size.
MaxSize int `koanf:"max-size" reload:"hot"`
// Maximum 4844 blob enabled batch size.
@@ -176,7 +176,7 @@ type BatchPosterConfigFetcher func() *BatchPosterConfig

func BatchPosterConfigAddOptions(prefix string, f *pflag.FlagSet) {
f.Bool(prefix+".enable", DefaultBatchPosterConfig.Enable, "enable posting batches to l1")
f.Bool(prefix+".disable-das-fallback-store-data-on-chain", DefaultBatchPosterConfig.DisableDasFallbackStoreDataOnChain, "If unable to batch to DAS, disable fallback storing data on chain")
f.Bool(prefix+".disable-dap-fallback-store-data-on-chain", DefaultBatchPosterConfig.DisableDapFallbackStoreDataOnChain, "If unable to batch to DA provider, disable fallback storing data on chain")
f.Int(prefix+".max-size", DefaultBatchPosterConfig.MaxSize, "maximum batch size")
f.Int(prefix+".max-4844-batch-size", DefaultBatchPosterConfig.Max4844BatchSize, "maximum 4844 blob enabled batch size")
f.Duration(prefix+".max-delay", DefaultBatchPosterConfig.MaxDelay, "maximum batch posting delay")
@@ -200,7 +200,7 @@ func BatchPosterConfigAddOptions(prefix string, f *pflag.FlagSet) {

var DefaultBatchPosterConfig = BatchPosterConfig{
Enable: false,
- DisableDasFallbackStoreDataOnChain: false,
+ DisableDapFallbackStoreDataOnChain: false,
// This default is overridden for L3 chains in applyChainParameters in cmd/nitro/nitro.go
MaxSize: 100000,
// TODO: is 1000 bytes an appropriate margin for error vs blob space efficiency?
@@ -262,7 +262,7 @@ type BatchPosterOpts struct {
Config BatchPosterConfigFetcher
DeployInfo *chaininfo.RollupAddresses
TransactOpts *bind.TransactOpts
- DAWriter das.DataAvailabilityServiceWriter
+ DAPWriter daprovider.Writer
ParentChainID *big.Int
}

@@ -308,7 +308,7 @@ func NewBatchPoster(ctx context.Context, opts *BatchPosterOpts) (*BatchPoster, e
seqInboxAddr: opts.DeployInfo.SequencerInbox,
gasRefunderAddr: opts.Config().gasRefunder,
bridgeAddr: opts.DeployInfo.Bridge,
- daWriter: opts.DAWriter,
+ dapWriter: opts.DAPWriter,
redisLock: redisLock,
}
b.messagesPerBatch, err = arbmath.NewMovingAverage[uint64](20)
@@ -787,7 +787,7 @@ func (s *batchSegments) CloseAndGetBytes() ([]byte, error) {
}
compressedBytes := s.compressedBuffer.Bytes()
fullMsg := make([]byte, 1, len(compressedBytes)+1)
- fullMsg[0] = arbstate.BrotliMessageHeaderByte
+ fullMsg[0] = daprovider.BrotliMessageHeaderByte
fullMsg = append(fullMsg, compressedBytes...)
return fullMsg, nil
}
@@ -1131,7 +1131,7 @@ func (b *BatchPoster) maybePostSequencerBatch(ctx context.Context) (bool, error)
return false, nil
}

- if b.daWriter != nil {
+ if b.dapWriter != nil {
if !b.redisLock.AttemptLock(ctx) {
return false, errAttemptLockFailed
}
@@ -1143,17 +1143,9 @@ func (b *BatchPoster) maybePostSequencerBatch(ctx context.Context) (bool, error)
if nonce != gotNonce || !bytes.Equal(batchPositionBytes, gotMeta) {
return false, fmt.Errorf("%w: nonce changed from %d to %d while creating batch", storage.ErrStorageRace, nonce, gotNonce)
}

- cert, err := b.daWriter.Store(ctx, sequencerMsg, uint64(time.Now().Add(config.DASRetentionPeriod).Unix()), []byte{}) // b.daWriter will append signature if enabled
- if errors.Is(err, das.BatchToDasFailed) {
- if config.DisableDasFallbackStoreDataOnChain {
- return false, errors.New("unable to batch to DAS and fallback storing data on chain is disabled")
- }
- log.Warn("Falling back to storing data on chain", "err", err)
- } else if err != nil {
+ sequencerMsg, err = b.dapWriter.Store(ctx, sequencerMsg, uint64(time.Now().Add(config.DASRetentionPeriod).Unix()), []byte{}, config.DisableDapFallbackStoreDataOnChain)
+ if err != nil {
return false, err
- } else {
- sequencerMsg = das.Serialize(cert)
- }
}

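Note on the hunk above: the batch poster no longer carries DAS-specific fallback logic; it hands the payload to a provider-agnostic writer and posts whatever bytes come back. The commit's writer.go is not among the files shown in this excerpt, but the new call site pins down the interface's shape. A minimal sketch inferred from that call site (the doc comments and exact wording here are assumptions, not the verbatim file):

package daprovider

import "context"

// Writer is the unified write-path interface for Data Availability
// providers, as implied by the batch poster's call site above.
type Writer interface {
    // Store posts message to the DA provider and returns the bytes to
    // put on chain in its place (for DAS, a serialized certificate).
    // On provider failure, implementations are expected to fall back to
    // returning the original message so it is posted on chain, unless
    // disableFallbackStoreDataOnChain is set.
    Store(
        ctx context.Context,
        message []byte,
        timeout uint64,
        sig []byte,
        disableFallbackStoreDataOnChain bool,
    ) ([]byte, error)
}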
15 changes: 8 additions & 7 deletions arbnode/inbox_tracker.go
@@ -20,6 +20,7 @@ import (

"github.com/offchainlabs/nitro/arbos/arbostypes"
"github.com/offchainlabs/nitro/arbstate"
"github.com/offchainlabs/nitro/arbstate/daprovider"
"github.com/offchainlabs/nitro/arbutil"
"github.com/offchainlabs/nitro/broadcaster"
m "github.com/offchainlabs/nitro/broadcaster/message"
@@ -37,14 +38,14 @@ type InboxTracker struct {
txStreamer *TransactionStreamer
mutex sync.Mutex
validator *staker.BlockValidator
- das arbstate.DataAvailabilityReader
- blobReader arbstate.BlobReader
+ das daprovider.DASReader
+ blobReader daprovider.BlobReader

batchMetaMutex sync.Mutex
batchMeta *containers.LruCache[uint64, BatchMetadata]
}

- func NewInboxTracker(db ethdb.Database, txStreamer *TransactionStreamer, das arbstate.DataAvailabilityReader, blobReader arbstate.BlobReader) (*InboxTracker, error) {
+ func NewInboxTracker(db ethdb.Database, txStreamer *TransactionStreamer, das daprovider.DASReader, blobReader daprovider.BlobReader) (*InboxTracker, error) {
// We support a nil txStreamer for the pruning code
if txStreamer != nil && txStreamer.chainConfig.ArbitrumChainParams.DataAvailabilityCommittee && das == nil {
return nil, errors.New("data availability service required but unconfigured")
@@ -606,14 +607,14 @@ func (t *InboxTracker) AddSequencerBatches(ctx context.Context, client arbutil.L
ctx: ctx,
client: client,
}
- var daProviders []arbstate.DataAvailabilityProvider
+ var daProviders []daprovider.Reader
if t.das != nil {
- daProviders = append(daProviders, arbstate.NewDAProviderDAS(t.das))
+ daProviders = append(daProviders, daprovider.NewReaderForDAS(t.das))
}
if t.blobReader != nil {
- daProviders = append(daProviders, arbstate.NewDAProviderBlobReader(t.blobReader))
+ daProviders = append(daProviders, daprovider.NewReaderForBlobReader(t.blobReader))
}
- multiplexer := arbstate.NewInboxMultiplexer(backend, prevbatchmeta.DelayedMessageCount, daProviders, arbstate.KeysetValidate)
+ multiplexer := arbstate.NewInboxMultiplexer(backend, prevbatchmeta.DelayedMessageCount, daProviders, daprovider.KeysetValidate)
batchMessageCounts := make(map[uint64]arbutil.MessageIndex)
currentpos := prevbatchmeta.MessageCount + 1
for {
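With all readers behind one interface, batch recovery reduces to probing each provider with the batch's header byte. The real dispatch happens inside arbstate.NewInboxMultiplexer; the helper below is invented purely to illustrate it, assuming the Reader interface added later in this commit:

package arbnode

import (
    "context"
    "fmt"

    "github.com/ethereum/go-ethereum/common"

    "github.com/offchainlabs/nitro/arbstate/daprovider"
)

// recoverWithProviders asks each configured DA provider whether it
// recognizes the batch's header byte and, if so, lets that provider
// recover the underlying payload. Hypothetical helper, not nitro code.
func recoverWithProviders(
    ctx context.Context,
    providers []daprovider.Reader,
    batchNum uint64,
    batchBlockHash common.Hash,
    sequencerMsg []byte,
) ([]byte, error) {
    if len(sequencerMsg) <= 40 {
        return nil, fmt.Errorf("sequencer message too short: %d bytes", len(sequencerMsg))
    }
    headerByte := sequencerMsg[40] // first payload byte after the 40-byte batch header
    for _, provider := range providers {
        if provider.IsValidHeaderByte(headerByte) {
            return provider.RecoverPayloadFromBatch(
                ctx, batchNum, batchBlockHash, sequencerMsg,
                nil, daprovider.KeysetValidate,
            )
        }
    }
    // No provider claimed the header byte: the payload is inline
    // (e.g. brotli-compressed) and needs no recovery.
    return sequencerMsg, nil
}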
14 changes: 9 additions & 5 deletions arbnode/node.go
@@ -26,7 +26,7 @@ import (
"github.com/offchainlabs/nitro/arbnode/dataposter"
"github.com/offchainlabs/nitro/arbnode/dataposter/storage"
"github.com/offchainlabs/nitro/arbnode/resourcemanager"
"github.com/offchainlabs/nitro/arbstate"
"github.com/offchainlabs/nitro/arbstate/daprovider"
"github.com/offchainlabs/nitro/arbutil"
"github.com/offchainlabs/nitro/broadcastclient"
"github.com/offchainlabs/nitro/broadcastclients"
@@ -251,7 +251,7 @@ type Node struct {
L1Reader *headerreader.HeaderReader
TxStreamer *TransactionStreamer
DeployInfo *chaininfo.RollupAddresses
- BlobReader arbstate.BlobReader
+ BlobReader daprovider.BlobReader
InboxReader *InboxReader
InboxTracker *InboxTracker
DelayedSequencer *DelayedSequencer
@@ -370,7 +370,7 @@ func createNodeImpl(
dataSigner signature.DataSignerFunc,
fatalErrChan chan error,
parentChainID *big.Int,
- blobReader arbstate.BlobReader,
+ blobReader daprovider.BlobReader,
) (*Node, error) {
config := configFetcher.Get()

@@ -661,6 +661,10 @@ func createNodeImpl(
if txOptsBatchPoster == nil && config.BatchPoster.DataPoster.ExternalSigner.URL == "" {
return nil, errors.New("batchposter, but no TxOpts")
}
+ var dapWriter daprovider.Writer
+ if daWriter != nil {
+ dapWriter = daprovider.NewWriterForDAS(daWriter)
+ }
batchPoster, err = NewBatchPoster(ctx, &BatchPosterOpts{
DataPosterDB: rawdb.NewTable(arbDb, storage.BatchPosterPrefix),
L1Reader: l1Reader,
@@ -671,7 +675,7 @@ func createNodeImpl(
Config: func() *BatchPosterConfig { return &configFetcher.Get().BatchPoster },
DeployInfo: deployInfo,
TransactOpts: txOptsBatchPoster,
- DAWriter: daWriter,
+ DAPWriter: dapWriter,
ParentChainID: parentChainID,
})
if err != nil {
@@ -732,7 +736,7 @@ func CreateNode(
dataSigner signature.DataSignerFunc,
fatalErrChan chan error,
parentChainID *big.Int,
- blobReader arbstate.BlobReader,
+ blobReader daprovider.BlobReader,
) (*Node, error) {
currentNode, err := createNodeImpl(ctx, stack, exec, arbDb, configFetcher, l2Config, l1client, deployInfo, txOptsValidator, txOptsBatchPoster, dataSigner, fatalErrChan, parentChainID, blobReader)
if err != nil {
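In the hunks above, node.go adapts the existing DAS writer to the new interface with daprovider.NewWriterForDAS before wiring it into the batch poster. The adapter itself is not shown in this excerpt, but it is essentially the fallback logic deleted from batch_poster.go relocated behind the interface. A sketch that reuses the old das package names from the removed code for clarity (the real adapter may define local types instead, for example to avoid an import cycle):

package daprovider

import (
    "context"
    "errors"

    "github.com/ethereum/go-ethereum/log"

    "github.com/offchainlabs/nitro/das"
)

// writerForDAS adapts the legacy DAS writer to the unified Writer
// interface; the body mirrors the logic removed from batch_poster.go.
type writerForDAS struct {
    dasWriter das.DataAvailabilityServiceWriter
}

func NewWriterForDAS(dasWriter das.DataAvailabilityServiceWriter) *writerForDAS {
    return &writerForDAS{dasWriter: dasWriter}
}

func (d *writerForDAS) Store(ctx context.Context, message []byte, timeout uint64, sig []byte, disableFallbackStoreDataOnChain bool) ([]byte, error) {
    cert, err := d.dasWriter.Store(ctx, message, timeout, sig)
    if errors.Is(err, das.BatchToDasFailed) {
        if disableFallbackStoreDataOnChain {
            return nil, errors.New("unable to batch to DAS and fallback storing data on chain is disabled")
        }
        log.Warn("Falling back to storing data on chain", "err", err)
        return message, nil // fall back: post the original payload on chain
    } else if err != nil {
        return nil, err
    }
    return das.Serialize(cert), nil
}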
4 changes: 2 additions & 2 deletions arbnode/sequencer_inbox.go
@@ -15,7 +15,7 @@ import (
"github.com/ethereum/go-ethereum/accounts/abi/bind"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core/types"
"github.com/offchainlabs/nitro/arbstate"
"github.com/offchainlabs/nitro/arbstate/daprovider"
"github.com/offchainlabs/nitro/arbutil"

"github.com/offchainlabs/nitro/solgen/go/bridgegen"
@@ -159,7 +159,7 @@ func (m *SequencerInboxBatch) getSequencerData(ctx context.Context, client arbut
if len(tx.BlobHashes()) == 0 {
return nil, fmt.Errorf("blob batch transaction %v has no blobs", tx.Hash())
}
- data := []byte{arbstate.BlobHashesHeaderFlag}
+ data := []byte{daprovider.BlobHashesHeaderFlag}
for _, h := range tx.BlobHashes() {
data = append(data, h[:]...)
}
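For blob batches, the data posted to the sequencer inbox is just the flag byte followed by the blobs' 32-byte versioned hashes; the blob contents live on the parent chain. Together with the fixed 40-byte sequencer-message header, that puts the hash list at byte offset 41, which is exactly where reader.go below starts slicing. A hypothetical parser spelling the layout out:

package daprovider

import (
    "errors"

    "github.com/ethereum/go-ethereum/common"
)

// parseBlobHashes makes the blob-batch layout explicit:
//
//    bytes [0:40)  sequencer-message header
//    byte  40      BlobHashesHeaderFlag
//    bytes [41:]   concatenated 32-byte versioned blob hashes
//
// Hypothetical helper, not part of this commit.
func parseBlobHashes(sequencerMsg []byte) ([]common.Hash, error) {
    if len(sequencerMsg) < 41 || !IsBlobHashesHeaderByte(sequencerMsg[40]) {
        return nil, errors.New("not a blob batch")
    }
    raw := sequencerMsg[41:]
    if len(raw)%32 != 0 {
        return nil, errors.New("blob hash list is not a multiple of 32 bytes")
    }
    hashes := make([]common.Hash, len(raw)/32)
    for i := range hashes {
        copy(hashes[i][:], raw[i*32:(i+1)*32])
    }
    return hashes, nil
}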
96 changes: 96 additions & 0 deletions arbstate/daprovider/reader.go
@@ -0,0 +1,96 @@
// Copyright 2021-2022, Offchain Labs, Inc.
// For license information, see https://github.com/nitro/blob/master/LICENSE

package daprovider

import (
"context"
"fmt"

"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/log"
"github.com/offchainlabs/nitro/arbutil"
"github.com/offchainlabs/nitro/util/blobs"
)

type Reader interface {
// IsValidHeaderByte returns true if the given headerByte has bits corresponding to the DA provider
IsValidHeaderByte(headerByte byte) bool

// RecoverPayloadFromBatch fetches the underlying payload from the DA provider given the batch header information
RecoverPayloadFromBatch(
ctx context.Context,
batchNum uint64,
batchBlockHash common.Hash,
sequencerMsg []byte,
preimages map[arbutil.PreimageType]map[common.Hash][]byte,
keysetValidationMode KeysetValidationMode,
) ([]byte, error)
}

// NewReaderForDAS is generally meant to be only used by nitro.
// DA Providers should implement methods in the Reader interface independently
func NewReaderForDAS(dasReader DASReader) *readerForDAS {
return &readerForDAS{dasReader: dasReader}
}

type readerForDAS struct {
dasReader DASReader
}

func (d *readerForDAS) IsValidHeaderByte(headerByte byte) bool {
return IsDASMessageHeaderByte(headerByte)
}

func (d *readerForDAS) RecoverPayloadFromBatch(
ctx context.Context,
batchNum uint64,
batchBlockHash common.Hash,
sequencerMsg []byte,
preimages map[arbutil.PreimageType]map[common.Hash][]byte,
keysetValidationMode KeysetValidationMode,
) ([]byte, error) {
return RecoverPayloadFromDasBatch(ctx, batchNum, sequencerMsg, d.dasReader, preimages, keysetValidationMode)
}

// NewReaderForBlobReader is generally meant to be only used by nitro.
// DA Providers should implement methods in the Reader interface independently
func NewReaderForBlobReader(blobReader BlobReader) *readerForBlobReader {
return &readerForBlobReader{blobReader: blobReader}
}

type readerForBlobReader struct {
blobReader BlobReader
}

func (b *readerForBlobReader) IsValidHeaderByte(headerByte byte) bool {
return IsBlobHashesHeaderByte(headerByte)
}

func (b *readerForBlobReader) RecoverPayloadFromBatch(
ctx context.Context,
batchNum uint64,
batchBlockHash common.Hash,
sequencerMsg []byte,
preimages map[arbutil.PreimageType]map[common.Hash][]byte,
keysetValidationMode KeysetValidationMode,
) ([]byte, error) {
blobHashes := sequencerMsg[41:]
if len(blobHashes)%len(common.Hash{}) != 0 {
return nil, fmt.Errorf("blob batch data is not a list of hashes as expected")
}
versionedHashes := make([]common.Hash, len(blobHashes)/len(common.Hash{}))
for i := 0; i*32 < len(blobHashes); i += 1 {
copy(versionedHashes[i][:], blobHashes[i*32:(i+1)*32])
}
kzgBlobs, err := b.blobReader.GetBlobs(ctx, batchBlockHash, versionedHashes)
if err != nil {
return nil, fmt.Errorf("failed to get blobs: %w", err)
}
payload, err := blobs.DecodeBlobs(kzgBlobs)
if err != nil {
log.Warn("Failed to decode blobs", "batchBlockHash", batchBlockHash, "versionedHashes", versionedHashes, "err", err)
return nil, nil
}
return payload, nil
}
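
The doc comments above are explicit that NewReaderForDAS and NewReaderForBlobReader are conveniences for nitro itself, and that external DA providers should implement Reader independently. A skeletal example of what such an implementation could look like; the package name, header byte, and method body are all hypothetical:

package myprovider

import (
    "context"
    "fmt"

    "github.com/ethereum/go-ethereum/common"

    "github.com/offchainlabs/nitro/arbstate/daprovider"
    "github.com/offchainlabs/nitro/arbutil"
)

// MyHeaderByte is a made-up flag byte; a real provider must choose one
// that does not collide with the header bytes nitro already reserves.
const MyHeaderByte byte = 0xf0

type reader struct {
    // client handle for the external DA service would live here
}

// NewReader returns a daprovider.Reader backed by the external service.
func NewReader() daprovider.Reader {
    return &reader{}
}

func (r *reader) IsValidHeaderByte(headerByte byte) bool {
    return headerByte == MyHeaderByte
}

func (r *reader) RecoverPayloadFromBatch(
    ctx context.Context,
    batchNum uint64,
    batchBlockHash common.Hash,
    sequencerMsg []byte,
    preimages map[arbutil.PreimageType]map[common.Hash][]byte,
    keysetValidationMode daprovider.KeysetValidationMode,
) ([]byte, error) {
    // Fetch the payload referenced by sequencerMsg from the external
    // service, optionally recording preimages so validators can replay.
    return nil, fmt.Errorf("batch %d: fetch not implemented in this sketch", batchNum)
}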
